Posted to commits@hive.apache.org by se...@apache.org on 2015/12/10 01:03:42 UTC

[1/2] hive git commit: HIVE-12341 : LLAP: add security to daemon protocol endpoint (excluding shuffle) (Sergey Shelukhin, reviewed by Siddharth Seth, Lefty Leverenz)

Repository: hive
Updated Branches:
  refs/heads/master 74e5c75e4 -> 915587b8c


http://git-wip-us.apache.org/repos/asf/hive/blob/915587b8/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemonProtocolServerImpl.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemonProtocolServerImpl.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemonProtocolServerImpl.java
index 784c631..db0b752 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemonProtocolServerImpl.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemonProtocolServerImpl.java
@@ -16,17 +16,26 @@ package org.apache.hadoop.hive.llap.daemon.impl;
 
 import java.io.IOException;
 import java.net.InetSocketAddress;
+import java.security.PrivilegedAction;
 import java.util.concurrent.atomic.AtomicReference;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.io.ByteArrayDataOutput;
+import com.google.common.io.ByteStreams;
 import com.google.protobuf.BlockingService;
+import com.google.protobuf.ByteString;
 import com.google.protobuf.RpcController;
 import com.google.protobuf.ServiceException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos;
+import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GetTokenRequestProto;
+import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GetTokenResponseProto;
 import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryCompleteRequestProto;
 import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryCompleteResponseProto;
 import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedRequestProto;
@@ -35,40 +44,52 @@ import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWor
 import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkResponseProto;
 import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.TerminateFragmentRequestProto;
 import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.TerminateFragmentResponseProto;
+import org.apache.hadoop.io.Text;
 import org.apache.hadoop.ipc.ProtobufRpcEngine;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.hive.llap.security.LlapSecurityHelper;
+import org.apache.hadoop.hive.llap.security.LlapTokenIdentifier;
+import org.apache.hadoop.hive.llap.security.SecretManager;
 import org.apache.hadoop.service.AbstractService;
 import org.apache.hadoop.hive.llap.daemon.ContainerRunner;
 import org.apache.hadoop.hive.llap.daemon.LlapDaemonProtocolBlockingPB;
+import org.apache.hadoop.hive.llap.daemon.LlapManagementProtocolBlockingPB;
+import org.apache.hadoop.hive.llap.security.LlapDaemonPolicyProvider;
 
 public class LlapDaemonProtocolServerImpl extends AbstractService
-    implements LlapDaemonProtocolBlockingPB {
+    implements LlapDaemonProtocolBlockingPB, LlapManagementProtocolBlockingPB {
 
   private static final Logger LOG = LoggerFactory.getLogger(LlapDaemonProtocolServerImpl.class);
 
   private final int numHandlers;
   private final ContainerRunner containerRunner;
-  private final int configuredPort;
-  private RPC.Server server;
-  private final AtomicReference<InetSocketAddress> bindAddress;
-
+  private final int srvPort, mngPort;
+  private RPC.Server server, mngServer;
+  private final AtomicReference<InetSocketAddress> srvAddress, mngAddress;
+  private SecretManager zkSecretManager;
 
   public LlapDaemonProtocolServerImpl(int numHandlers,
                                       ContainerRunner containerRunner,
-                                      AtomicReference<InetSocketAddress> address,
-                                      int configuredPort) {
+                                      AtomicReference<InetSocketAddress> srvAddress,
+                                      AtomicReference<InetSocketAddress> mngAddress,
+                                      int srvPort,
+                                      int mngPort) {
     super("LlapDaemonProtocolServerImpl");
     this.numHandlers = numHandlers;
     this.containerRunner = containerRunner;
-    this.bindAddress = address;
-    this.configuredPort = configuredPort;
+    this.srvAddress = srvAddress;
+    this.srvPort = srvPort;
+    this.mngAddress = mngAddress;
+    this.mngPort = mngPort;
     LOG.info("Creating: " + LlapDaemonProtocolServerImpl.class.getSimpleName() +
-        " with port configured to: " + configuredPort);
+        " with port configured to: " + srvPort);
   }
 
   @Override
-  public SubmitWorkResponseProto submitWork (RpcController controller,
+  public SubmitWorkResponseProto submitWork(RpcController controller,
                                             SubmitWorkRequestProto request) throws
       ServiceException {
     try {
@@ -81,16 +102,14 @@ public class LlapDaemonProtocolServerImpl extends AbstractService
 
   @Override
   public SourceStateUpdatedResponseProto sourceStateUpdated(RpcController controller,
-                                                            SourceStateUpdatedRequestProto request) throws
-      ServiceException {
+      SourceStateUpdatedRequestProto request) throws ServiceException {
     containerRunner.sourceStateUpdated(request);
     return SourceStateUpdatedResponseProto.getDefaultInstance();
   }
 
   @Override
   public QueryCompleteResponseProto queryComplete(RpcController controller,
-                                                  QueryCompleteRequestProto request) throws
-      ServiceException {
+      QueryCompleteRequestProto request) throws ServiceException {
     containerRunner.queryComplete(request);
     return QueryCompleteResponseProto.getDefaultInstance();
   }
@@ -105,24 +124,62 @@ public class LlapDaemonProtocolServerImpl extends AbstractService
 
   @Override
   public void serviceStart() {
-    Configuration conf = getConfig();
+    final Configuration conf = getConfig();
+    final BlockingService daemonImpl =
+        LlapDaemonProtocolProtos.LlapDaemonProtocol.newReflectiveBlockingService(this);
+    final BlockingService managementImpl =
+        LlapDaemonProtocolProtos.LlapManagementProtocol.newReflectiveBlockingService(this);
+    if (!UserGroupInformation.isSecurityEnabled()) {
+      startProtocolServers(conf, daemonImpl, managementImpl);
+      return;
+    }
+    String llapPrincipal = HiveConf.getVar(conf, ConfVars.LLAP_KERBEROS_PRINCIPAL),
+        llapKeytab = HiveConf.getVar(conf, ConfVars.LLAP_KERBEROS_KEYTAB_FILE);
+    zkSecretManager = SecretManager.createSecretManager(conf, llapPrincipal, llapKeytab);
 
-    InetSocketAddress addr = new InetSocketAddress(configuredPort);
+    // Start the protocol server after properly authenticating with daemon keytab.
+    UserGroupInformation daemonUgi = null;
     try {
-      server = createServer(LlapDaemonProtocolBlockingPB.class, addr, conf, numHandlers,
-          LlapDaemonProtocolProtos.LlapDaemonProtocol.newReflectiveBlockingService(this));
+      daemonUgi = LlapSecurityHelper.loginWithKerberos(llapPrincipal, llapKeytab);
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+    daemonUgi.doAs(new PrivilegedAction<Void>() {
+      @Override
+      public Void run() {
+        startProtocolServers(conf, daemonImpl, managementImpl);
+        return null;
+      }
+    });
+  }
+
+  private void startProtocolServers(
+      Configuration conf, BlockingService daemonImpl, BlockingService managementImpl) {
+    server = startProtocolServer(srvPort, numHandlers, srvAddress, conf, daemonImpl,
+        LlapDaemonProtocolBlockingPB.class);
+    mngServer = startProtocolServer(mngPort, 2, mngAddress, conf, managementImpl,
+        LlapManagementProtocolBlockingPB.class);
+  }
+
+  private RPC.Server startProtocolServer(int srvPort, int numHandlers,
+      AtomicReference<InetSocketAddress> bindAddress, Configuration conf,
+      BlockingService impl, Class<?> protocolClass) {
+    InetSocketAddress addr = new InetSocketAddress(srvPort);
+    RPC.Server server;
+    try {
+      server = createServer(protocolClass, addr, conf, numHandlers, impl);
       server.start();
     } catch (IOException e) {
-      LOG.error("Failed to run RPC Server on port: " + configuredPort, e);
+      LOG.error("Failed to run RPC Server on port: " + srvPort, e);
       throw new RuntimeException(e);
     }
 
     InetSocketAddress serverBindAddress = NetUtils.getConnectAddress(server);
-    this.bindAddress.set(NetUtils.createSocketAddrForHost(
+    bindAddress.set(NetUtils.createSocketAddrForHost(
         serverBindAddress.getAddress().getCanonicalHostName(),
         serverBindAddress.getPort()));
-    LOG.info("Instantiated " + LlapDaemonProtocolBlockingPB.class.getSimpleName() + " at " +
-        bindAddress);
+    LOG.info("Instantiated " + protocolClass.getSimpleName() + " at " + bindAddress);
+    return server;
   }
 
   @Override
@@ -130,26 +187,68 @@ public class LlapDaemonProtocolServerImpl extends AbstractService
     if (server != null) {
       server.stop();
     }
+    if (mngServer != null) {
+      mngServer.stop();
+    }
   }
 
   @InterfaceAudience.Private
   @VisibleForTesting
   InetSocketAddress getBindAddress() {
-    return bindAddress.get();
+    return srvAddress.get();
   }
 
   private RPC.Server createServer(Class<?> pbProtocol, InetSocketAddress addr, Configuration conf,
                                   int numHandlers, BlockingService blockingService) throws
       IOException {
     RPC.setProtocolEngine(conf, pbProtocol, ProtobufRpcEngine.class);
-    RPC.Server server = new RPC.Builder(conf)
+    RPC.Builder builder = new RPC.Builder(conf)
         .setProtocol(pbProtocol)
         .setInstance(blockingService)
         .setBindAddress(addr.getHostName())
         .setPort(addr.getPort())
-        .setNumHandlers(numHandlers)
-        .build();
-    // TODO Add security.
+        .setNumHandlers(numHandlers);
+    if (zkSecretManager != null) {
+      builder = builder.setSecretManager(zkSecretManager);
+    }
+    RPC.Server server = builder.build();
+
+    if (conf.getBoolean(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION, false)) {
+      server.refreshServiceAcl(conf, new LlapDaemonPolicyProvider());
+    }
     return server;
   }
+
+  @Override
+  public GetTokenResponseProto getDelegationToken(RpcController controller,
+      GetTokenRequestProto request) throws ServiceException {
+    if (zkSecretManager == null) {
+      throw new ServiceException("Operation not supported on an unsecured cluster");
+    }
+    UserGroupInformation ugi;
+    try {
+      ugi = UserGroupInformation.getCurrentUser();
+    } catch (IOException e) {
+      throw new ServiceException(e);
+    }
+    String user = ugi.getUserName();
+    Text owner = new Text(user);
+    Text realUser = null;
+    if (ugi.getRealUser() != null) {
+      realUser = new Text(ugi.getRealUser().getUserName());
+    }
+    Text renewer = new Text(ugi.getShortUserName());
+    LlapTokenIdentifier llapId = new LlapTokenIdentifier(owner, renewer, realUser);
+    // TODO: note that the token is not renewable right now and will last for 2 weeks by default.
+    Token<LlapTokenIdentifier> token = new Token<LlapTokenIdentifier>(llapId, zkSecretManager);
+    ByteArrayDataOutput out = ByteStreams.newDataOutput();
+    try {
+      token.write(out);
+    } catch (IOException e) {
+      throw new ServiceException(e);
+    }
+    ByteString bs = ByteString.copyFrom(out.toByteArray());
+    GetTokenResponseProto response = GetTokenResponseProto.newBuilder().setToken(bs).build();
+    return response;
+  }
 }

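The delegation token crosses the wire as opaque bytes in GetTokenResponseProto, using plain Hadoop Writable serialization. A minimal sketch of the round trip, where token and bs stand in for the values built in getDelegationToken() above; the decode half mirrors LlapSecurityHelper.getDelegationToken later in this patch:

    // Server side: Writable token -> bytes -> protobuf ByteString.
    ByteArrayDataOutput out = ByteStreams.newDataOutput();
    token.write(out);
    ByteString bs = ByteString.copyFrom(out.toByteArray());

    // Client side: ByteString -> Writable token.
    Token<LlapTokenIdentifier> decoded = new Token<>();
    DataInputByteBuffer in = new DataInputByteBuffer();
    in.reset(bs.asReadOnlyByteBuffer());
    decoded.readFields(in);
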
http://git-wip-us.apache.org/repos/asf/hive/blob/915587b8/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapManagementProtocolClientImpl.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapManagementProtocolClientImpl.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapManagementProtocolClientImpl.java
new file mode 100644
index 0000000..e293a95
--- /dev/null
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapManagementProtocolClientImpl.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.llap.daemon.impl;
+
+import javax.annotation.Nullable;
+import javax.net.SocketFactory;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+
+import com.google.protobuf.RpcController;
+import com.google.protobuf.ServiceException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.retry.RetryPolicy;
+import org.apache.hadoop.ipc.ProtobufRpcEngine;
+import org.apache.hadoop.ipc.ProtocolProxy;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.hive.llap.daemon.LlapManagementProtocolBlockingPB;
+import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GetTokenRequestProto;
+import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GetTokenResponseProto;
+import org.apache.hadoop.security.UserGroupInformation;
+
+public class LlapManagementProtocolClientImpl implements LlapManagementProtocolBlockingPB {
+
+  private final Configuration conf;
+  private final InetSocketAddress serverAddr;
+  private final RetryPolicy retryPolicy;
+  private final SocketFactory socketFactory;
+  LlapManagementProtocolBlockingPB proxy;
+
+
+  public LlapManagementProtocolClientImpl(Configuration conf, String hostname, int port,
+                                      @Nullable RetryPolicy retryPolicy,
+                                      @Nullable SocketFactory socketFactory) {
+    this.conf = conf;
+    this.serverAddr = NetUtils.createSocketAddr(hostname, port);
+    this.retryPolicy = retryPolicy;
+    if (socketFactory == null) {
+      this.socketFactory = NetUtils.getDefaultSocketFactory(conf);
+    } else {
+      this.socketFactory = socketFactory;
+    }
+  }
+
+  public LlapManagementProtocolBlockingPB getProxy() throws IOException {
+    if (proxy == null) {
+      proxy = createProxy();
+    }
+    return proxy;
+  }
+
+  public LlapManagementProtocolBlockingPB createProxy() throws IOException {
+    RPC.setProtocolEngine(conf, LlapManagementProtocolBlockingPB.class, ProtobufRpcEngine.class);
+    ProtocolProxy<LlapManagementProtocolBlockingPB> proxy =
+        RPC.getProtocolProxy(LlapManagementProtocolBlockingPB.class, 0, serverAddr,
+            UserGroupInformation.getCurrentUser(), conf, NetUtils.getDefaultSocketFactory(conf), 0,
+            retryPolicy);
+    return proxy.getProxy();
+  }
+
+  @Override
+  public GetTokenResponseProto getDelegationToken(RpcController controller,
+      GetTokenRequestProto request) throws ServiceException {
+    try {
+      return getProxy().getDelegationToken(null, request);
+    } catch (IOException e) {
+      throw new ServiceException(e);
+    }
+  }
+}

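A hypothetical use of this client; the host name and port below are placeholders rather than defaults from this patch, and the two nulls are accepted by the constructor as shown above:

    Configuration conf = new Configuration();
    LlapManagementProtocolClientImpl client = new LlapManagementProtocolClientImpl(
        conf, "llap-host.example.com", 15004, null, null);
    // The proxy is created lazily on the first call; getDelegationToken
    // throws ServiceException if the RPC fails.
    GetTokenResponseProto resp =
        client.getDelegationToken(null, GetTokenRequestProto.newBuilder().build());
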
http://git-wip-us.apache.org/repos/asf/hive/blob/915587b8/llap-server/src/java/org/apache/hadoop/hive/llap/protocol/LlapTaskUmbilicalProtocol.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/protocol/LlapTaskUmbilicalProtocol.java b/llap-server/src/java/org/apache/hadoop/hive/llap/protocol/LlapTaskUmbilicalProtocol.java
index fae7654..9549567 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/protocol/LlapTaskUmbilicalProtocol.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/protocol/LlapTaskUmbilicalProtocol.java
@@ -18,11 +18,14 @@ import java.io.IOException;
 
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.ipc.VersionedProtocol;
+import org.apache.hadoop.security.token.TokenInfo;
 import org.apache.tez.dag.api.TezException;
 import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.runtime.api.impl.TezHeartbeatRequest;
 import org.apache.tez.runtime.api.impl.TezHeartbeatResponse;
+import org.apache.tez.runtime.common.security.JobTokenSelector;
 
+@TokenInfo(JobTokenSelector.class)
 public interface LlapTaskUmbilicalProtocol extends VersionedProtocol {
 
   public static final long versionID = 1L;

http://git-wip-us.apache.org/repos/asf/hive/blob/915587b8/llap-server/src/java/org/apache/hadoop/hive/llap/security/LlapDaemonPolicyProvider.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/security/LlapDaemonPolicyProvider.java b/llap-server/src/java/org/apache/hadoop/hive/llap/security/LlapDaemonPolicyProvider.java
new file mode 100644
index 0000000..d67647b
--- /dev/null
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/security/LlapDaemonPolicyProvider.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.llap.security;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.llap.daemon.LlapDaemonProtocolBlockingPB;
+import org.apache.hadoop.hive.llap.daemon.LlapManagementProtocolBlockingPB;
+import org.apache.hadoop.security.authorize.PolicyProvider;
+import org.apache.hadoop.security.authorize.Service;
+
+public class LlapDaemonPolicyProvider extends PolicyProvider {
+  private static final Service[] services = new Service[] {
+    new Service(HiveConf.ConfVars.LLAP_SECURITY_ACL.varname,
+        LlapDaemonProtocolBlockingPB.class),
+    new Service(HiveConf.ConfVars.LLAP_MANAGEMENT_ACL.varname,
+        LlapManagementProtocolBlockingPB.class)
+  };
+
+  @Override
+  public Service[] getServices() {
+    return services;
+  }
+}

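These Service entries bind the two LLAP protocols to HiveConf-defined ACLs; they only take effect when Hadoop service-level authorization is enabled, which createServer() above checks before calling refreshServiceAcl. A hypothetical admin-side sketch (the user and group names are made up; ACL values use Hadoop's usual "user1,user2 group1,group2" form):

    Configuration conf = new Configuration();
    conf.setBoolean(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION, true);
    // Restrict the daemon protocol to the hive user, and the management
    // protocol to hive plus an admin user.
    conf.set(HiveConf.ConfVars.LLAP_SECURITY_ACL.varname, "hive");
    conf.set(HiveConf.ConfVars.LLAP_MANAGEMENT_ACL.varname, "hive,admin");
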
http://git-wip-us.apache.org/repos/asf/hive/blob/915587b8/llap-server/src/java/org/apache/hadoop/hive/llap/security/LlapSecurityHelper.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/security/LlapSecurityHelper.java b/llap-server/src/java/org/apache/hadoop/hive/llap/security/LlapSecurityHelper.java
new file mode 100644
index 0000000..a00b631
--- /dev/null
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/security/LlapSecurityHelper.java
@@ -0,0 +1,155 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.llap.security;
+
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import javax.net.SocketFactory;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.llap.daemon.impl.LlapManagementProtocolClientImpl;
+import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GetTokenRequestProto;
+import org.apache.hadoop.hive.llap.registry.impl.LlapRegistryService;
+import org.apache.hadoop.hive.llap.registry.ServiceInstance;
+import org.apache.hadoop.hive.llap.registry.ServiceInstanceSet;
+import org.apache.hadoop.io.DataInputByteBuffer;
+import org.apache.hadoop.io.retry.RetryPolicies;
+import org.apache.hadoop.io.retry.RetryPolicy;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.SecurityUtil;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.protobuf.ByteString;
+
+/** Individual instances of this class are not thread safe. */
+public class LlapSecurityHelper implements LlapTokenProvider {
+  private static final Logger LOG = LoggerFactory.getLogger(LlapSecurityHelper.class);
+
+  private UserGroupInformation llapUgi;
+
+  private final LlapRegistryService registry;
+  private ServiceInstanceSet activeInstances;
+  private final Configuration conf;
+  private LlapManagementProtocolClientImpl client;
+
+  private final SocketFactory socketFactory;
+  private final RetryPolicy retryPolicy;
+
+  public LlapSecurityHelper(Configuration conf) {
+    this.conf = conf;
+    registry = new LlapRegistryService(false);
+    registry.init(conf);
+    socketFactory = NetUtils.getDefaultSocketFactory(conf);
+    retryPolicy = RetryPolicies.retryUpToMaximumTimeWithFixedSleep(
+        16000, 2000L, TimeUnit.MILLISECONDS);
+  }
+
+  public static UserGroupInformation loginWithKerberos(
+      String principal, String keytabFile) throws IOException {
+    if (!UserGroupInformation.isSecurityEnabled()) return null;
+    if (principal.isEmpty() || keytabFile.isEmpty()) {
+      throw new RuntimeException("Kerberos principal and/or keytab are empty");
+    }
+    LOG.info("Logging in as " + principal + " via " + keytabFile);
+    UserGroupInformation.loginUserFromKeytab(
+        SecurityUtil.getServerPrincipal(principal, "0.0.0.0"), keytabFile);
+    return UserGroupInformation.getLoginUser();
+  }
+
+  @Override
+  public Token<LlapTokenIdentifier> getDelegationToken() throws IOException {
+    if (!UserGroupInformation.isSecurityEnabled()) return null;
+    if (llapUgi == null) {
+      llapUgi = UserGroupInformation.getCurrentUser();
+      // We could also have added keytab support; for now the client must do something like kinit.
+    }
+    Iterator<ServiceInstance> llaps = null;
+    ServiceInstance someLlap = null;
+    if (client == null) {
+      llaps = getLlapServices(false);
+      someLlap = llaps.next();
+    }
+
+    ByteString tokenBytes = null;
+    boolean hasRefreshed = false;
+    while (true) {
+      try {
+        tokenBytes = getTokenBytes(someLlap);
+        break;
+      } catch (InterruptedException ie) {
+        throw new RuntimeException(ie);
+      } catch (IOException ex) {
+        LOG.error("Cannot get a token, trying a different instance", ex);
+        client = null;
+      }
+      if (llaps == null || !llaps.hasNext()) {
+        if (hasRefreshed) { // Only refresh once.
+          throw new RuntimeException("Cannot find any LLAPs to get the token from");
+        }
+        llaps = getLlapServices(true);
+        hasRefreshed = true;
+      }
+      someLlap = llaps.next();
+    }
+
+    // Stupid protobuf byte-buffer reinvention.
+    Token<LlapTokenIdentifier> token = new Token<>();
+    DataInputByteBuffer in = new DataInputByteBuffer();
+    in.reset(tokenBytes.asReadOnlyByteBuffer());
+    token.readFields(in);
+    return token;
+  }
+
+  private ByteString getTokenBytes(
+      final ServiceInstance si) throws InterruptedException, IOException {
+    return llapUgi.doAs(new PrivilegedExceptionAction<ByteString>() {
+      @Override
+      public ByteString run() throws Exception {
+        if (client == null) {
+          client = new LlapManagementProtocolClientImpl(
+            conf, si.getHost(), si.getManagementPort(), retryPolicy, socketFactory);
+        }
+        // Client only connects on the first call, so this has to be done in doAs.
+        GetTokenRequestProto req = GetTokenRequestProto.newBuilder().build();
+        return client.getDelegationToken(null, req).getToken();
+      }
+    });
+  }
+
+  private Iterator<ServiceInstance> getLlapServices(boolean doForceRefresh) throws IOException {
+    if (activeInstances == null) {
+      registry.start();
+      activeInstances = registry.getInstances();
+    }
+    Map<String, ServiceInstance> daemons = activeInstances.getAll();
+    if (doForceRefresh || daemons == null || daemons.isEmpty()) {
+      activeInstances.refresh();
+      daemons = activeInstances.getAll();
+      if (daemons == null || daemons.isEmpty()) throw new RuntimeException("No LLAPs found");
+    }
+    return daemons.values().iterator();
+  }
+}

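A hypothetical caller of this helper, e.g. client code preparing credentials for a secure LLAP query; conf and credentials are assumed to exist in the caller:

    LlapTokenProvider tokenProvider = new LlapSecurityHelper(conf);
    Token<LlapTokenIdentifier> token = tokenProvider.getDelegationToken();
    if (token != null) { // null on unsecured clusters
      // Stash it under the LLAP token kind; LlapTaskCommunicator (below)
      // retrieves it from the credentials under the same alias.
      credentials.addToken(LlapTokenIdentifier.KIND_NAME, token);
    }
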
http://git-wip-us.apache.org/repos/asf/hive/blob/915587b8/llap-server/src/java/org/apache/hadoop/hive/llap/security/LlapServerSecurityInfo.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/security/LlapServerSecurityInfo.java b/llap-server/src/java/org/apache/hadoop/hive/llap/security/LlapServerSecurityInfo.java
new file mode 100644
index 0000000..4dca2ce
--- /dev/null
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/security/LlapServerSecurityInfo.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.llap.security;
+
+import java.lang.annotation.Annotation;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.llap.daemon.LlapDaemonProtocolBlockingPB;
+import org.apache.hadoop.hive.llap.daemon.LlapManagementProtocolBlockingPB;
+import org.apache.hadoop.security.KerberosInfo;
+import org.apache.hadoop.security.SecurityInfo;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.security.token.TokenInfo;
+import org.apache.hadoop.security.token.TokenSelector;
+
+public class LlapServerSecurityInfo extends SecurityInfo {
+  private static final Log LOG = LogFactory.getLog(LlapServerSecurityInfo.class);
+
+  @Override
+  public KerberosInfo getKerberosInfo(Class<?> protocol, Configuration conf) {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Trying to get KerberosInfo for " + protocol);
+    }
+    if (!LlapDaemonProtocolBlockingPB.class.isAssignableFrom(protocol)
+        && !LlapManagementProtocolBlockingPB.class.isAssignableFrom(protocol)) return null;
+    return new KerberosInfo() {
+      @Override
+      public Class<? extends Annotation> annotationType() {
+        return null;
+      }
+
+      @Override
+      public String serverPrincipal() {
+        return HiveConf.ConfVars.LLAP_KERBEROS_PRINCIPAL.varname;
+      }
+
+      @Override
+      public String clientPrincipal() {
+        return null;
+      }
+    };
+  }
+
+  @Override
+  public TokenInfo getTokenInfo(Class<?> protocol, Configuration conf) {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Trying to get TokenInfo for " + protocol);
+    }
+    // Tokens cannot be used for the management protocol (for now).
+    if (!LlapDaemonProtocolBlockingPB.class.isAssignableFrom(protocol)) return null;
+    return new TokenInfo() {
+      @Override
+      public Class<? extends Annotation> annotationType() {
+        return null;
+      }
+
+      @Override
+      public Class<? extends TokenSelector<? extends TokenIdentifier>> value() {
+        return LlapTokenSelector.class;
+      }
+    };
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/915587b8/llap-server/src/java/org/apache/hadoop/hive/llap/security/LlapTokenSelector.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/security/LlapTokenSelector.java b/llap-server/src/java/org/apache/hadoop/hive/llap/security/LlapTokenSelector.java
new file mode 100644
index 0000000..b6e7499
--- /dev/null
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/security/LlapTokenSelector.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.llap.security;
+
+import java.util.Collection;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.security.token.TokenSelector;
+
+public class LlapTokenSelector implements TokenSelector<LlapTokenIdentifier> {
+  private static final Log LOG = LogFactory.getLog(LlapTokenSelector.class);
+
+  @Override
+  public Token<LlapTokenIdentifier> selectToken(Text service,
+      Collection<Token<? extends TokenIdentifier>> tokens) {
+    if (service == null) return null;
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Looking for a token with service " + service);
+    }
+    for (Token<? extends TokenIdentifier> token : tokens) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Token = " + token.getKind() + "; service = " + token.getService());
+      }
+      if (LlapTokenIdentifier.KIND_NAME.equals(token.getKind())
+          && service.equals(token.getService())) {
+        @SuppressWarnings("unchecked")
+        Token<LlapTokenIdentifier> result = (Token<LlapTokenIdentifier>)token;
+        return result;
+      }
+    }
+    return null;
+  }
+}

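For orientation, roughly what the Hadoop RPC layer does with this selector when setting up a connection (a simplified sketch; serverAddress and ugi are placeholders, and the real call site is inside Hadoop's SASL client code):

    Text service = SecurityUtil.buildTokenService(serverAddress);
    Token<LlapTokenIdentifier> match =
        new LlapTokenSelector().selectToken(service, ugi.getTokens());
    // A token matches only if it was bound to this exact host:port service
    // name, as TaskCommunicator does below with SecurityUtil.setTokenService.
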
http://git-wip-us.apache.org/repos/asf/hive/blob/915587b8/llap-server/src/java/org/apache/hadoop/hive/llap/security/SecretManager.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/security/SecretManager.java b/llap-server/src/java/org/apache/hadoop/hive/llap/security/SecretManager.java
new file mode 100644
index 0000000..dc4e81a
--- /dev/null
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/security/SecretManager.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.llap.security;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.security.PrivilegedAction;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.llap.security.LlapTokenIdentifier;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.delegation.ZKDelegationTokenSecretManager;
+import org.apache.hadoop.security.token.delegation.web.DelegationTokenManager;
+
+public class SecretManager extends ZKDelegationTokenSecretManager<LlapTokenIdentifier> {
+  public SecretManager(Configuration conf) {
+    super(conf);
+  }
+
+  @Override
+  public LlapTokenIdentifier createIdentifier() {
+    return new LlapTokenIdentifier();
+  }
+
+  @Override
+  public LlapTokenIdentifier decodeTokenIdentifier(
+      Token<LlapTokenIdentifier> token) throws IOException {
+    DataInputStream dis = new DataInputStream(new ByteArrayInputStream(token.getIdentifier()));
+    LlapTokenIdentifier id = new LlapTokenIdentifier();
+    id.readFields(dis);
+    dis.close();
+    return id;
+  }
+
+  public static SecretManager createSecretManager(
+      final Configuration conf, String llapPrincipal, String llapKeytab) {
+    // Create ZK connection under a separate ugi (if specified) - ZK works in mysterious ways.
+    UserGroupInformation zkUgi = null;
+    String principal = HiveConf.getVar(conf, ConfVars.LLAP_ZKSM_KERBEROS_PRINCIPAL, llapPrincipal);
+    String keyTab = HiveConf.getVar(conf, ConfVars.LLAP_ZKSM_KERBEROS_KEYTAB_FILE, llapKeytab);
+    try {
+      zkUgi = LlapSecurityHelper.loginWithKerberos(principal, keyTab);
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+    // Override the default delegation token lifetime for LLAP.
+    // Also set all the necessary ZK settings to defaults and LLAP configs, if not set.
+    final Configuration zkConf = new Configuration(conf);
+    zkConf.setLong(DelegationTokenManager.MAX_LIFETIME,
+        HiveConf.getTimeVar(conf, ConfVars.LLAP_DELEGATION_TOKEN_LIFETIME, TimeUnit.SECONDS));
+    zkConf.set(SecretManager.ZK_DTSM_ZK_KERBEROS_PRINCIPAL, principal);
+    zkConf.set(SecretManager.ZK_DTSM_ZK_KERBEROS_KEYTAB, keyTab);
+    setZkConfIfNotSet(zkConf, SecretManager.ZK_DTSM_ZNODE_WORKING_PATH, "llapzkdtsm");
+    setZkConfIfNotSet(zkConf, SecretManager.ZK_DTSM_ZK_AUTH_TYPE, "sasl");
+    setZkConfIfNotSet(zkConf, SecretManager.ZK_DTSM_ZK_CONNECTION_STRING,
+        HiveConf.getVar(zkConf, ConfVars.LLAP_ZKSM_ZK_CONNECTION_STRING));
+    return zkUgi.doAs(new PrivilegedAction<SecretManager>() {
+      @Override
+      public SecretManager run() {
+        SecretManager zkSecretManager = new SecretManager(zkConf);
+        try {
+          zkSecretManager.startThreads();
+        } catch (IOException e) {
+          throw new RuntimeException(e);
+        }
+        return zkSecretManager;
+      }
+    });
+  }
+
+  private static void setZkConfIfNotSet(Configuration zkConf, String name, String value) {
+    if (zkConf.get(name) != null) return;
+    zkConf.set(name, value);
+  }
+}
\ No newline at end of file

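A minimal daemon-side sketch tying this class to getDelegationToken() in LlapDaemonProtocolServerImpl above; the owner/renewer strings are placeholders:

    SecretManager sm = SecretManager.createSecretManager(conf, principal, keytab);
    LlapTokenIdentifier id =
        new LlapTokenIdentifier(new Text("hive"), new Text("hive"), null);
    // Signing against the ZK-backed manager makes the token verifiable by
    // every daemon that shares the same ZooKeeper ensemble.
    Token<LlapTokenIdentifier> token = new Token<>(id, sm);
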
http://git-wip-us.apache.org/repos/asf/hive/blob/915587b8/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java b/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java
index b93650d..ce248e9 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java
@@ -25,11 +25,13 @@ import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 
+import com.google.common.base.Preconditions;
 import com.google.common.collect.BiMap;
 import com.google.common.collect.HashBiMap;
 import com.google.protobuf.ByteString;
 import com.google.protobuf.ServiceException;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.llap.LlapNodeId;
@@ -44,6 +46,7 @@ import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWor
 import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.TerminateFragmentRequestProto;
 import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.TerminateFragmentResponseProto;
 import org.apache.hadoop.hive.llap.protocol.LlapTaskUmbilicalProtocol;
+import org.apache.hadoop.hive.llap.security.LlapTokenIdentifier;
 import org.apache.hadoop.hive.llap.tezplugins.helpers.SourceStateTracker;
 import org.apache.hadoop.io.DataOutputBuffer;
 import org.apache.hadoop.io.Text;
@@ -52,6 +55,8 @@ import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.yarn.api.ApplicationConstants;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.LocalResource;
@@ -88,12 +93,23 @@ public class LlapTaskCommunicator extends TezTaskCommunicatorImpl {
   private TaskCommunicator communicator;
   private long deleteDelayOnDagComplete;
   private final LlapTaskUmbilicalProtocol umbilical;
+  private final Token<LlapTokenIdentifier> token;
 
   private volatile String currentDagName;
 
   public LlapTaskCommunicator(
       TaskCommunicatorContext taskCommunicatorContext) {
     super(taskCommunicatorContext);
+    Credentials credentials = taskCommunicatorContext.getCredentials();
+    if (credentials != null) {
+      @SuppressWarnings("unchecked")
+      Token<LlapTokenIdentifier> llapToken =
+          (Token<LlapTokenIdentifier>)credentials.getToken(LlapTokenIdentifier.KIND_NAME);
+      this.token = llapToken;
+    } else {
+      this.token = null;
+    }
+    Preconditions.checkState((token != null) == UserGroupInformation.isSecurityEnabled());
 
     umbilical = new LlapTaskUmbilicalProtocolImpl(getUmbilical());
     SubmitWorkRequestProto.Builder baseBuilder = SubmitWorkRequestProto.newBuilder();
@@ -117,7 +133,7 @@ public class LlapTaskCommunicator extends TezTaskCommunicatorImpl {
     super.initialize();
     Configuration conf = getConf();
     int numThreads = HiveConf.getIntVar(conf, ConfVars.LLAP_DAEMON_COMMUNICATOR_NUM_THREADS);
-    this.communicator = new TaskCommunicator(numThreads, conf);
+    this.communicator = new TaskCommunicator(numThreads, conf, token);
     this.deleteDelayOnDagComplete = HiveConf.getTimeVar(
         conf, ConfVars.LLAP_FILE_CLEANUP_DELAY_SECONDS, TimeUnit.SECONDS);
     LOG.info("Running LlapTaskCommunicator with "
@@ -158,7 +174,9 @@ public class LlapTaskCommunicator extends TezTaskCommunicatorImpl {
           .setNumHandlers(numHandlers)
           .setSecretManager(jobTokenSecretManager).build();
 
-      // Do serviceACLs need to be refreshed, like in Tez ?
+      if (conf.getBoolean(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION, false)) {
+        server.refreshServiceAcl(conf, new LlapUmbilicalPolicyProvider());
+      }
 
       server.start();
       this.address = NetUtils.getConnectAddress(server);

http://git-wip-us.apache.org/repos/asf/hive/blob/915587b8/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapUmbilicalPolicyProvider.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapUmbilicalPolicyProvider.java b/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapUmbilicalPolicyProvider.java
new file mode 100644
index 0000000..4102f5b
--- /dev/null
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapUmbilicalPolicyProvider.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.hive.llap.tezplugins;
+
+import java.util.Arrays;
+
+import org.apache.hadoop.hive.llap.protocol.LlapTaskUmbilicalProtocol;
+import org.apache.hadoop.security.authorize.Service;
+import org.apache.tez.dag.api.TezConstants;
+import org.apache.tez.dag.app.security.authorize.TezAMPolicyProvider;
+
+public class LlapUmbilicalPolicyProvider extends TezAMPolicyProvider {
+
+  private static Service[] services;
+  private static final Object servicesLock = new Object();
+
+  @Override
+  public Service[] getServices() {
+    if (services != null) return services;
+    synchronized (servicesLock) {
+      if (services != null) return services;
+      Service[] parentSvc = super.getServices();
+      Service[] result = Arrays.copyOf(parentSvc, parentSvc.length + 1);
+      result[parentSvc.length] = new Service(
+          TezConstants.TEZ_AM_SECURITY_SERVICE_AUTHORIZATION_TASK_UMBILICAL,
+          LlapTaskUmbilicalProtocol.class);
+      return (services = result);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/915587b8/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/TaskCommunicator.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/TaskCommunicator.java b/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/TaskCommunicator.java
index 8144165..f9ca677 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/TaskCommunicator.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/TaskCommunicator.java
@@ -15,6 +15,11 @@
 package org.apache.hadoop.hive.llap.tezplugins;
 
 import javax.net.SocketFactory;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.security.PrivilegedAction;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.LinkedList;
@@ -55,9 +60,13 @@ import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWor
 import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkResponseProto;
 import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.TerminateFragmentRequestProto;
 import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.TerminateFragmentResponseProto;
+import org.apache.hadoop.hive.llap.security.LlapTokenIdentifier;
 import org.apache.hadoop.io.retry.RetryPolicies;
 import org.apache.hadoop.io.retry.RetryPolicy;
 import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.SecurityUtil;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.service.AbstractService;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -74,12 +83,14 @@ public class TaskCommunicator extends AbstractService {
 
   private final ListeningExecutorService requestManagerExecutor;
   private volatile ListenableFuture<Void> requestManagerFuture;
+  private final Token<LlapTokenIdentifier> llapToken;
 
-
-  public TaskCommunicator(int numThreads, Configuration conf) {
+  public TaskCommunicator(
+      int numThreads, Configuration conf, Token<LlapTokenIdentifier> llapToken) {
     super(TaskCommunicator.class.getSimpleName());
     this.hostProxies = new ConcurrentHashMap<>();
     this.socketFactory = NetUtils.getDefaultSocketFactory(conf);
+    this.llapToken = llapToken;
 
     long connectionTimeout = HiveConf.getTimeVar(conf,
         ConfVars.LLAP_TASK_COMMUNICATOR_CONNECTION_TIMEOUT_MS, TimeUnit.MILLISECONDS);
@@ -458,13 +469,34 @@ public class TaskCommunicator extends AbstractService {
     void indicateError(Throwable t);
   }
 
-  private LlapDaemonProtocolBlockingPB getProxy(LlapNodeId nodeId) {
+  private LlapDaemonProtocolBlockingPB getProxy(final LlapNodeId nodeId) {
     String hostId = getHostIdentifier(nodeId.getHostname(), nodeId.getPort());
 
     LlapDaemonProtocolBlockingPB proxy = hostProxies.get(hostId);
     if (proxy == null) {
-      proxy = new LlapDaemonProtocolClientImpl(getConfig(), nodeId.getHostname(), nodeId.getPort(),
-          retryPolicy, socketFactory);
+      if (llapToken == null) {
+        proxy = new LlapDaemonProtocolClientImpl(getConfig(), nodeId.getHostname(),
+            nodeId.getPort(), retryPolicy, socketFactory);
+      } else {
+        UserGroupInformation ugi;
+        try {
+          ugi = UserGroupInformation.getCurrentUser();
+        } catch (IOException e) {
+          throw new RuntimeException(e);
+        }
+        Token<LlapTokenIdentifier> nodeToken = new Token<LlapTokenIdentifier>(llapToken);
+        SecurityUtil.setTokenService(nodeToken, NetUtils.createSocketAddrForHost(
+            nodeId.getHostname(), nodeId.getPort()));
+        ugi.addToken(nodeToken);
+        proxy = ugi.doAs(new PrivilegedAction<LlapDaemonProtocolBlockingPB>() {
+          @Override
+          public LlapDaemonProtocolBlockingPB run() {
+            return new LlapDaemonProtocolClientImpl(getConfig(), nodeId.getHostname(),
+                nodeId.getPort(), retryPolicy, socketFactory);
+          }
+        });
+      }
+
       LlapDaemonProtocolBlockingPB proxyOld = hostProxies.putIfAbsent(hostId, proxy);
       if (proxyOld != null) {
         // TODO Shutdown the new proxy.

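The pattern above, in isolation: the job-wide LLAP token is cloned per node and bound to that daemon's address, so the LlapTokenSelector added in this patch can match it by service when the connection is opened inside doAs:

    // Clone the shared token and pin the clone to one daemon's host:port.
    Token<LlapTokenIdentifier> nodeToken = new Token<>(llapToken);
    SecurityUtil.setTokenService(nodeToken,
        NetUtils.createSocketAddrForHost(hostname, port));
    ugi.addToken(nodeToken);
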
http://git-wip-us.apache.org/repos/asf/hive/blob/915587b8/llap-server/src/main/resources/META-INF/services/org.apache.hadoop.security.SecurityInfo
----------------------------------------------------------------------
diff --git a/llap-server/src/main/resources/META-INF/services/org.apache.hadoop.security.SecurityInfo b/llap-server/src/main/resources/META-INF/services/org.apache.hadoop.security.SecurityInfo
new file mode 100644
index 0000000..dcc6988
--- /dev/null
+++ b/llap-server/src/main/resources/META-INF/services/org.apache.hadoop.security.SecurityInfo
@@ -0,0 +1,14 @@
+#
+#   Licensed under the Apache License, Version 2.0 (the "License");
+#   you may not use this file except in compliance with the License.
+#   You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+#   Unless required by applicable law or agreed to in writing, software
+#   distributed under the License is distributed on an "AS IS" BASIS,
+#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#   See the License for the specific language governing permissions and
+#   limitations under the License.
+#
+org.apache.hadoop.hive.llap.security.LlapServerSecurityInfo

http://git-wip-us.apache.org/repos/asf/hive/blob/915587b8/llap-server/src/main/resources/META-INF/services/org.apache.hadoop.security.token.TokenRenewer
----------------------------------------------------------------------
diff --git a/llap-server/src/main/resources/META-INF/services/org.apache.hadoop.security.token.TokenRenewer b/llap-server/src/main/resources/META-INF/services/org.apache.hadoop.security.token.TokenRenewer
new file mode 100644
index 0000000..e80ac41
--- /dev/null
+++ b/llap-server/src/main/resources/META-INF/services/org.apache.hadoop.security.token.TokenRenewer
@@ -0,0 +1,14 @@
+#
+#   Licensed under the Apache License, Version 2.0 (the "License");
+#   you may not use this file except in compliance with the License.
+#   You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+#   Unless required by applicable law or agreed to in writing, software
+#   distributed under the License is distributed on an "AS IS" BASIS,
+#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#   See the License for the specific language governing permissions and
+#   limitations under the License.
+#
+org.apache.hadoop.hive.llap.security.LlapTokenIdentifier$Renewer

http://git-wip-us.apache.org/repos/asf/hive/blob/915587b8/llap-server/src/protobuf/LlapDaemonProtocol.proto
----------------------------------------------------------------------
diff --git a/llap-server/src/protobuf/LlapDaemonProtocol.proto b/llap-server/src/protobuf/LlapDaemonProtocol.proto
index 0ba6acf..07721df 100644
--- a/llap-server/src/protobuf/LlapDaemonProtocol.proto
+++ b/llap-server/src/protobuf/LlapDaemonProtocol.proto
@@ -117,9 +117,20 @@ message TerminateFragmentRequestProto {
 message TerminateFragmentResponseProto {
 }
 
+message GetTokenRequestProto {
+}
+
+message GetTokenResponseProto {
+  optional bytes token = 1;
+}
+
 service LlapDaemonProtocol {
   rpc submitWork(SubmitWorkRequestProto) returns (SubmitWorkResponseProto);
   rpc sourceStateUpdated(SourceStateUpdatedRequestProto) returns (SourceStateUpdatedResponseProto);
   rpc queryComplete(QueryCompleteRequestProto) returns (QueryCompleteResponseProto);
   rpc terminateFragment(TerminateFragmentRequestProto) returns (TerminateFragmentResponseProto);
 }
+
+service LlapManagementProtocol {
+  rpc getDelegationToken(GetTokenRequestProto) returns (GetTokenResponseProto);
+}

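Building and unpacking the new messages from the generated Java stubs looks like the following sketch (serializedToken is a placeholder for the Writable-serialized token bytes produced on the server):

    GetTokenRequestProto req = GetTokenRequestProto.newBuilder().build();
    GetTokenResponseProto resp = GetTokenResponseProto.newBuilder()
        .setToken(ByteString.copyFrom(serializedToken))
        .build();
    byte[] rawToken = resp.getToken().toByteArray();
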
http://git-wip-us.apache.org/repos/asf/hive/blob/915587b8/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/MiniLlapCluster.java
----------------------------------------------------------------------
diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/MiniLlapCluster.java b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/MiniLlapCluster.java
index 52ba360..deade5f 100644
--- a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/MiniLlapCluster.java
+++ b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/MiniLlapCluster.java
@@ -143,7 +143,7 @@ public class MiniLlapCluster extends AbstractService {
   @Override
   public void serviceInit(Configuration conf) {
     llapDaemon = new LlapDaemon(conf, numExecutorsPerService, execBytesPerService, llapIoEnabled,
-        ioIsDirect, ioBytesPerService, localDirs, 0, 0);
+        ioIsDirect, ioBytesPerService, localDirs, 0, 0, 0);
     llapDaemon.init(conf);
   }
 

http://git-wip-us.apache.org/repos/asf/hive/blob/915587b8/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestLlapDaemonProtocolServerImpl.java
----------------------------------------------------------------------
diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestLlapDaemonProtocolServerImpl.java b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestLlapDaemonProtocolServerImpl.java
index bf8a673..0006a9a 100644
--- a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestLlapDaemonProtocolServerImpl.java
+++ b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestLlapDaemonProtocolServerImpl.java
@@ -39,7 +39,8 @@ public class TestLlapDaemonProtocolServerImpl {
     int numHandlers = HiveConf.getIntVar(daemonConf, ConfVars.LLAP_DAEMON_RPC_NUM_HANDLERS);
     LlapDaemonProtocolServerImpl server =
         new LlapDaemonProtocolServerImpl(numHandlers, mock(ContainerRunner.class),
-            new AtomicReference<InetSocketAddress>(), rpcPort);
+            new AtomicReference<InetSocketAddress>(), new AtomicReference<InetSocketAddress>(),
+            rpcPort, rpcPort + 1);
 
     try {
       server.init(new Configuration());

http://git-wip-us.apache.org/repos/asf/hive/blob/915587b8/ql/src/gen/protobuf/gen-java/org/apache/hadoop/hive/ql/io/orc/OrcProto.java
----------------------------------------------------------------------
diff --git a/ql/src/gen/protobuf/gen-java/org/apache/hadoop/hive/ql/io/orc/OrcProto.java b/ql/src/gen/protobuf/gen-java/org/apache/hadoop/hive/ql/io/orc/OrcProto.java
index f4cc240..5202476 100644
--- a/ql/src/gen/protobuf/gen-java/org/apache/hadoop/hive/ql/io/orc/OrcProto.java
+++ b/ql/src/gen/protobuf/gen-java/org/apache/hadoop/hive/ql/io/orc/OrcProto.java
@@ -17765,6 +17765,7 @@ public final class OrcProto {
      * Version of the writer:
      *   0 (or missing) = original
      *   1 = HIVE-8732 fixed
+     *   2 = HIVE-4243 fixed
      * </pre>
      */
     boolean hasWriterVersion();
@@ -17775,6 +17776,7 @@ public final class OrcProto {
      * Version of the writer:
      *   0 (or missing) = original
      *   1 = HIVE-8732 fixed
+     *   2 = HIVE-4243 fixed
      * </pre>
      */
     int getWriterVersion();
@@ -18077,6 +18079,7 @@ public final class OrcProto {
      * Version of the writer:
      *   0 (or missing) = original
      *   1 = HIVE-8732 fixed
+     *   2 = HIVE-4243 fixed
      * </pre>
      */
     public boolean hasWriterVersion() {
@@ -18089,6 +18092,7 @@ public final class OrcProto {
      * Version of the writer:
      *   0 (or missing) = original
      *   1 = HIVE-8732 fixed
+     *   2 = HIVE-4243 fixed
      * </pre>
      */
     public int getWriterVersion() {
@@ -18759,6 +18763,7 @@ public final class OrcProto {
        * Version of the writer:
        *   0 (or missing) = original
        *   1 = HIVE-8732 fixed
+       *   2 = HIVE-4243 fixed
        * </pre>
        */
       public boolean hasWriterVersion() {
@@ -18771,6 +18776,7 @@ public final class OrcProto {
        * Version of the writer:
        *   0 (or missing) = original
        *   1 = HIVE-8732 fixed
+       *   2 = HIVE-4243 fixed
        * </pre>
        */
       public int getWriterVersion() {
@@ -18783,6 +18789,7 @@ public final class OrcProto {
        * Version of the writer:
        *   0 (or missing) = original
        *   1 = HIVE-8732 fixed
+       *   2 = HIVE-4243 fixed
        * </pre>
        */
       public Builder setWriterVersion(int value) {
@@ -18798,6 +18805,7 @@ public final class OrcProto {
        * Version of the writer:
        *   0 (or missing) = original
        *   1 = HIVE-8732 fixed
+       *   2 = HIVE-4243 fixed
        * </pre>
        */
       public Builder clearWriterVersion() {

http://git-wip-us.apache.org/repos/asf/hive/blob/915587b8/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
index 1a9469a..4fb6c00 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
@@ -2993,7 +2993,7 @@ public class DDLTask extends Task<DDLWork> implements Serializable {
           cols.addAll(tbl.getPartCols());
         }
       } else {
-        cols = Hive.getFieldsFromDeserializer(colPath, deserializer); // TODO#: here - desc
+        cols = Hive.getFieldsFromDeserializer(colPath, deserializer);
         if (descTbl.isFormatted()) {
           // when column name is specified in describe table DDL, colPath will
           // will be table_name.column_name

http://git-wip-us.apache.org/repos/asf/hive/blob/915587b8/ql/src/java/org/apache/hadoop/hive/ql/exec/GlobalWorkMapFactory.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/GlobalWorkMapFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/GlobalWorkMapFactory.java
index 7c38dc3..338e495 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/GlobalWorkMapFactory.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/GlobalWorkMapFactory.java
@@ -28,7 +28,7 @@ import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.ql.plan.BaseWork;
 import org.apache.hadoop.hive.ql.session.SessionState;
-import org.apache.hadoop.hive.llap.io.api.LlapIoProxy;
+import org.apache.hadoop.hive.llap.io.api.LlapProxy;
 
 public class GlobalWorkMapFactory {
 
@@ -99,7 +99,7 @@ public class GlobalWorkMapFactory {
   DummyMap<Path, BaseWork> dummy = new DummyMap<Path, BaseWork>();
 
   public Map<Path, BaseWork> get(Configuration conf) {
-    if (LlapIoProxy.isDaemon()
+    if (LlapProxy.isDaemon()
         || (SessionState.get() != null && SessionState.get().isHiveServerQuery())
         || HiveConf.getVar(conf, ConfVars.HIVE_EXECUTION_ENGINE).equals("spark")) {
       if (threadLocalWorkMap == null) {

http://git-wip-us.apache.org/repos/asf/hive/blob/915587b8/ql/src/java/org/apache/hadoop/hive/ql/exec/ObjectCacheFactory.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ObjectCacheFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/ObjectCacheFactory.java
index 3d9771a..5201120 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ObjectCacheFactory.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ObjectCacheFactory.java
@@ -24,8 +24,8 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.llap.io.api.LlapIoProxy;
-import org.apache.hadoop.hive.llap.io.api.LlapIoProxy;
+import org.apache.hadoop.hive.llap.io.api.LlapProxy;
+import org.apache.hadoop.hive.llap.io.api.LlapProxy;
 import org.apache.hadoop.hive.ql.exec.tez.LlapObjectCache;
 
 /**
@@ -46,7 +46,7 @@ public class ObjectCacheFactory {
    */
   public static ObjectCache getCache(Configuration conf, String queryId) {
     if (HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("tez")) {
-      if (LlapIoProxy.isDaemon()) { // daemon
+      if (LlapProxy.isDaemon()) { // daemon
         if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.LLAP_OBJECT_CACHE_ENABLED)) {
           // LLAP object cache, unlike others, does not use globals. Thus, get the existing one.
           return getLlapObjectCache(queryId);

http://git-wip-us.apache.org/repos/asf/hive/blob/915587b8/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java
index 914b4e7..ee62ab3 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java
@@ -34,7 +34,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.llap.io.api.LlapIoProxy;
+import org.apache.hadoop.hive.llap.io.api.LlapProxy;
 import org.apache.hadoop.hive.ql.exec.DummyStoreOperator;
 import org.apache.hadoop.hive.ql.exec.HashTableDummyOperator;
 import org.apache.hadoop.hive.ql.exec.MapOperator;
@@ -96,7 +96,7 @@ public class MapRecordProcessor extends RecordProcessor {
   public MapRecordProcessor(final JobConf jconf, final ProcessorContext context) throws Exception {
     super(jconf, context);
     String queryId = HiveConf.getVar(jconf, HiveConf.ConfVars.HIVEQUERYID);
-    if (LlapIoProxy.isDaemon()) { // do not cache plan
+    if (LlapProxy.isDaemon()) { // do not cache plan
       cache = new org.apache.hadoop.hive.ql.exec.mr.ObjectCache();
     } else {
       cache = ObjectCacheFactory.getCache(jconf, queryId);

http://git-wip-us.apache.org/repos/asf/hive/blob/915587b8/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java
index efcf88c..0579dbc 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java
@@ -29,7 +29,7 @@ import java.util.concurrent.Callable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.llap.io.api.LlapIoProxy;
+import org.apache.hadoop.hive.llap.io.api.LlapProxy;
 import org.apache.hadoop.hive.ql.exec.DummyStoreOperator;
 import org.apache.hadoop.hive.ql.exec.HashTableDummyOperator;
 import org.apache.hadoop.hive.ql.exec.MapredContext;
@@ -92,7 +92,7 @@ public class ReduceRecordProcessor  extends RecordProcessor{
     ObjectCache cache;
 
     String queryId = HiveConf.getVar(jconf, HiveConf.ConfVars.HIVEQUERYID);
-    if (LlapIoProxy.isDaemon()) { // don't cache plan
+    if (LlapProxy.isDaemon()) { // don't cache plan
       cache = new org.apache.hadoop.hive.ql.exec.mr.ObjectCache();
     } else {
       cache = ObjectCacheFactory.getCache(jconf, queryId);

http://git-wip-us.apache.org/repos/asf/hive/blob/915587b8/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
index 07f26be..e1a8041 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
@@ -18,6 +18,8 @@
 package org.apache.hadoop.hive.ql.exec.tez;
 
 
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
 import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.IOException;
@@ -40,6 +42,7 @@ import java.util.concurrent.TimeoutException;
 
 import javax.security.auth.login.LoginException;
 
+import org.apache.commons.codec.binary.Base64;
 import org.apache.commons.codec.digest.DigestUtils;
 import org.apache.commons.io.FilenameUtils;
 import org.apache.hadoop.fs.FileStatus;
@@ -48,11 +51,16 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.llap.io.api.LlapProxy;
+import org.apache.hadoop.hive.llap.security.LlapTokenIdentifier;
+import org.apache.hadoop.hive.llap.security.LlapTokenProvider;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
 import org.apache.hadoop.hive.shims.Utils;
+import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.yarn.api.records.LocalResource;
 import org.apache.hadoop.yarn.api.records.LocalResourceType;
 import org.apache.tez.client.TezClient;
@@ -189,7 +197,8 @@ public class TezSessionState {
     this.queueName = conf.get("tez.queue.name");
     this.doAsEnabled = conf.getBoolVar(HiveConf.ConfVars.HIVE_SERVER2_ENABLE_DOAS);
 
-    final boolean llapMode = "llap".equals(HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_EXECUTION_MODE));
+    final boolean llapMode = "llap".equals(HiveConf.getVar(
+        conf, HiveConf.ConfVars.HIVE_EXECUTION_MODE));
 
     UserGroupInformation ugi = Utils.getUGI();
     user = ugi.getShortUserName();
@@ -258,19 +267,25 @@ public class TezSessionState {
     conf.stripHiddenConfigurations(tezConfig);
 
     ServicePluginsDescriptor servicePluginsDescriptor;
-    UserPayload servicePluginPayload = TezUtils.createUserPayloadFromConf(tezConfig);
 
+    Credentials llapCredentials = null;
     if (llapMode) {
+      if (UserGroupInformation.isSecurityEnabled()) {
+        LlapTokenProvider tp = LlapProxy.getOrInitTokenProvider(conf);
+        Token<LlapTokenIdentifier> token = tp.getDelegationToken();
+        LOG.info("Obtained a token: " + token);
+        llapCredentials = new Credentials();
+        llapCredentials.addToken(LlapTokenIdentifier.KIND_NAME, token);
+      }
+      UserPayload servicePluginPayload = TezUtils.createUserPayloadFromConf(tezConfig);
       // we need plugins to handle llap and uber mode
       servicePluginsDescriptor = ServicePluginsDescriptor.create(true,
-          new TaskSchedulerDescriptor[]{
-              TaskSchedulerDescriptor.create(LLAP_SERVICE, LLAP_SCHEDULER)
-                  .setUserPayload(servicePluginPayload)},
-          new ContainerLauncherDescriptor[]{
-              ContainerLauncherDescriptor.create(LLAP_SERVICE, LLAP_LAUNCHER)},
-          new TaskCommunicatorDescriptor[]{
-              TaskCommunicatorDescriptor.create(LLAP_SERVICE, LLAP_TASK_COMMUNICATOR)
-                  .setUserPayload(servicePluginPayload)});
+          new TaskSchedulerDescriptor[] { TaskSchedulerDescriptor.create(
+              LLAP_SERVICE, LLAP_SCHEDULER).setUserPayload(servicePluginPayload) },
+          new ContainerLauncherDescriptor[] { ContainerLauncherDescriptor.create(
+              LLAP_SERVICE, LLAP_LAUNCHER) },
+          new TaskCommunicatorDescriptor[] { TaskCommunicatorDescriptor.create(
+              LLAP_SERVICE, LLAP_TASK_COMMUNICATOR).setUserPayload(servicePluginPayload) });
     } else {
       servicePluginsDescriptor = ServicePluginsDescriptor.create(true);
     }
@@ -286,7 +301,8 @@ public class TezSessionState {
 
     final TezClient session = TezClient.newBuilder("HIVE-" + sessionId, tezConfig)
         .setIsSession(true).setLocalResources(commonLocalResources)
-        .setServicePluginDescriptor(servicePluginsDescriptor).build();
+        .setCredentials(llapCredentials).setServicePluginDescriptor(servicePluginsDescriptor)
+        .build();
 
     LOG.info("Opening new Tez Session (id: " + sessionId
         + ", scratch dir: " + tezScratchDir + ")");

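To follow the credentials flow above end to end: TezSessionState stores the token under LlapTokenIdentifier.KIND_NAME, so a downstream component can look it up from its own UGI. A minimal sketch (the LlapTokenLookup class is illustrative, and it assumes the Tez runtime has propagated the session credentials to the caller's UGI):

  import java.io.IOException;

  import org.apache.hadoop.hive.llap.security.LlapTokenIdentifier;
  import org.apache.hadoop.security.UserGroupInformation;
  import org.apache.hadoop.security.token.Token;
  import org.apache.hadoop.security.token.TokenIdentifier;

  public class LlapTokenLookup {
    // Returns the LLAP delegation token from the current user's credentials,
    // or null if none was shipped (e.g. security is disabled).
    public static Token<? extends TokenIdentifier> getLlapToken() throws IOException {
      return UserGroupInformation.getCurrentUser().getCredentials()
          .getToken(LlapTokenIdentifier.KIND_NAME);
    }
  }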
http://git-wip-us.apache.org/repos/asf/hive/blob/915587b8/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
index b19c70a..bdf5dc2 100755
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
@@ -40,7 +40,7 @@ import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.io.HiveIOExceptionHandlerUtil;
 import org.apache.hadoop.hive.llap.io.api.LlapIo;
-import org.apache.hadoop.hive.llap.io.api.LlapIoProxy;
+import org.apache.hadoop.hive.llap.io.api.LlapProxy;
 import org.apache.hadoop.hive.ql.exec.spark.SparkDynamicPartitionPruner;
 import org.apache.hadoop.hive.ql.plan.TableDesc;
 import org.apache.hadoop.hive.ql.exec.Operator;
@@ -214,7 +214,7 @@ public class HiveInputFormat<K extends WritableComparable, V extends Writable>
       LOG.debug("Wrapping " + inputFormat);
     }
     @SuppressWarnings("unchecked")
-    LlapIo<VectorizedRowBatch> llapIo = LlapIoProxy.getIo();
+    LlapIo<VectorizedRowBatch> llapIo = LlapProxy.getIo();
     if (llapIo == null) {
       LOG.info("Not using LLAP because IO is not initialized");
       return inputFormat;

http://git-wip-us.apache.org/repos/asf/hive/blob/915587b8/serde/src/java/org/apache/hadoop/hive/serde2/AbstractSerDe.java
----------------------------------------------------------------------
diff --git a/serde/src/java/org/apache/hadoop/hive/serde2/AbstractSerDe.java b/serde/src/java/org/apache/hadoop/hive/serde2/AbstractSerDe.java
index 9269ff4..9434e91 100644
--- a/serde/src/java/org/apache/hadoop/hive/serde2/AbstractSerDe.java
+++ b/serde/src/java/org/apache/hadoop/hive/serde2/AbstractSerDe.java
@@ -122,6 +122,6 @@ public abstract class AbstractSerDe implements SerDe {
    *        does, in fact, store it inside metastore, based on table parameters.
    */
   public boolean shouldStoreFieldsInMetastore(Map<String, String> tableParams) {
-    return false; // The default, unless SerDe overrides it. TODO#
+    return false; // The default, unless SerDe overrides it.
   }
 }


[2/2] hive git commit: HIVE-12341 : LLAP: add security to daemon protocol endpoint (excluding shuffle) (Sergey Shelukhin, reviewed by Siddharth Seth, Lefty Leverenz)

Posted by se...@apache.org.
HIVE-12341 : LLAP: add security to daemon protocol endpoint (excluding shuffle) (Sergey Shelukhin, reviewed by Siddharth Seth, Lefty Leverenz)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/915587b8
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/915587b8
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/915587b8

Branch: refs/heads/master
Commit: 915587b8c0f2b1fd10f2d5c026027b58501b2896
Parents: 74e5c75
Author: Sergey Shelukhin <se...@apache.org>
Authored: Wed Dec 9 15:55:25 2015 -0800
Committer: Sergey Shelukhin <se...@apache.org>
Committed: Wed Dec 9 16:00:52 2015 -0800

----------------------------------------------------------------------
 .../org/apache/hadoop/hive/conf/HiveConf.java   |   46 +-
 .../hadoop/hive/llap/io/api/LlapIoProxy.java    |   78 --
 .../hadoop/hive/llap/io/api/LlapProxy.java      |  111 ++
 .../hive/llap/registry/ServiceInstance.java     |    7 +
 .../registry/impl/LlapFixedRegistryImpl.java    |    7 +
 .../registry/impl/LlapYarnRegistryImpl.java     |   30 +-
 .../hive/llap/security/LlapTokenIdentifier.java |   82 ++
 .../hive/llap/security/LlapTokenProvider.java   |   27 +
 .../daemon/rpc/LlapDaemonProtocolProtos.java    | 1059 +++++++++++++++++-
 .../daemon/LlapDaemonProtocolBlockingPB.java    |    6 +
 .../LlapManagementProtocolBlockingPB.java       |   24 +
 .../hive/llap/daemon/impl/LlapDaemon.java       |   37 +-
 .../impl/LlapDaemonProtocolClientImpl.java      |    1 -
 .../impl/LlapDaemonProtocolServerImpl.java      |  155 ++-
 .../impl/LlapManagementProtocolClientImpl.java  |   82 ++
 .../protocol/LlapTaskUmbilicalProtocol.java     |    3 +
 .../llap/security/LlapDaemonPolicyProvider.java |   38 +
 .../hive/llap/security/LlapSecurityHelper.java  |  155 +++
 .../llap/security/LlapServerSecurityInfo.java   |   78 ++
 .../hive/llap/security/LlapTokenSelector.java   |   53 +
 .../hive/llap/security/SecretManager.java       |   91 ++
 .../llap/tezplugins/LlapTaskCommunicator.java   |   22 +-
 .../tezplugins/LlapUmbilicalPolicyProvider.java |   42 +
 .../hive/llap/tezplugins/TaskCommunicator.java  |   42 +-
 .../org.apache.hadoop.security.SecurityInfo     |   14 +
 ...rg.apache.hadoop.security.token.TokenRenewer |   14 +
 .../src/protobuf/LlapDaemonProtocol.proto       |   11 +
 .../hive/llap/daemon/MiniLlapCluster.java       |    2 +-
 .../impl/TestLlapDaemonProtocolServerImpl.java  |    3 +-
 .../apache/hadoop/hive/ql/io/orc/OrcProto.java  |    8 +
 .../org/apache/hadoop/hive/ql/exec/DDLTask.java |    2 +-
 .../hive/ql/exec/GlobalWorkMapFactory.java      |    4 +-
 .../hadoop/hive/ql/exec/ObjectCacheFactory.java |    6 +-
 .../hive/ql/exec/tez/MapRecordProcessor.java    |    4 +-
 .../hive/ql/exec/tez/ReduceRecordProcessor.java |    4 +-
 .../hive/ql/exec/tez/TezSessionState.java       |   38 +-
 .../hadoop/hive/ql/io/HiveInputFormat.java      |    4 +-
 .../hadoop/hive/serde2/AbstractSerDe.java       |    2 +-
 38 files changed, 2206 insertions(+), 186 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/915587b8/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 058f91d..7e9bf61 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -218,6 +218,9 @@ public class HiveConf extends Configuration {
     }
   }
 
+  public static final String HIVE_LLAP_DAEMON_SERVICE_PRINCIPAL_NAME = "hive.llap.daemon.service.principal";
+
+
   /**
    * dbVars are the parameters can be set per database. If these
    * parameters are set as a database property, when switching to that
@@ -2117,16 +2120,6 @@ public class HiveConf extends Configuration {
 
     HIVE_SECURITY_COMMAND_WHITELIST("hive.security.command.whitelist", "set,reset,dfs,add,list,delete,reload,compile",
         "Comma separated list of non-SQL Hive commands users are authorized to execute"),
-    HIVE_CONF_RESTRICTED_LIST("hive.conf.restricted.list",
-        "hive.security.authenticator.manager,hive.security.authorization.manager,hive.users.in.admin.role",
-        "Comma separated list of configuration options which are immutable at runtime"),
-    HIVE_CONF_HIDDEN_LIST("hive.conf.hidden.list",
-        METASTOREPWD.varname + "," + HIVE_SERVER2_SSL_KEYSTORE_PASSWORD.varname,
-        "Comma separated list of configuration options which should not be read by normal user like passwords"),
-
-    HIVE_CONF_INTERNAL_VARIABLE_LIST("hive.conf.internal.variable.list",
-        "hive.added.files.path,hive.added.jars.path,hive.added.archives.path",
-        "Comma separated list of variables which are used internally and should not be configurable."),
 
     // If this is set all move tasks at the end of a multi-insert query will only begin once all
     // outputs are ready
@@ -2377,6 +2370,26 @@ public class HiveConf extends Configuration {
         "By default, percentile latency metrics are disabled."),
     LLAP_IO_THREADPOOL_SIZE("hive.llap.io.threadpool.size", 10,
         "Specify the number of threads to use for low-level IO thread pool."),
+    LLAP_KERBEROS_PRINCIPAL(HIVE_LLAP_DAEMON_SERVICE_PRINCIPAL_NAME, "",
+        "The name of the LLAP daemon's service principal."),
+    LLAP_KERBEROS_KEYTAB_FILE("hive.llap.daemon.keytab.file", "",
+        "The path to the Kerberos Keytab file containing the LLAP daemon's service principal."),
+    LLAP_ZKSM_KERBEROS_PRINCIPAL("hive.llap.zk.sm.principal", "",
+        "The name of the principal to use to talk to ZooKeeper for ZooKeeper SecretManager."),
+    LLAP_ZKSM_KERBEROS_KEYTAB_FILE("hive.llap.zk.sm.keytab.file", "",
+        "The path to the Kerberos Keytab file containing the principal to use to talk to\n" +
+        "ZooKeeper for ZooKeeper SecretManager."),
+    LLAP_ZKSM_ZK_CONNECTION_STRING("hive.llap.zk.sm.connectionString", "",
+        "ZooKeeper connection string for ZooKeeper SecretManager."),
+    LLAP_SECURITY_ACL("hive.llap.daemon.service.acl", "*", "The ACL for LLAP daemon."),
+    LLAP_MANAGEMENT_ACL("hive.llap.management.service.acl", "*",
+        "The ACL for LLAP daemon management."),
+    // Hadoop DelegationTokenManager default is 1 week.
+    LLAP_DELEGATION_TOKEN_LIFETIME("hive.llap.daemon.delegation.token.lifetime", "14d",
+         new TimeValidator(TimeUnit.SECONDS),
+        "LLAP delegation token lifetime, in seconds if specified without a unit."),
+    LLAP_MANAGEMENT_RPC_PORT("hive.llap.management.rpc.port", 15004,
+        "RPC port for LLAP daemon management service."),
 
     LLAP_DAEMON_RPC_NUM_HANDLERS("hive.llap.daemon.rpc.num.handlers", 5,
       "Number of RPC handlers for LLAP daemon.", "llap.daemon.rpc.num.handlers"),
@@ -2532,7 +2545,18 @@ public class HiveConf extends Configuration {
         "Expected inflation factor between disk/in memory representation of hash tables"),
     HIVE_LOG_TRACE_ID("hive.log.trace.id", "",
         "Log tracing id that can be used by upstream clients for tracking respective logs. " +
-        "Truncated to " + LOG_PREFIX_LENGTH + " characters. Defaults to use auto-generated session id.");
+        "Truncated to " + LOG_PREFIX_LENGTH + " characters. Defaults to use auto-generated session id."),
+
+
+    HIVE_CONF_RESTRICTED_LIST("hive.conf.restricted.list",
+        "hive.security.authenticator.manager,hive.security.authorization.manager,hive.users.in.admin.role",
+        "Comma separated list of configuration options which are immutable at runtime"),
+    HIVE_CONF_HIDDEN_LIST("hive.conf.hidden.list",
+        METASTOREPWD.varname + "," + HIVE_SERVER2_SSL_KEYSTORE_PASSWORD.varname,
+        "Comma separated list of configuration options which should not be read by normal user like passwords"),
+    HIVE_CONF_INTERNAL_VARIABLE_LIST("hive.conf.internal.variable.list",
+        "hive.added.files.path,hive.added.jars.path,hive.added.archives.path",
+        "Comma separated list of variables which are used internally and should not be configurable.");
 
 
     public final String varname;

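For anyone wiring up the new security settings: the variables are set like any other ConfVars. A minimal sketch (the principal, keytab path, and ZooKeeper quorum below are placeholder values, not defaults from this patch):

  import org.apache.hadoop.hive.conf.HiveConf;
  import org.apache.hadoop.hive.conf.HiveConf.ConfVars;

  public class LlapSecurityConfSketch {
    public static HiveConf secureDaemonConf() {
      HiveConf conf = new HiveConf();
      // Kerberos identity for the daemon protocol endpoint.
      conf.setVar(ConfVars.LLAP_KERBEROS_PRINCIPAL, "llap/_HOST@EXAMPLE.COM");
      conf.setVar(ConfVars.LLAP_KERBEROS_KEYTAB_FILE, "/etc/security/keytabs/llap.keytab");
      // ZooKeeper-backed SecretManager for delegation tokens.
      conf.setVar(ConfVars.LLAP_ZKSM_ZK_CONNECTION_STRING, "zk1:2181,zk2:2181,zk3:2181");
      // Accepts a unit suffix; a bare number is interpreted as seconds.
      conf.setVar(ConfVars.LLAP_DELEGATION_TOKEN_LIFETIME, "14d");
      conf.setIntVar(ConfVars.LLAP_MANAGEMENT_RPC_PORT, 15004);
      return conf;
    }
  }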
http://git-wip-us.apache.org/repos/asf/hive/blob/915587b8/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/LlapIoProxy.java
----------------------------------------------------------------------
diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/LlapIoProxy.java b/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/LlapIoProxy.java
deleted file mode 100644
index 4c31e32..0000000
--- a/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/LlapIoProxy.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hive.llap.io.api;
-
-import java.io.IOException;
-import java.lang.reflect.Constructor;
-
-import org.apache.hadoop.conf.Configuration;
-
-
-@SuppressWarnings("rawtypes")
-public class LlapIoProxy {
-  private final static String IMPL_CLASS = "org.apache.hadoop.hive.llap.io.api.impl.LlapIoImpl";
-
-  // Llap server depends on Hive execution, so the reverse cannot be true. We create the I/O
-  // singleton once (on daemon startup); the said singleton serves as the IO interface.
-  private static LlapIo io = null;
-
-  private static boolean isDaemon = false;
-
-  public static void setDaemon(boolean isDaemon) {
-    LlapIoProxy.isDaemon = isDaemon;
-  }
-
-  public static boolean isDaemon() {
-    return isDaemon;
-  }
-
-  public static LlapIo getIo() {
-    return io;
-  }
-
-  public static void initializeLlapIo(Configuration conf) {
-
-    if (io != null) {
-      return; // already initialized
-    }
-
-    try {
-      io = createIoImpl(conf);
-    } catch (IOException e) {
-      throw new RuntimeException("Cannot initialize local server", e);
-    }
-  }
-
-  private static LlapIo createIoImpl(Configuration conf) throws IOException {
-    try {
-      @SuppressWarnings("unchecked")
-      Class<? extends LlapIo> clazz = (Class<? extends LlapIo>)Class.forName(IMPL_CLASS);
-      Constructor<? extends LlapIo> ctor = clazz.getDeclaredConstructor(Configuration.class);
-      ctor.setAccessible(true);
-      return ctor.newInstance(conf);
-    } catch (Exception e) {
-      throw new RuntimeException("Failed to create impl class", e);
-    }
-  }
-
-  public static void close() {
-    if (io != null) {
-      io.close();
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/hive/blob/915587b8/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/LlapProxy.java
----------------------------------------------------------------------
diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/LlapProxy.java b/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/LlapProxy.java
new file mode 100644
index 0000000..6359bab
--- /dev/null
+++ b/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/LlapProxy.java
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.llap.io.api;
+
+import java.io.IOException;
+import java.lang.reflect.Constructor;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.llap.security.LlapTokenProvider;
+
+
+@SuppressWarnings("rawtypes")
+public class LlapProxy {
+  private final static String IMPL_CLASS = "org.apache.hadoop.hive.llap.io.api.impl.LlapIoImpl";
+  private final static String TOKEN_CLASS =
+      "org.apache.hadoop.hive.llap.security.LlapSecurityHelper";
+
+  // Llap server depends on Hive execution, so the reverse cannot be true. We create the I/O
+  // singleton once (on daemon startup); the said singleton serves as the IO interface.
+  private static LlapIo io = null;
+  private static LlapTokenProvider tokenProvider = null;
+  private static final Object tpInitLock = new Object();
+  private static volatile boolean isTpInitDone = false;
+
+  private static boolean isDaemon = false;
+
+  public static void setDaemon(boolean isDaemon) {
+    LlapProxy.isDaemon = isDaemon;
+  }
+
+  public static boolean isDaemon() {
+    return isDaemon;
+  }
+
+  public static LlapIo getIo() {
+    return io;
+  }
+
+  public static void initializeLlapIo(Configuration conf) {
+    if (io != null) {
+      return; // already initialized
+    }
+
+    try {
+      io = createIoImpl(conf);
+    } catch (IOException e) {
+      throw new RuntimeException("Cannot initialize local server", e);
+    }
+  }
+
+  private static LlapIo createIoImpl(Configuration conf) throws IOException {
+    try {
+      @SuppressWarnings("unchecked")
+      Class<? extends LlapIo> clazz = (Class<? extends LlapIo>)Class.forName(IMPL_CLASS);
+      Constructor<? extends LlapIo> ctor = clazz.getDeclaredConstructor(Configuration.class);
+      ctor.setAccessible(true);
+      return ctor.newInstance(conf);
+    } catch (Exception e) {
+      throw new RuntimeException("Failed to create impl class", e);
+    }
+  }
+
+  public static LlapTokenProvider getOrInitTokenProvider(Configuration conf) {
+    if (isTpInitDone) return tokenProvider;
+    synchronized (tpInitLock) {
+      if (isTpInitDone) return tokenProvider;
+      try {
+        tokenProvider = createTokenProviderImpl(conf);
+        isTpInitDone = true;
+      } catch (IOException e) {
+        throw new RuntimeException("Cannot initialize token provider", e);
+      }
+      return tokenProvider;
+    }
+  }
+
+  private static LlapTokenProvider createTokenProviderImpl(Configuration conf) throws IOException {
+    try {
+      @SuppressWarnings("unchecked")
+      Class<? extends LlapTokenProvider> clazz =
+        (Class<? extends LlapTokenProvider>)Class.forName(TOKEN_CLASS);
+      Constructor<? extends LlapTokenProvider> ctor =
+          clazz.getDeclaredConstructor(Configuration.class);
+      ctor.setAccessible(true);
+      return ctor.newInstance(conf);
+    } catch (Exception e) {
+      throw new RuntimeException("Failed to create token provider class", e);
+    }
+  }
+
+  public static void close() {
+    if (io != null) {
+      io.close();
+    }
+  }
+}

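The token-provider accessor follows the same reflective pattern as initializeLlapIo, so llap-client avoids a compile-time dependency on the LlapSecurityHelper implementation; double-checked locking on tpInitLock keeps initialization to a single instance. A minimal usage sketch, mirroring the TezSessionState change earlier in this diff (the TokenProviderUsage class is illustrative):

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.hive.llap.io.api.LlapProxy;
  import org.apache.hadoop.hive.llap.security.LlapTokenIdentifier;
  import org.apache.hadoop.hive.llap.security.LlapTokenProvider;
  import org.apache.hadoop.security.UserGroupInformation;
  import org.apache.hadoop.security.token.Token;

  public class TokenProviderUsage {
    public static Token<LlapTokenIdentifier> fetchToken(Configuration conf) throws Exception {
      if (!UserGroupInformation.isSecurityEnabled()) {
        return null; // tokens are only needed with Kerberos enabled
      }
      // First call loads LlapSecurityHelper reflectively; later calls reuse it.
      LlapTokenProvider tp = LlapProxy.getOrInitTokenProvider(conf);
      return tp.getDelegationToken();
    }
  }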
http://git-wip-us.apache.org/repos/asf/hive/blob/915587b8/llap-client/src/java/org/apache/hadoop/hive/llap/registry/ServiceInstance.java
----------------------------------------------------------------------
diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/ServiceInstance.java b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/ServiceInstance.java
index f116de4..2bd860a 100644
--- a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/ServiceInstance.java
+++ b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/ServiceInstance.java
@@ -40,6 +40,13 @@ public interface ServiceInstance {
   public int getRpcPort();
 
   /**
+   * Management endpoint for service instance
+   *
+   * @return the management RPC port for this instance
+   */
+  public int getManagementPort();
+
+  /**
    * Shuffle Endpoint for service instance
    * 
    * @return

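A small sketch of how a client might use the new accessor (getHost() is assumed to exist on ServiceInstance alongside the port getters shown here; it is not part of this hunk):

  import java.net.InetSocketAddress;

  import org.apache.hadoop.hive.llap.registry.ServiceInstance;

  public class MngAddressSketch {
    // Address of the daemon's management protocol endpoint.
    public static InetSocketAddress mngAddress(ServiceInstance instance) {
      return new InetSocketAddress(instance.getHost(), instance.getManagementPort());
    }
  }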
http://git-wip-us.apache.org/repos/asf/hive/blob/915587b8/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapFixedRegistryImpl.java
----------------------------------------------------------------------
diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapFixedRegistryImpl.java b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapFixedRegistryImpl.java
index a085427..ef9de32 100644
--- a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapFixedRegistryImpl.java
+++ b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapFixedRegistryImpl.java
@@ -46,6 +46,7 @@ public class LlapFixedRegistryImpl implements ServiceRegistry {
 
   private final int port;
   private final int shuffle;
+  private final int mngPort;
   private final String[] hosts;
   private final int memory;
   private final int vcores;
@@ -58,6 +59,7 @@ public class LlapFixedRegistryImpl implements ServiceRegistry {
     this.port = HiveConf.getIntVar(conf, ConfVars.LLAP_DAEMON_RPC_PORT);
     this.shuffle = HiveConf.getIntVar(conf, ConfVars.LLAP_DAEMON_YARN_SHUFFLE_PORT);
     this.resolveHosts = conf.getBoolean(FIXED_REGISTRY_RESOLVE_HOST_NAMES, true);
+    this.mngPort = HiveConf.getIntVar(conf, ConfVars.LLAP_MANAGEMENT_RPC_PORT);
 
     for (Map.Entry<String, String> kv : conf) {
       if (kv.getKey().startsWith(LlapConfiguration.LLAP_DAEMON_PREFIX)
@@ -136,6 +138,11 @@ public class LlapFixedRegistryImpl implements ServiceRegistry {
     }
 
     @Override
+    public int getManagementPort() {
+      return LlapFixedRegistryImpl.this.mngPort;
+    }
+
+    @Override
     public int getShufflePort() {
       return LlapFixedRegistryImpl.this.shuffle;
     }

http://git-wip-us.apache.org/repos/asf/hive/blob/915587b8/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapYarnRegistryImpl.java
----------------------------------------------------------------------
diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapYarnRegistryImpl.java b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapYarnRegistryImpl.java
index 2673ad7..fc2ebf2 100644
--- a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapYarnRegistryImpl.java
+++ b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapYarnRegistryImpl.java
@@ -58,6 +58,10 @@ import com.google.common.base.Preconditions;
 
 public class LlapYarnRegistryImpl implements ServiceRegistry {
 
+  /** IPC endpoint names. */
+  private static final String IPC_SERVICES = "services",
+      IPC_MNG = "llapmng", IPC_SHUFFLE = "shuffle", IPC_LLAP = "llap";
+
   private static final Logger LOG = LoggerFactory.getLogger(LlapYarnRegistryImpl.class);
 
   private final RegistryOperationsService client;
@@ -108,13 +112,13 @@ public class LlapYarnRegistryImpl implements ServiceRegistry {
 
   public Endpoint getRpcEndpoint() {
     final int rpcPort = HiveConf.getIntVar(conf, ConfVars.LLAP_DAEMON_RPC_PORT);
-    return RegistryTypeUtils.ipcEndpoint("llap", new InetSocketAddress(hostname, rpcPort));
+    return RegistryTypeUtils.ipcEndpoint(IPC_LLAP, new InetSocketAddress(hostname, rpcPort));
   }
 
   public Endpoint getShuffleEndpoint() {
     final int shufflePort = HiveConf.getIntVar(conf, ConfVars.LLAP_DAEMON_YARN_SHUFFLE_PORT);
     // HTTP today, but might not be
-    return RegistryTypeUtils.inetAddrEndpoint("shuffle", ProtocolTypes.PROTOCOL_TCP, hostname,
+    return RegistryTypeUtils.inetAddrEndpoint(IPC_SHUFFLE, ProtocolTypes.PROTOCOL_TCP, hostname,
         shufflePort);
   }
 
@@ -125,7 +129,7 @@ public class LlapYarnRegistryImpl implements ServiceRegistry {
     final URL serviceURL;
     try {
       serviceURL = new URL(scheme, hostname, servicePort, "");
-      return RegistryTypeUtils.webEndpoint("services", serviceURL.toURI());
+      return RegistryTypeUtils.webEndpoint(IPC_SERVICES, serviceURL.toURI());
     } catch (MalformedURLException e) {
       throw new RuntimeException(e);
     } catch (URISyntaxException e) {
@@ -133,6 +137,11 @@ public class LlapYarnRegistryImpl implements ServiceRegistry {
     }
   }
 
+  public Endpoint getMngEndpoint() {
+    return RegistryTypeUtils.ipcEndpoint(IPC_MNG, new InetSocketAddress(hostname,
+        HiveConf.getIntVar(conf, ConfVars.LLAP_MANAGEMENT_RPC_PORT)));
+  }
+
   private final String getPath() {
     return this.path;
   }
@@ -142,6 +151,7 @@ public class LlapYarnRegistryImpl implements ServiceRegistry {
     String path = getPath();
     ServiceRecord srv = new ServiceRecord();
     srv.addInternalEndpoint(getRpcEndpoint());
+    srv.addInternalEndpoint(getMngEndpoint());
     srv.addInternalEndpoint(getShuffleEndpoint());
     srv.addExternalEndpoint(getServicesEndpoint());
 
@@ -174,13 +184,15 @@ public class LlapYarnRegistryImpl implements ServiceRegistry {
     private boolean alive = true;
     private final String host;
     private final int rpcPort;
+    private final int mngPort;
     private final int shufflePort;
 
     public DynamicServiceInstance(ServiceRecord srv) throws IOException {
       this.srv = srv;
 
-      final Endpoint shuffle = srv.getInternalEndpoint("shuffle");
-      final Endpoint rpc = srv.getInternalEndpoint("llap");
+      final Endpoint shuffle = srv.getInternalEndpoint(IPC_SHUFFLE);
+      final Endpoint rpc = srv.getInternalEndpoint(IPC_LLAP);
+      final Endpoint mng = srv.getInternalEndpoint(IPC_MNG);
 
       this.host =
           RegistryTypeUtils.getAddressField(rpc.addresses.get(0),
@@ -188,6 +200,9 @@ public class LlapYarnRegistryImpl implements ServiceRegistry {
       this.rpcPort =
           Integer.valueOf(RegistryTypeUtils.getAddressField(rpc.addresses.get(0),
               AddressTypes.ADDRESS_PORT_FIELD));
+      this.mngPort =
+          Integer.valueOf(RegistryTypeUtils.getAddressField(mng.addresses.get(0),
+              AddressTypes.ADDRESS_PORT_FIELD));
       this.shufflePort =
           Integer.valueOf(RegistryTypeUtils.getAddressField(shuffle.addresses.get(0),
               AddressTypes.ADDRESS_PORT_FIELD));
@@ -241,6 +256,11 @@ public class LlapYarnRegistryImpl implements ServiceRegistry {
       return "DynamicServiceInstance [alive=" + alive + ", host=" + host + ":" + rpcPort + " with resources=" + getResource() +"]";
     }
 
+    @Override
+    public int getManagementPort() {
+      return mngPort;
+    }
+
     // Relying on the identity hashCode and equality, since refreshing instances retains the old copy
     // of an already known instance.
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/915587b8/llap-client/src/java/org/apache/hadoop/hive/llap/security/LlapTokenIdentifier.java
----------------------------------------------------------------------
diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/security/LlapTokenIdentifier.java b/llap-client/src/java/org/apache/hadoop/hive/llap/security/LlapTokenIdentifier.java
new file mode 100644
index 0000000..f0bb495
--- /dev/null
+++ b/llap-client/src/java/org/apache/hadoop/hive/llap/security/LlapTokenIdentifier.java
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.llap.security;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier;
+
+/** For now, an LLAP token gives access to any LLAP server. */
+public class LlapTokenIdentifier extends AbstractDelegationTokenIdentifier {
+  private static final String KIND = "LLAP_TOKEN";
+  public static final Text KIND_NAME = new Text(KIND);
+
+  public LlapTokenIdentifier() {
+    super();
+  }
+
+  public LlapTokenIdentifier(Text owner, Text renewer, Text realUser) {
+    super(owner, renewer, realUser);
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    super.write(out);
+    // Nothing right now.
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    super.readFields(in);
+    // Nothing right now.
+  }
+
+  @Override
+  public Text getKind() {
+    return KIND_NAME;
+  }
+
+  @Override
+  public int hashCode() {
+    return -1;
+  }
+
+  @Override
+  public boolean equals(Object other) {
+    return (other != null) && other.getClass().isAssignableFrom(this.getClass());
+  }
+
+  @Override
+  public String toString() {
+    return KIND;
+  }
+
+  @InterfaceAudience.Private
+  public static class Renewer extends Token.TrivialRenewer {
+    @Override
+    protected Text getKind() {
+      return KIND_NAME;
+    }
+  }
+}

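Since write/readFields currently delegate entirely to AbstractDelegationTokenIdentifier, a round trip preserves only the base-class fields. A minimal sketch illustrating that (the TokenIdentifierRoundTrip class and its sample values are illustrative):

  import java.io.ByteArrayInputStream;
  import java.io.ByteArrayOutputStream;
  import java.io.DataInputStream;
  import java.io.DataOutputStream;
  import java.io.IOException;

  import org.apache.hadoop.hive.llap.security.LlapTokenIdentifier;
  import org.apache.hadoop.io.Text;

  public class TokenIdentifierRoundTrip {
    public static LlapTokenIdentifier roundTrip(LlapTokenIdentifier in) throws IOException {
      ByteArrayOutputStream bos = new ByteArrayOutputStream();
      in.write(new DataOutputStream(bos)); // base-class fields only, for now
      LlapTokenIdentifier out = new LlapTokenIdentifier();
      out.readFields(new DataInputStream(new ByteArrayInputStream(bos.toByteArray())));
      return out;
    }

    public static void main(String[] args) throws IOException {
      LlapTokenIdentifier id = new LlapTokenIdentifier(
          new Text("owner"), new Text("renewer"), new Text("realUser"));
      System.out.println(roundTrip(id).getKind()); // prints LLAP_TOKEN
    }
  }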
http://git-wip-us.apache.org/repos/asf/hive/blob/915587b8/llap-client/src/java/org/apache/hadoop/hive/llap/security/LlapTokenProvider.java
----------------------------------------------------------------------
diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/security/LlapTokenProvider.java b/llap-client/src/java/org/apache/hadoop/hive/llap/security/LlapTokenProvider.java
new file mode 100644
index 0000000..2e99a28
--- /dev/null
+++ b/llap-client/src/java/org/apache/hadoop/hive/llap/security/LlapTokenProvider.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.llap.security;
+
+import java.io.IOException;
+
+import org.apache.hadoop.security.token.Token;
+
+public interface LlapTokenProvider {
+  Token<LlapTokenIdentifier> getDelegationToken() throws IOException;
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hive/blob/915587b8/llap-server/src/gen/protobuf/gen-java/org/apache/hadoop/hive/llap/daemon/rpc/LlapDaemonProtocolProtos.java
----------------------------------------------------------------------
diff --git a/llap-server/src/gen/protobuf/gen-java/org/apache/hadoop/hive/llap/daemon/rpc/LlapDaemonProtocolProtos.java b/llap-server/src/gen/protobuf/gen-java/org/apache/hadoop/hive/llap/daemon/rpc/LlapDaemonProtocolProtos.java
index b044df9..af009b8 100644
--- a/llap-server/src/gen/protobuf/gen-java/org/apache/hadoop/hive/llap/daemon/rpc/LlapDaemonProtocolProtos.java
+++ b/llap-server/src/gen/protobuf/gen-java/org/apache/hadoop/hive/llap/daemon/rpc/LlapDaemonProtocolProtos.java
@@ -11991,6 +11991,781 @@ public final class LlapDaemonProtocolProtos {
     // @@protoc_insertion_point(class_scope:TerminateFragmentResponseProto)
   }
 
+  public interface GetTokenRequestProtoOrBuilder
+      extends com.google.protobuf.MessageOrBuilder {
+  }
+  /**
+   * Protobuf type {@code GetTokenRequestProto}
+   */
+  public static final class GetTokenRequestProto extends
+      com.google.protobuf.GeneratedMessage
+      implements GetTokenRequestProtoOrBuilder {
+    // Use GetTokenRequestProto.newBuilder() to construct.
+    private GetTokenRequestProto(com.google.protobuf.GeneratedMessage.Builder<?> builder) {
+      super(builder);
+      this.unknownFields = builder.getUnknownFields();
+    }
+    private GetTokenRequestProto(boolean noInit) { this.unknownFields = com.google.protobuf.UnknownFieldSet.getDefaultInstance(); }
+
+    private static final GetTokenRequestProto defaultInstance;
+    public static GetTokenRequestProto getDefaultInstance() {
+      return defaultInstance;
+    }
+
+    public GetTokenRequestProto getDefaultInstanceForType() {
+      return defaultInstance;
+    }
+
+    private final com.google.protobuf.UnknownFieldSet unknownFields;
+    @java.lang.Override
+    public final com.google.protobuf.UnknownFieldSet
+        getUnknownFields() {
+      return this.unknownFields;
+    }
+    private GetTokenRequestProto(
+        com.google.protobuf.CodedInputStream input,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws com.google.protobuf.InvalidProtocolBufferException {
+      initFields();
+      com.google.protobuf.UnknownFieldSet.Builder unknownFields =
+          com.google.protobuf.UnknownFieldSet.newBuilder();
+      try {
+        boolean done = false;
+        while (!done) {
+          int tag = input.readTag();
+          switch (tag) {
+            case 0:
+              done = true;
+              break;
+            default: {
+              if (!parseUnknownField(input, unknownFields,
+                                     extensionRegistry, tag)) {
+                done = true;
+              }
+              break;
+            }
+          }
+        }
+      } catch (com.google.protobuf.InvalidProtocolBufferException e) {
+        throw e.setUnfinishedMessage(this);
+      } catch (java.io.IOException e) {
+        throw new com.google.protobuf.InvalidProtocolBufferException(
+            e.getMessage()).setUnfinishedMessage(this);
+      } finally {
+        this.unknownFields = unknownFields.build();
+        makeExtensionsImmutable();
+      }
+    }
+    public static final com.google.protobuf.Descriptors.Descriptor
+        getDescriptor() {
+      return org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.internal_static_GetTokenRequestProto_descriptor;
+    }
+
+    protected com.google.protobuf.GeneratedMessage.FieldAccessorTable
+        internalGetFieldAccessorTable() {
+      return org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.internal_static_GetTokenRequestProto_fieldAccessorTable
+          .ensureFieldAccessorsInitialized(
+              org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GetTokenRequestProto.class, org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GetTokenRequestProto.Builder.class);
+    }
+
+    public static com.google.protobuf.Parser<GetTokenRequestProto> PARSER =
+        new com.google.protobuf.AbstractParser<GetTokenRequestProto>() {
+      public GetTokenRequestProto parsePartialFrom(
+          com.google.protobuf.CodedInputStream input,
+          com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+          throws com.google.protobuf.InvalidProtocolBufferException {
+        return new GetTokenRequestProto(input, extensionRegistry);
+      }
+    };
+
+    @java.lang.Override
+    public com.google.protobuf.Parser<GetTokenRequestProto> getParserForType() {
+      return PARSER;
+    }
+
+    private void initFields() {
+    }
+    private byte memoizedIsInitialized = -1;
+    public final boolean isInitialized() {
+      byte isInitialized = memoizedIsInitialized;
+      if (isInitialized != -1) return isInitialized == 1;
+
+      memoizedIsInitialized = 1;
+      return true;
+    }
+
+    public void writeTo(com.google.protobuf.CodedOutputStream output)
+                        throws java.io.IOException {
+      getSerializedSize();
+      getUnknownFields().writeTo(output);
+    }
+
+    private int memoizedSerializedSize = -1;
+    public int getSerializedSize() {
+      int size = memoizedSerializedSize;
+      if (size != -1) return size;
+
+      size = 0;
+      size += getUnknownFields().getSerializedSize();
+      memoizedSerializedSize = size;
+      return size;
+    }
+
+    private static final long serialVersionUID = 0L;
+    @java.lang.Override
+    protected java.lang.Object writeReplace()
+        throws java.io.ObjectStreamException {
+      return super.writeReplace();
+    }
+
+    @java.lang.Override
+    public boolean equals(final java.lang.Object obj) {
+      if (obj == this) {
+       return true;
+      }
+      if (!(obj instanceof org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GetTokenRequestProto)) {
+        return super.equals(obj);
+      }
+      org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GetTokenRequestProto other = (org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GetTokenRequestProto) obj;
+
+      boolean result = true;
+      result = result &&
+          getUnknownFields().equals(other.getUnknownFields());
+      return result;
+    }
+
+    private int memoizedHashCode = 0;
+    @java.lang.Override
+    public int hashCode() {
+      if (memoizedHashCode != 0) {
+        return memoizedHashCode;
+      }
+      int hash = 41;
+      hash = (19 * hash) + getDescriptorForType().hashCode();
+      hash = (29 * hash) + getUnknownFields().hashCode();
+      memoizedHashCode = hash;
+      return hash;
+    }
+
+    public static org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GetTokenRequestProto parseFrom(
+        com.google.protobuf.ByteString data)
+        throws com.google.protobuf.InvalidProtocolBufferException {
+      return PARSER.parseFrom(data);
+    }
+    public static org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GetTokenRequestProto parseFrom(
+        com.google.protobuf.ByteString data,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws com.google.protobuf.InvalidProtocolBufferException {
+      return PARSER.parseFrom(data, extensionRegistry);
+    }
+    public static org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GetTokenRequestProto parseFrom(byte[] data)
+        throws com.google.protobuf.InvalidProtocolBufferException {
+      return PARSER.parseFrom(data);
+    }
+    public static org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GetTokenRequestProto parseFrom(
+        byte[] data,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws com.google.protobuf.InvalidProtocolBufferException {
+      return PARSER.parseFrom(data, extensionRegistry);
+    }
+    public static org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GetTokenRequestProto parseFrom(java.io.InputStream input)
+        throws java.io.IOException {
+      return PARSER.parseFrom(input);
+    }
+    public static org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GetTokenRequestProto parseFrom(
+        java.io.InputStream input,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws java.io.IOException {
+      return PARSER.parseFrom(input, extensionRegistry);
+    }
+    public static org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GetTokenRequestProto parseDelimitedFrom(java.io.InputStream input)
+        throws java.io.IOException {
+      return PARSER.parseDelimitedFrom(input);
+    }
+    public static org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GetTokenRequestProto parseDelimitedFrom(
+        java.io.InputStream input,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws java.io.IOException {
+      return PARSER.parseDelimitedFrom(input, extensionRegistry);
+    }
+    public static org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GetTokenRequestProto parseFrom(
+        com.google.protobuf.CodedInputStream input)
+        throws java.io.IOException {
+      return PARSER.parseFrom(input);
+    }
+    public static org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GetTokenRequestProto parseFrom(
+        com.google.protobuf.CodedInputStream input,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws java.io.IOException {
+      return PARSER.parseFrom(input, extensionRegistry);
+    }
+
+    public static Builder newBuilder() { return Builder.create(); }
+    public Builder newBuilderForType() { return newBuilder(); }
+    public static Builder newBuilder(org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GetTokenRequestProto prototype) {
+      return newBuilder().mergeFrom(prototype);
+    }
+    public Builder toBuilder() { return newBuilder(this); }
+
+    @java.lang.Override
+    protected Builder newBuilderForType(
+        com.google.protobuf.GeneratedMessage.BuilderParent parent) {
+      Builder builder = new Builder(parent);
+      return builder;
+    }
+    /**
+     * Protobuf type {@code GetTokenRequestProto}
+     */
+    public static final class Builder extends
+        com.google.protobuf.GeneratedMessage.Builder<Builder>
+       implements org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GetTokenRequestProtoOrBuilder {
+      public static final com.google.protobuf.Descriptors.Descriptor
+          getDescriptor() {
+        return org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.internal_static_GetTokenRequestProto_descriptor;
+      }
+
+      protected com.google.protobuf.GeneratedMessage.FieldAccessorTable
+          internalGetFieldAccessorTable() {
+        return org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.internal_static_GetTokenRequestProto_fieldAccessorTable
+            .ensureFieldAccessorsInitialized(
+                org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GetTokenRequestProto.class, org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GetTokenRequestProto.Builder.class);
+      }
+
+      // Construct using org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GetTokenRequestProto.newBuilder()
+      private Builder() {
+        maybeForceBuilderInitialization();
+      }
+
+      private Builder(
+          com.google.protobuf.GeneratedMessage.BuilderParent parent) {
+        super(parent);
+        maybeForceBuilderInitialization();
+      }
+      private void maybeForceBuilderInitialization() {
+        if (com.google.protobuf.GeneratedMessage.alwaysUseFieldBuilders) {
+        }
+      }
+      private static Builder create() {
+        return new Builder();
+      }
+
+      public Builder clear() {
+        super.clear();
+        return this;
+      }
+
+      public Builder clone() {
+        return create().mergeFrom(buildPartial());
+      }
+
+      public com.google.protobuf.Descriptors.Descriptor
+          getDescriptorForType() {
+        return org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.internal_static_GetTokenRequestProto_descriptor;
+      }
+
+      public org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GetTokenRequestProto getDefaultInstanceForType() {
+        return org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GetTokenRequestProto.getDefaultInstance();
+      }
+
+      public org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GetTokenRequestProto build() {
+        org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GetTokenRequestProto result = buildPartial();
+        if (!result.isInitialized()) {
+          throw newUninitializedMessageException(result);
+        }
+        return result;
+      }
+
+      public org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GetTokenRequestProto buildPartial() {
+        org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GetTokenRequestProto result = new org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GetTokenRequestProto(this);
+        onBuilt();
+        return result;
+      }
+
+      public Builder mergeFrom(com.google.protobuf.Message other) {
+        if (other instanceof org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GetTokenRequestProto) {
+          return mergeFrom((org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GetTokenRequestProto)other);
+        } else {
+          super.mergeFrom(other);
+          return this;
+        }
+      }
+
+      public Builder mergeFrom(org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GetTokenRequestProto other) {
+        if (other == org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GetTokenRequestProto.getDefaultInstance()) return this;
+        this.mergeUnknownFields(other.getUnknownFields());
+        return this;
+      }
+
+      public final boolean isInitialized() {
+        return true;
+      }
+
+      public Builder mergeFrom(
+          com.google.protobuf.CodedInputStream input,
+          com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+          throws java.io.IOException {
+        org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GetTokenRequestProto parsedMessage = null;
+        try {
+          parsedMessage = PARSER.parsePartialFrom(input, extensionRegistry);
+        } catch (com.google.protobuf.InvalidProtocolBufferException e) {
+          parsedMessage = (org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GetTokenRequestProto) e.getUnfinishedMessage();
+          throw e;
+        } finally {
+          if (parsedMessage != null) {
+            mergeFrom(parsedMessage);
+          }
+        }
+        return this;
+      }
+
+      // @@protoc_insertion_point(builder_scope:GetTokenRequestProto)
+    }
+
+    static {
+      defaultInstance = new GetTokenRequestProto(true);
+      defaultInstance.initFields();
+    }
+
+    // @@protoc_insertion_point(class_scope:GetTokenRequestProto)
+  }
+
+  public interface GetTokenResponseProtoOrBuilder
+      extends com.google.protobuf.MessageOrBuilder {
+
+    // optional bytes token = 1;
+    /**
+     * <code>optional bytes token = 1;</code>
+     */
+    boolean hasToken();
+    /**
+     * <code>optional bytes token = 1;</code>
+     */
+    com.google.protobuf.ByteString getToken();
+  }
+  /**
+   * Protobuf type {@code GetTokenResponseProto}
+   */
+  public static final class GetTokenResponseProto extends
+      com.google.protobuf.GeneratedMessage
+      implements GetTokenResponseProtoOrBuilder {
+    // Use GetTokenResponseProto.newBuilder() to construct.
+    private GetTokenResponseProto(com.google.protobuf.GeneratedMessage.Builder<?> builder) {
+      super(builder);
+      this.unknownFields = builder.getUnknownFields();
+    }
+    private GetTokenResponseProto(boolean noInit) { this.unknownFields = com.google.protobuf.UnknownFieldSet.getDefaultInstance(); }
+
+    private static final GetTokenResponseProto defaultInstance;
+    public static GetTokenResponseProto getDefaultInstance() {
+      return defaultInstance;
+    }
+
+    public GetTokenResponseProto getDefaultInstanceForType() {
+      return defaultInstance;
+    }
+
+    private final com.google.protobuf.UnknownFieldSet unknownFields;
+    @java.lang.Override
+    public final com.google.protobuf.UnknownFieldSet
+        getUnknownFields() {
+      return this.unknownFields;
+    }
+    private GetTokenResponseProto(
+        com.google.protobuf.CodedInputStream input,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws com.google.protobuf.InvalidProtocolBufferException {
+      initFields();
+      int mutable_bitField0_ = 0;
+      com.google.protobuf.UnknownFieldSet.Builder unknownFields =
+          com.google.protobuf.UnknownFieldSet.newBuilder();
+      try {
+        boolean done = false;
+        while (!done) {
+          int tag = input.readTag();
+          switch (tag) {
+            case 0:
+              done = true;
+              break;
+            default: {
+              if (!parseUnknownField(input, unknownFields,
+                                     extensionRegistry, tag)) {
+                done = true;
+              }
+              break;
+            }
+            case 10: {
+              bitField0_ |= 0x00000001;
+              token_ = input.readBytes();
+              break;
+            }
+          }
+        }
+      } catch (com.google.protobuf.InvalidProtocolBufferException e) {
+        throw e.setUnfinishedMessage(this);
+      } catch (java.io.IOException e) {
+        throw new com.google.protobuf.InvalidProtocolBufferException(
+            e.getMessage()).setUnfinishedMessage(this);
+      } finally {
+        this.unknownFields = unknownFields.build();
+        makeExtensionsImmutable();
+      }
+    }
+    public static final com.google.protobuf.Descriptors.Descriptor
+        getDescriptor() {
+      return org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.internal_static_GetTokenResponseProto_descriptor;
+    }
+
+    protected com.google.protobuf.GeneratedMessage.FieldAccessorTable
+        internalGetFieldAccessorTable() {
+      return org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.internal_static_GetTokenResponseProto_fieldAccessorTable
+          .ensureFieldAccessorsInitialized(
+              org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GetTokenResponseProto.class, org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GetTokenResponseProto.Builder.class);
+    }
+
+    public static com.google.protobuf.Parser<GetTokenResponseProto> PARSER =
+        new com.google.protobuf.AbstractParser<GetTokenResponseProto>() {
+      public GetTokenResponseProto parsePartialFrom(
+          com.google.protobuf.CodedInputStream input,
+          com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+          throws com.google.protobuf.InvalidProtocolBufferException {
+        return new GetTokenResponseProto(input, extensionRegistry);
+      }
+    };
+
+    @java.lang.Override
+    public com.google.protobuf.Parser<GetTokenResponseProto> getParserForType() {
+      return PARSER;
+    }
+
+    private int bitField0_;
+    // optional bytes token = 1;
+    public static final int TOKEN_FIELD_NUMBER = 1;
+    private com.google.protobuf.ByteString token_;
+    /**
+     * <code>optional bytes token = 1;</code>
+     */
+    public boolean hasToken() {
+      return ((bitField0_ & 0x00000001) == 0x00000001);
+    }
+    /**
+     * <code>optional bytes token = 1;</code>
+     */
+    public com.google.protobuf.ByteString getToken() {
+      return token_;
+    }
+
+    private void initFields() {
+      token_ = com.google.protobuf.ByteString.EMPTY;
+    }
+    private byte memoizedIsInitialized = -1;
+    public final boolean isInitialized() {
+      byte isInitialized = memoizedIsInitialized;
+      if (isInitialized != -1) return isInitialized == 1;
+
+      memoizedIsInitialized = 1;
+      return true;
+    }
+
+    public void writeTo(com.google.protobuf.CodedOutputStream output)
+                        throws java.io.IOException {
+      getSerializedSize();
+      if (((bitField0_ & 0x00000001) == 0x00000001)) {
+        output.writeBytes(1, token_);
+      }
+      getUnknownFields().writeTo(output);
+    }
+
+    private int memoizedSerializedSize = -1;
+    public int getSerializedSize() {
+      int size = memoizedSerializedSize;
+      if (size != -1) return size;
+
+      size = 0;
+      if (((bitField0_ & 0x00000001) == 0x00000001)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeBytesSize(1, token_);
+      }
+      size += getUnknownFields().getSerializedSize();
+      memoizedSerializedSize = size;
+      return size;
+    }
+
+    private static final long serialVersionUID = 0L;
+    @java.lang.Override
+    protected java.lang.Object writeReplace()
+        throws java.io.ObjectStreamException {
+      return super.writeReplace();
+    }
+
+    @java.lang.Override
+    public boolean equals(final java.lang.Object obj) {
+      if (obj == this) {
+       return true;
+      }
+      if (!(obj instanceof org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GetTokenResponseProto)) {
+        return super.equals(obj);
+      }
+      org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GetTokenResponseProto other = (org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GetTokenResponseProto) obj;
+
+      boolean result = true;
+      result = result && (hasToken() == other.hasToken());
+      if (hasToken()) {
+        result = result && getToken()
+            .equals(other.getToken());
+      }
+      result = result &&
+          getUnknownFields().equals(other.getUnknownFields());
+      return result;
+    }
+
+    private int memoizedHashCode = 0;
+    @java.lang.Override
+    public int hashCode() {
+      if (memoizedHashCode != 0) {
+        return memoizedHashCode;
+      }
+      int hash = 41;
+      hash = (19 * hash) + getDescriptorForType().hashCode();
+      if (hasToken()) {
+        hash = (37 * hash) + TOKEN_FIELD_NUMBER;
+        hash = (53 * hash) + getToken().hashCode();
+      }
+      hash = (29 * hash) + getUnknownFields().hashCode();
+      memoizedHashCode = hash;
+      return hash;
+    }
+
+    public static org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GetTokenResponseProto parseFrom(
+        com.google.protobuf.ByteString data)
+        throws com.google.protobuf.InvalidProtocolBufferException {
+      return PARSER.parseFrom(data);
+    }
+    public static org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GetTokenResponseProto parseFrom(
+        com.google.protobuf.ByteString data,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws com.google.protobuf.InvalidProtocolBufferException {
+      return PARSER.parseFrom(data, extensionRegistry);
+    }
+    public static org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GetTokenResponseProto parseFrom(byte[] data)
+        throws com.google.protobuf.InvalidProtocolBufferException {
+      return PARSER.parseFrom(data);
+    }
+    public static org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GetTokenResponseProto parseFrom(
+        byte[] data,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws com.google.protobuf.InvalidProtocolBufferException {
+      return PARSER.parseFrom(data, extensionRegistry);
+    }
+    public static org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GetTokenResponseProto parseFrom(java.io.InputStream input)
+        throws java.io.IOException {
+      return PARSER.parseFrom(input);
+    }
+    public static org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GetTokenResponseProto parseFrom(
+        java.io.InputStream input,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws java.io.IOException {
+      return PARSER.parseFrom(input, extensionRegistry);
+    }
+    public static org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GetTokenResponseProto parseDelimitedFrom(java.io.InputStream input)
+        throws java.io.IOException {
+      return PARSER.parseDelimitedFrom(input);
+    }
+    public static org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GetTokenResponseProto parseDelimitedFrom(
+        java.io.InputStream input,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws java.io.IOException {
+      return PARSER.parseDelimitedFrom(input, extensionRegistry);
+    }
+    public static org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GetTokenResponseProto parseFrom(
+        com.google.protobuf.CodedInputStream input)
+        throws java.io.IOException {
+      return PARSER.parseFrom(input);
+    }
+    public static org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GetTokenResponseProto parseFrom(
+        com.google.protobuf.CodedInputStream input,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws java.io.IOException {
+      return PARSER.parseFrom(input, extensionRegistry);
+    }
+
+    public static Builder newBuilder() { return Builder.create(); }
+    public Builder newBuilderForType() { return newBuilder(); }
+    public static Builder newBuilder(org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GetTokenResponseProto prototype) {
+      return newBuilder().mergeFrom(prototype);
+    }
+    public Builder toBuilder() { return newBuilder(this); }
+
+    @java.lang.Override
+    protected Builder newBuilderForType(
+        com.google.protobuf.GeneratedMessage.BuilderParent parent) {
+      Builder builder = new Builder(parent);
+      return builder;
+    }
+    /**
+     * Protobuf type {@code GetTokenResponseProto}
+     */
+    public static final class Builder extends
+        com.google.protobuf.GeneratedMessage.Builder<Builder>
+       implements org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GetTokenResponseProtoOrBuilder {
+      public static final com.google.protobuf.Descriptors.Descriptor
+          getDescriptor() {
+        return org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.internal_static_GetTokenResponseProto_descriptor;
+      }
+
+      protected com.google.protobuf.GeneratedMessage.FieldAccessorTable
+          internalGetFieldAccessorTable() {
+        return org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.internal_static_GetTokenResponseProto_fieldAccessorTable
+            .ensureFieldAccessorsInitialized(
+                org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GetTokenResponseProto.class, org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GetTokenResponseProto.Builder.class);
+      }
+
+      // Construct using org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GetTokenResponseProto.newBuilder()
+      private Builder() {
+        maybeForceBuilderInitialization();
+      }
+
+      private Builder(
+          com.google.protobuf.GeneratedMessage.BuilderParent parent) {
+        super(parent);
+        maybeForceBuilderInitialization();
+      }
+      private void maybeForceBuilderInitialization() {
+        if (com.google.protobuf.GeneratedMessage.alwaysUseFieldBuilders) {
+        }
+      }
+      private static Builder create() {
+        return new Builder();
+      }
+
+      public Builder clear() {
+        super.clear();
+        token_ = com.google.protobuf.ByteString.EMPTY;
+        bitField0_ = (bitField0_ & ~0x00000001);
+        return this;
+      }
+
+      public Builder clone() {
+        return create().mergeFrom(buildPartial());
+      }
+
+      public com.google.protobuf.Descriptors.Descriptor
+          getDescriptorForType() {
+        return org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.internal_static_GetTokenResponseProto_descriptor;
+      }
+
+      public org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GetTokenResponseProto getDefaultInstanceForType() {
+        return org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GetTokenResponseProto.getDefaultInstance();
+      }
+
+      public org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GetTokenResponseProto build() {
+        org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GetTokenResponseProto result = buildPartial();
+        if (!result.isInitialized()) {
+          throw newUninitializedMessageException(result);
+        }
+        return result;
+      }
+
+      public org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GetTokenResponseProto buildPartial() {
+        org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GetTokenResponseProto result = new org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GetTokenResponseProto(this);
+        int from_bitField0_ = bitField0_;
+        int to_bitField0_ = 0;
+        if (((from_bitField0_ & 0x00000001) == 0x00000001)) {
+          to_bitField0_ |= 0x00000001;
+        }
+        result.token_ = token_;
+        result.bitField0_ = to_bitField0_;
+        onBuilt();
+        return result;
+      }
+
+      public Builder mergeFrom(com.google.protobuf.Message other) {
+        if (other instanceof org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GetTokenResponseProto) {
+          return mergeFrom((org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GetTokenResponseProto)other);
+        } else {
+          super.mergeFrom(other);
+          return this;
+        }
+      }
+
+      public Builder mergeFrom(org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GetTokenResponseProto other) {
+        if (other == org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GetTokenResponseProto.getDefaultInstance()) return this;
+        if (other.hasToken()) {
+          setToken(other.getToken());
+        }
+        this.mergeUnknownFields(other.getUnknownFields());
+        return this;
+      }
+
+      public final boolean isInitialized() {
+        return true;
+      }
+
+      public Builder mergeFrom(
+          com.google.protobuf.CodedInputStream input,
+          com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+          throws java.io.IOException {
+        org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GetTokenResponseProto parsedMessage = null;
+        try {
+          parsedMessage = PARSER.parsePartialFrom(input, extensionRegistry);
+        } catch (com.google.protobuf.InvalidProtocolBufferException e) {
+          parsedMessage = (org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GetTokenResponseProto) e.getUnfinishedMessage();
+          throw e;
+        } finally {
+          if (parsedMessage != null) {
+            mergeFrom(parsedMessage);
+          }
+        }
+        return this;
+      }
+      private int bitField0_;
+
+      // optional bytes token = 1;
+      private com.google.protobuf.ByteString token_ = com.google.protobuf.ByteString.EMPTY;
+      /**
+       * <code>optional bytes token = 1;</code>
+       */
+      public boolean hasToken() {
+        return ((bitField0_ & 0x00000001) == 0x00000001);
+      }
+      /**
+       * <code>optional bytes token = 1;</code>
+       */
+      public com.google.protobuf.ByteString getToken() {
+        return token_;
+      }
+      /**
+       * <code>optional bytes token = 1;</code>
+       */
+      public Builder setToken(com.google.protobuf.ByteString value) {
+        if (value == null) {
+          throw new NullPointerException();
+        }
+        bitField0_ |= 0x00000001;
+        token_ = value;
+        onChanged();
+        return this;
+      }
+      /**
+       * <code>optional bytes token = 1;</code>
+       */
+      public Builder clearToken() {
+        bitField0_ = (bitField0_ & ~0x00000001);
+        token_ = getDefaultInstance().getToken();
+        onChanged();
+        return this;
+      }
+
+      // @@protoc_insertion_point(builder_scope:GetTokenResponseProto)
+    }
+
+    static {
+      defaultInstance = new GetTokenResponseProto(true);
+      defaultInstance.initFields();
+    }
+
+    // @@protoc_insertion_point(class_scope:GetTokenResponseProto)
+  }
+
   /**
    * Protobuf service {@code LlapDaemonProtocol}
    */
@@ -12436,6 +13211,238 @@ public final class LlapDaemonProtocolProtos {
     // @@protoc_insertion_point(class_scope:LlapDaemonProtocol)
   }
 
+  /**
+   * Protobuf service {@code LlapManagementProtocol}
+   */
+  public static abstract class LlapManagementProtocol
+      implements com.google.protobuf.Service {
+    protected LlapManagementProtocol() {}
+
+    public interface Interface {
+      /**
+       * <code>rpc getDelegationToken(.GetTokenRequestProto) returns (.GetTokenResponseProto);</code>
+       */
+      public abstract void getDelegationToken(
+          com.google.protobuf.RpcController controller,
+          org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GetTokenRequestProto request,
+          com.google.protobuf.RpcCallback<org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GetTokenResponseProto> done);
+
+    }
+
+    public static com.google.protobuf.Service newReflectiveService(
+        final Interface impl) {
+      return new LlapManagementProtocol() {
+        @java.lang.Override
+        public  void getDelegationToken(
+            com.google.protobuf.RpcController controller,
+            org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GetTokenRequestProto request,
+            com.google.protobuf.RpcCallback<org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GetTokenResponseProto> done) {
+          impl.getDelegationToken(controller, request, done);
+        }
+
+      };
+    }
+
+    public static com.google.protobuf.BlockingService
+        newReflectiveBlockingService(final BlockingInterface impl) {
+      return new com.google.protobuf.BlockingService() {
+        public final com.google.protobuf.Descriptors.ServiceDescriptor
+            getDescriptorForType() {
+          return getDescriptor();
+        }
+
+        public final com.google.protobuf.Message callBlockingMethod(
+            com.google.protobuf.Descriptors.MethodDescriptor method,
+            com.google.protobuf.RpcController controller,
+            com.google.protobuf.Message request)
+            throws com.google.protobuf.ServiceException {
+          if (method.getService() != getDescriptor()) {
+            throw new java.lang.IllegalArgumentException(
+              "Service.callBlockingMethod() given method descriptor for " +
+              "wrong service type.");
+          }
+          switch(method.getIndex()) {
+            case 0:
+              return impl.getDelegationToken(controller, (org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GetTokenRequestProto)request);
+            default:
+              throw new java.lang.AssertionError("Can't get here.");
+          }
+        }
+
+        public final com.google.protobuf.Message
+            getRequestPrototype(
+            com.google.protobuf.Descriptors.MethodDescriptor method) {
+          if (method.getService() != getDescriptor()) {
+            throw new java.lang.IllegalArgumentException(
+              "Service.getRequestPrototype() given method " +
+              "descriptor for wrong service type.");
+          }
+          switch(method.getIndex()) {
+            case 0:
+              return org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GetTokenRequestProto.getDefaultInstance();
+            default:
+              throw new java.lang.AssertionError("Can't get here.");
+          }
+        }
+
+        public final com.google.protobuf.Message
+            getResponsePrototype(
+            com.google.protobuf.Descriptors.MethodDescriptor method) {
+          if (method.getService() != getDescriptor()) {
+            throw new java.lang.IllegalArgumentException(
+              "Service.getResponsePrototype() given method " +
+              "descriptor for wrong service type.");
+          }
+          switch(method.getIndex()) {
+            case 0:
+              return org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GetTokenResponseProto.getDefaultInstance();
+            default:
+              throw new java.lang.AssertionError("Can't get here.");
+          }
+        }
+
+      };
+    }
+
+    /**
+     * <code>rpc getDelegationToken(.GetTokenRequestProto) returns (.GetTokenResponseProto);</code>
+     */
+    public abstract void getDelegationToken(
+        com.google.protobuf.RpcController controller,
+        org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GetTokenRequestProto request,
+        com.google.protobuf.RpcCallback<org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GetTokenResponseProto> done);
+
+    public static final
+        com.google.protobuf.Descriptors.ServiceDescriptor
+        getDescriptor() {
+      return org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.getDescriptor().getServices().get(1);
+    }
+    public final com.google.protobuf.Descriptors.ServiceDescriptor
+        getDescriptorForType() {
+      return getDescriptor();
+    }
+
+    public final void callMethod(
+        com.google.protobuf.Descriptors.MethodDescriptor method,
+        com.google.protobuf.RpcController controller,
+        com.google.protobuf.Message request,
+        com.google.protobuf.RpcCallback<
+          com.google.protobuf.Message> done) {
+      if (method.getService() != getDescriptor()) {
+        throw new java.lang.IllegalArgumentException(
+          "Service.callMethod() given method descriptor for wrong " +
+          "service type.");
+      }
+      switch(method.getIndex()) {
+        case 0:
+          this.getDelegationToken(controller, (org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GetTokenRequestProto)request,
+            com.google.protobuf.RpcUtil.<org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GetTokenResponseProto>specializeCallback(
+              done));
+          return;
+        default:
+          throw new java.lang.AssertionError("Can't get here.");
+      }
+    }
+
+    public final com.google.protobuf.Message
+        getRequestPrototype(
+        com.google.protobuf.Descriptors.MethodDescriptor method) {
+      if (method.getService() != getDescriptor()) {
+        throw new java.lang.IllegalArgumentException(
+          "Service.getRequestPrototype() given method " +
+          "descriptor for wrong service type.");
+      }
+      switch(method.getIndex()) {
+        case 0:
+          return org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GetTokenRequestProto.getDefaultInstance();
+        default:
+          throw new java.lang.AssertionError("Can't get here.");
+      }
+    }
+
+    public final com.google.protobuf.Message
+        getResponsePrototype(
+        com.google.protobuf.Descriptors.MethodDescriptor method) {
+      if (method.getService() != getDescriptor()) {
+        throw new java.lang.IllegalArgumentException(
+          "Service.getResponsePrototype() given method " +
+          "descriptor for wrong service type.");
+      }
+      switch(method.getIndex()) {
+        case 0:
+          return org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GetTokenResponseProto.getDefaultInstance();
+        default:
+          throw new java.lang.AssertionError("Can't get here.");
+      }
+    }
+
+    public static Stub newStub(
+        com.google.protobuf.RpcChannel channel) {
+      return new Stub(channel);
+    }
+
+    public static final class Stub extends org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.LlapManagementProtocol implements Interface {
+      private Stub(com.google.protobuf.RpcChannel channel) {
+        this.channel = channel;
+      }
+
+      private final com.google.protobuf.RpcChannel channel;
+
+      public com.google.protobuf.RpcChannel getChannel() {
+        return channel;
+      }
+
+      public  void getDelegationToken(
+          com.google.protobuf.RpcController controller,
+          org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GetTokenRequestProto request,
+          com.google.protobuf.RpcCallback<org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GetTokenResponseProto> done) {
+        channel.callMethod(
+          getDescriptor().getMethods().get(0),
+          controller,
+          request,
+          org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GetTokenResponseProto.getDefaultInstance(),
+          com.google.protobuf.RpcUtil.generalizeCallback(
+            done,
+            org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GetTokenResponseProto.class,
+            org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GetTokenResponseProto.getDefaultInstance()));
+      }
+    }
+
+    public static BlockingInterface newBlockingStub(
+        com.google.protobuf.BlockingRpcChannel channel) {
+      return new BlockingStub(channel);
+    }
+
+    public interface BlockingInterface {
+      public org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GetTokenResponseProto getDelegationToken(
+          com.google.protobuf.RpcController controller,
+          org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GetTokenRequestProto request)
+          throws com.google.protobuf.ServiceException;
+    }
+
+    private static final class BlockingStub implements BlockingInterface {
+      private BlockingStub(com.google.protobuf.BlockingRpcChannel channel) {
+        this.channel = channel;
+      }
+
+      private final com.google.protobuf.BlockingRpcChannel channel;
+
+      public org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GetTokenResponseProto getDelegationToken(
+          com.google.protobuf.RpcController controller,
+          org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GetTokenRequestProto request)
+          throws com.google.protobuf.ServiceException {
+        return (org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GetTokenResponseProto) channel.callBlockingMethod(
+          getDescriptor().getMethods().get(0),
+          controller,
+          request,
+          org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GetTokenResponseProto.getDefaultInstance());
+      }
+
+    }
+
+    // @@protoc_insertion_point(class_scope:LlapManagementProtocol)
+  }
+
   private static com.google.protobuf.Descriptors.Descriptor
     internal_static_UserPayloadProto_descriptor;
   private static
@@ -12506,6 +13513,16 @@ public final class LlapDaemonProtocolProtos {
   private static
     com.google.protobuf.GeneratedMessage.FieldAccessorTable
       internal_static_TerminateFragmentResponseProto_fieldAccessorTable;
+  private static com.google.protobuf.Descriptors.Descriptor
+    internal_static_GetTokenRequestProto_descriptor;
+  private static
+    com.google.protobuf.GeneratedMessage.FieldAccessorTable
+      internal_static_GetTokenRequestProto_fieldAccessorTable;
+  private static com.google.protobuf.Descriptors.Descriptor
+    internal_static_GetTokenResponseProto_descriptor;
+  private static
+    com.google.protobuf.GeneratedMessage.FieldAccessorTable
+      internal_static_GetTokenResponseProto_fieldAccessorTable;
 
   public static com.google.protobuf.Descriptors.FileDescriptor
       getDescriptor() {
@@ -12558,19 +13575,23 @@ public final class LlapDaemonProtocolProtos {
       "mpleteResponseProto\"g\n\035TerminateFragment" +
       "RequestProto\022\020\n\010query_id\030\001 \001(\t\022\020\n\010dag_na" +
       "me\030\002 \001(\t\022\"\n\032fragment_identifier_string\030\007" +
-      " \001(\t\" \n\036TerminateFragmentResponseProto*2" +
-      "\n\020SourceStateProto\022\017\n\013S_SUCCEEDED\020\001\022\r\n\tS" +
-      "_RUNNING\020\0022\316\002\n\022LlapDaemonProtocol\022?\n\nsub" +
-      "mitWork\022\027.SubmitWorkRequestProto\032\030.Submi" +
-      "tWorkResponseProto\022W\n\022sourceStateUpdated" +
-      "\022\037.SourceStateUpdatedRequestProto\032 .Sour" +
-      "ceStateUpdatedResponseProto\022H\n\rqueryComp",
-      "lete\022\032.QueryCompleteRequestProto\032\033.Query" +
-      "CompleteResponseProto\022T\n\021terminateFragme" +
-      "nt\022\036.TerminateFragmentRequestProto\032\037.Ter" +
-      "minateFragmentResponseProtoBH\n&org.apach" +
-      "e.hadoop.hive.llap.daemon.rpcB\030LlapDaemo" +
-      "nProtocolProtos\210\001\001\240\001\001"
+      " \001(\t\" \n\036TerminateFragmentResponseProto\"\026" +
+      "\n\024GetTokenRequestProto\"&\n\025GetTokenRespon" +
+      "seProto\022\r\n\005token\030\001 \001(\014*2\n\020SourceStatePro" +
+      "to\022\017\n\013S_SUCCEEDED\020\001\022\r\n\tS_RUNNING\020\0022\316\002\n\022L" +
+      "lapDaemonProtocol\022?\n\nsubmitWork\022\027.Submit" +
+      "WorkRequestProto\032\030.SubmitWorkResponsePro" +
+      "to\022W\n\022sourceStateUpdated\022\037.SourceStateUp",
+      "datedRequestProto\032 .SourceStateUpdatedRe" +
+      "sponseProto\022H\n\rqueryComplete\022\032.QueryComp" +
+      "leteRequestProto\032\033.QueryCompleteResponse" +
+      "Proto\022T\n\021terminateFragment\022\036.TerminateFr" +
+      "agmentRequestProto\032\037.TerminateFragmentRe" +
+      "sponseProto2]\n\026LlapManagementProtocol\022C\n" +
+      "\022getDelegationToken\022\025.GetTokenRequestPro" +
+      "to\032\026.GetTokenResponseProtoBH\n&org.apache" +
+      ".hadoop.hive.llap.daemon.rpcB\030LlapDaemon" +
+      "ProtocolProtos\210\001\001\240\001\001"
     };
     com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
       new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
@@ -12661,6 +13682,18 @@ public final class LlapDaemonProtocolProtos {
             com.google.protobuf.GeneratedMessage.FieldAccessorTable(
               internal_static_TerminateFragmentResponseProto_descriptor,
               new java.lang.String[] { });
+          internal_static_GetTokenRequestProto_descriptor =
+            getDescriptor().getMessageTypes().get(14);
+          internal_static_GetTokenRequestProto_fieldAccessorTable = new
+            com.google.protobuf.GeneratedMessage.FieldAccessorTable(
+              internal_static_GetTokenRequestProto_descriptor,
+              new java.lang.String[] { });
+          internal_static_GetTokenResponseProto_descriptor =
+            getDescriptor().getMessageTypes().get(15);
+          internal_static_GetTokenResponseProto_fieldAccessorTable = new
+            com.google.protobuf.GeneratedMessage.FieldAccessorTable(
+              internal_static_GetTokenResponseProto_descriptor,
+              new java.lang.String[] { "Token", });
           return null;
         }
       };
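
The generated code above boils down to a small surface: an empty GetTokenRequestProto, a GetTokenResponseProto carrying a single "optional bytes token = 1" field, and a one-method LlapManagementProtocol service exposing getDelegationToken. As a minimal sketch (not part of this patch, and assuming the LlapManagementProtocolBlockingPB interface introduced further down in this diff), a client could fetch and deserialize an LLAP token roughly like this; "serverAddr" is a placeholder:

import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.net.InetSocketAddress;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.llap.daemon.LlapManagementProtocolBlockingPB;
import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GetTokenRequestProto;
import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GetTokenResponseProto;
import org.apache.hadoop.hive.llap.security.LlapTokenIdentifier;
import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.security.token.Token;

public class GetLlapTokenSketch {
  public static Token<LlapTokenIdentifier> fetchToken(
      Configuration conf, InetSocketAddress serverAddr) throws Exception {
    // The management endpoint speaks protobuf over Hadoop RPC, like the daemon endpoint.
    RPC.setProtocolEngine(conf, LlapManagementProtocolBlockingPB.class, ProtobufRpcEngine.class);
    LlapManagementProtocolBlockingPB proxy =
        RPC.getProxy(LlapManagementProtocolBlockingPB.class, 0, serverAddr, conf);
    // The request carries no fields; the caller is identified by the RPC layer itself.
    GetTokenResponseProto resp =
        proxy.getDelegationToken(null, GetTokenRequestProto.getDefaultInstance());
    // The response wraps a serialized Hadoop Token in its single "token" bytes field.
    Token<LlapTokenIdentifier> token = new Token<>();
    token.readFields(new DataInputStream(
        new ByteArrayInputStream(resp.getToken().toByteArray())));
    return token;
  }
}

That the request message is empty is deliberate: the caller's identity comes from the RPC authentication layer, not from the payload.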

http://git-wip-us.apache.org/repos/asf/hive/blob/915587b8/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/LlapDaemonProtocolBlockingPB.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/LlapDaemonProtocolBlockingPB.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/LlapDaemonProtocolBlockingPB.java
index 5ad2344..4c09941 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/LlapDaemonProtocolBlockingPB.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/LlapDaemonProtocolBlockingPB.java
@@ -15,8 +15,14 @@
 package org.apache.hadoop.hive.llap.daemon;
 
 import org.apache.hadoop.ipc.ProtocolInfo;
+import org.apache.hadoop.security.KerberosInfo;
+import org.apache.hadoop.security.token.TokenInfo;
+import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos;
+import org.apache.hadoop.hive.llap.security.LlapTokenSelector;
 
 @ProtocolInfo(protocolName = "org.apache.hadoop.hive.llap.daemon.LlapDaemonProtocolBlockingPB", protocolVersion = 1)
+@KerberosInfo(serverPrincipal = HiveConf.HIVE_LLAP_DAEMON_SERVICE_PRINCIPAL_NAME)
+@TokenInfo(LlapTokenSelector.class)
 public interface LlapDaemonProtocolBlockingPB extends LlapDaemonProtocolProtos.LlapDaemonProtocol.BlockingInterface {
 }
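
The two new annotations do the heavy lifting here: @KerberosInfo tells Hadoop RPC which configured principal the server runs as, and @TokenInfo names the selector that picks an LLAP token out of the caller's credentials, so a client of this protocol can authenticate with either Kerberos or a token. As an illustration of the contract LlapTokenSelector has to satisfy (the real class is added elsewhere in this commit; the KIND_NAME constant is an assumption following the usual Hadoop token convention):

import java.util.Collection;

import org.apache.hadoop.hive.llap.security.LlapTokenIdentifier;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.security.token.TokenSelector;

public class LlapTokenSelectorSketch implements TokenSelector<LlapTokenIdentifier> {
  @SuppressWarnings("unchecked")
  @Override
  public Token<LlapTokenIdentifier> selectToken(
      Text service, Collection<Token<? extends TokenIdentifier>> tokens) {
    // Scan the caller's credentials for a token of LLAP's kind; KIND_NAME is
    // assumed to be the Text constant LlapTokenIdentifier defines for its kind.
    for (Token<? extends TokenIdentifier> token : tokens) {
      if (LlapTokenIdentifier.KIND_NAME.equals(token.getKind())) {
        return (Token<LlapTokenIdentifier>) token;
      }
    }
    return null; // no LLAP token: the RPC layer can still fall back to Kerberos
  }
}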

http://git-wip-us.apache.org/repos/asf/hive/blob/915587b8/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/LlapManagementProtocolBlockingPB.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/LlapManagementProtocolBlockingPB.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/LlapManagementProtocolBlockingPB.java
new file mode 100644
index 0000000..4efadac
--- /dev/null
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/LlapManagementProtocolBlockingPB.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.llap.daemon;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos;
+import org.apache.hadoop.ipc.ProtocolInfo;
+import org.apache.hadoop.security.KerberosInfo;
+
+@ProtocolInfo(protocolName = "org.apache.hadoop.hive.llap.daemon.LlapManagementProtocolBlockingPB", protocolVersion = 1)
+@KerberosInfo(serverPrincipal = HiveConf.HIVE_LLAP_DAEMON_SERVICE_PRINCIPAL_NAME)
+public interface LlapManagementProtocolBlockingPB extends LlapDaemonProtocolProtos.LlapManagementProtocol.BlockingInterface {
+}
\ No newline at end of file
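
Note that, unlike the daemon protocol above, this interface carries @KerberosInfo but no @TokenInfo: acquiring a token in the first place requires Kerberos. Server-side, the generated service plugs into Hadoop RPC in the usual way; a hedged sketch of that wiring follows (the real version lives in LlapDaemonProtocolServerImpl, and "impl", "mngPort" and "numHandlers" are placeholders):

import java.io.IOException;

import com.google.protobuf.BlockingService;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.llap.daemon.LlapManagementProtocolBlockingPB;
import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.LlapManagementProtocol;
import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.RPC;

public class ManagementServerSketch {
  public static RPC.Server start(Configuration conf, LlapManagementProtocolBlockingPB impl,
      int mngPort, int numHandlers) throws IOException {
    RPC.setProtocolEngine(conf, LlapManagementProtocolBlockingPB.class, ProtobufRpcEngine.class);
    // Wrap the implementation in the generated reflective blocking service.
    BlockingService service = LlapManagementProtocol.newReflectiveBlockingService(impl);
    RPC.Server server = new RPC.Builder(conf)
        .setProtocol(LlapManagementProtocolBlockingPB.class)
        .setInstance(service)
        .setBindAddress("0.0.0.0")
        .setPort(mngPort)
        .setNumHandlers(numHandlers)
        .build();
    server.start();
    return server;
  }
}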

http://git-wip-us.apache.org/repos/asf/hive/blob/915587b8/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
index dbdf571..7ce8ba0 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
@@ -38,7 +38,7 @@ import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceSta
 import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkRequestProto;
 import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.TerminateFragmentRequestProto;
 import org.apache.hadoop.hive.llap.daemon.services.impl.LlapWebServices;
-import org.apache.hadoop.hive.llap.io.api.LlapIoProxy;
+import org.apache.hadoop.hive.llap.io.api.LlapProxy;
 import org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorMetrics;
 import org.apache.hadoop.hive.llap.metrics.LlapMetricsSystem;
 import org.apache.hadoop.hive.llap.metrics.MetricsUtils;
@@ -80,19 +80,22 @@ public class LlapDaemon extends CompositeService implements ContainerRunner, Lla
   private final String[] localDirs;
 
   // TODO Not the best way to share the address
-  private final AtomicReference<InetSocketAddress> address = new AtomicReference<>();
+  private final AtomicReference<InetSocketAddress> srvAddress = new AtomicReference<>(),
+      mngAddress = new AtomicReference<>();
   private final AtomicReference<Integer> shufflePort = new AtomicReference<>();
 
   public LlapDaemon(Configuration daemonConf, int numExecutors, long executorMemoryBytes,
-      boolean ioEnabled, boolean isDirectCache, long ioMemoryBytes, String[] localDirs, int rpcPort,
-      int shufflePort) {
+      boolean ioEnabled, boolean isDirectCache, long ioMemoryBytes, String[] localDirs, int srvPort,
+      int mngPort, int shufflePort) {
     super("LlapDaemon");
 
     printAsciiArt();
 
     Preconditions.checkArgument(numExecutors > 0);
-    Preconditions.checkArgument(rpcPort == 0 || (rpcPort > 1024 && rpcPort < 65536),
-        "RPC Port must be between 1025 and 65535, or 0 automatic selection");
+    Preconditions.checkArgument(srvPort == 0 || (srvPort > 1024 && srvPort < 65536),
+        "Server RPC Port must be between 1025 and 65535, or 0 automatic selection");
+    Preconditions.checkArgument(mngPort == 0 || (mngPort > 1024 && mngPort < 65536),
+        "Management RPC Port must be between 1025 and 65535, or 0 automatic selection");
     Preconditions.checkArgument(localDirs != null && localDirs.length > 0,
         "Work dirs must be specified");
     Preconditions.checkArgument(shufflePort == 0 || (shufflePort > 1024 && shufflePort < 65536),
@@ -111,7 +114,8 @@ public class LlapDaemon extends CompositeService implements ContainerRunner, Lla
         daemonConf, ConfVars.LLAP_DAEMON_TASK_SCHEDULER_ENABLE_PREEMPTION);
     LOG.info("Attempting to start LlapDaemonConf with the following configuration: " +
         "numExecutors=" + numExecutors +
-        ", rpcListenerPort=" + rpcPort +
+        ", rpcListenerPort=" + srvPort +
+        ", mngListenerPort=" + mngPort +
         ", workDirs=" + Arrays.toString(localDirs) +
         ", shufflePort=" + shufflePort +
         ", executorMemory=" + executorMemoryBytes +
@@ -154,11 +158,10 @@ public class LlapDaemon extends CompositeService implements ContainerRunner, Lla
         " sessionId: " + sessionId);
 
 
-    this.amReporter = new AMReporter(address, new QueryFailedHandlerProxy(), daemonConf);
-
-
-    this.server = new LlapDaemonProtocolServerImpl(numHandlers, this, address, rpcPort);
+    this.amReporter = new AMReporter(srvAddress, new QueryFailedHandlerProxy(), daemonConf);
 
+    this.server = new LlapDaemonProtocolServerImpl(
+        numHandlers, this, srvAddress, mngAddress, srvPort, mngPort);
 
     this.containerRunner = new ContainerRunnerImpl(daemonConf,
         numExecutors,
@@ -166,7 +169,7 @@ public class LlapDaemon extends CompositeService implements ContainerRunner, Lla
         enablePreemption,
         localDirs,
         this.shufflePort,
-        address,
+        srvAddress,
         executorMemoryBytes,
         metrics,
         amReporter);
@@ -227,8 +230,8 @@ public class LlapDaemon extends CompositeService implements ContainerRunner, Lla
   @Override
   public void serviceInit(Configuration conf) throws Exception {
     super.serviceInit(conf);
-    LlapIoProxy.setDaemon(true);
-    LlapIoProxy.initializeLlapIo(conf);
+    LlapProxy.setDaemon(true);
+    LlapProxy.initializeLlapIo(conf);
   }
 
   @Override
@@ -262,7 +265,7 @@ public class LlapDaemon extends CompositeService implements ContainerRunner, Lla
       LlapMetricsSystem.shutdown();
     }
 
-    LlapIoProxy.close();
+    LlapProxy.close();
   }
 
   public static void main(String[] args) throws Exception {
@@ -276,6 +279,7 @@ public class LlapDaemon extends CompositeService implements ContainerRunner, Lla
 
       String[] localDirs = daemonConf.getTrimmedStrings(ConfVars.LLAP_DAEMON_WORK_DIRS.varname);
       int rpcPort = HiveConf.getIntVar(daemonConf, ConfVars.LLAP_DAEMON_RPC_PORT);
+      int mngPort = HiveConf.getIntVar(daemonConf, ConfVars.LLAP_MANAGEMENT_RPC_PORT);
       int shufflePort = daemonConf
           .getInt(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY, ShuffleHandler.DEFAULT_SHUFFLE_PORT);
       long executorMemoryBytes = HiveConf.getIntVar(
@@ -287,8 +291,7 @@ public class LlapDaemon extends CompositeService implements ContainerRunner, Lla
       boolean llapIoEnabled = HiveConf.getBoolVar(daemonConf, HiveConf.ConfVars.LLAP_IO_ENABLED);
       llapDaemon =
           new LlapDaemon(daemonConf, numExecutors, executorMemoryBytes, llapIoEnabled, isDirectCache,
-              cacheMemoryBytes, localDirs,
-              rpcPort, shufflePort);
+              cacheMemoryBytes, localDirs, rpcPort, mngPort, shufflePort);
 
       LOG.info("Adding shutdown hook for LlapDaemon");
       ShutdownHookManager.addShutdownHook(new CompositeServiceShutdownHook(llapDaemon), 1);
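
With these changes the daemon binds two independent RPC endpoints: the existing work-submission port (ConfVars.LLAP_DAEMON_RPC_PORT) and the new management port (ConfVars.LLAP_MANAGEMENT_RPC_PORT), each of which must be 0 (pick any free port) or fall in 1025..65535. A small sketch of setting both before startup; the port numbers are made up:

import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;

public class LlapPortConfigSketch {
  public static void main(String[] args) {
    HiveConf daemonConf = new HiveConf();
    // 0 means "bind any free port"; anything else must be in 1025..65535,
    // per the Preconditions checks added to the constructor above.
    HiveConf.setIntVar(daemonConf, ConfVars.LLAP_DAEMON_RPC_PORT, 15001);     // hypothetical
    HiveConf.setIntVar(daemonConf, ConfVars.LLAP_MANAGEMENT_RPC_PORT, 15004); // hypothetical
    int rpcPort = HiveConf.getIntVar(daemonConf, ConfVars.LLAP_DAEMON_RPC_PORT);
    int mngPort = HiveConf.getIntVar(daemonConf, ConfVars.LLAP_MANAGEMENT_RPC_PORT);
    System.out.println("work rpc=" + rpcPort + ", management rpc=" + mngPort);
  }
}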

http://git-wip-us.apache.org/repos/asf/hive/blob/915587b8/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemonProtocolClientImpl.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemonProtocolClientImpl.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemonProtocolClientImpl.java
index 4b13277..9c7d2e2 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemonProtocolClientImpl.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemonProtocolClientImpl.java
@@ -115,7 +115,6 @@ public class LlapDaemonProtocolClientImpl implements LlapDaemonProtocolBlockingP
   }
 
   public LlapDaemonProtocolBlockingPB createProxy() throws IOException {
-    // TODO Fix security
     RPC.setProtocolEngine(conf, LlapDaemonProtocolBlockingPB.class, ProtobufRpcEngine.class);
     ProtocolProxy<LlapDaemonProtocolBlockingPB> proxy =
         RPC.getProtocolProxy(LlapDaemonProtocolBlockingPB.class, 0, serverAddr,