You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by se...@apache.org on 2016/05/31 20:24:08 UTC

[2/2] hive git commit: HIVE-13444 : LLAP: add HMAC signatures to LLAP; verify them on LLAP side (Sergey Shelukhin, reviewed by Siddharth Seth)

HIVE-13444 : LLAP: add HMAC signatures to LLAP; verify them on LLAP side (Sergey Shelukhin, reviewed by Siddharth Seth)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/9fe3dab7
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/9fe3dab7
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/9fe3dab7

Branch: refs/heads/branch-2.1
Commit: 9fe3dab7fe82d78435d9cd01f44f7a8e748f3420
Parents: 44a8f0a
Author: Sergey Shelukhin <se...@apache.org>
Authored: Tue May 31 13:09:45 2016 -0700
Committer: Sergey Shelukhin <se...@apache.org>
Committed: Tue May 31 13:24:05 2016 -0700

----------------------------------------------------------------------
 .../org/apache/hadoop/hive/conf/HiveConf.java   |  11 +-
 .../hive/llap/security/LlapTokenClient.java     | 148 ++++++++++++++
 .../llap/security/LlapTokenClientFactory.java   | 160 ---------------
 .../llap/security/LlapTokenLocalClient.java     |  12 +-
 .../hadoop/hive/llap/security/LlapSigner.java   |  41 ++++
 .../hive/llap/security/LlapTokenIdentifier.java |  14 +-
 .../hive/llap/security/SecretManager.java       |  96 ++++++---
 .../llap/security/SigningSecretManager.java     |  26 +++
 .../llap/daemon/impl/ContainerRunnerImpl.java   |  57 +++++-
 .../hive/llap/daemon/impl/LlapDaemon.java       |   2 +-
 .../daemon/impl/LlapProtocolServerImpl.java     |  59 ++++--
 .../hive/llap/daemon/impl/LlapTokenChecker.java |  30 ++-
 .../hive/llap/daemon/impl/QueryTracker.java     |  14 +-
 .../llap/daemon/impl/TaskExecutorService.java   |   6 +-
 .../llap/daemon/impl/TaskRunnerCallable.java    |  19 +-
 .../hive/llap/security/LlapSignerImpl.java      |  60 ++++++
 .../daemon/impl/TaskExecutorTestHelpers.java    |   3 +-
 .../llap/daemon/impl/TestLlapTokenChecker.java  |   8 +-
 .../TestFirstInFirstOutComparator.java          |  31 ---
 .../hive/llap/security/TestLlapSignerImpl.java  | 200 +++++++++++++++++++
 .../hive/ql/exec/tez/TezSessionState.java       |  26 +--
 21 files changed, 720 insertions(+), 303 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/9fe3dab7/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 6a404bd..cad5d65 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -2716,6 +2716,12 @@ public class HiveConf extends Configuration {
     LLAP_MANAGEMENT_ACL("hive.llap.management.acl", "*", "The ACL for LLAP daemon management."),
     LLAP_MANAGEMENT_ACL_DENY("hive.llap.management.acl.blocked", "",
         "The deny ACL for LLAP daemon management."),
+    LLAP_REMOTE_TOKEN_REQUIRES_SIGNING("hive.llap.remote.token.requires.signing", "true",
+        new StringSet("false", "except_llap_owner", "true"),
+        "Whether the token returned from LLAP management API should require fragment signing.\n" +
+        "True by default; can be disabled to allow CLI to get tokens from LLAP in a secure\n" +
+        "cluster by setting it to false or 'except_llap_owner' (the latter returns such tokens\n" +
+        "to everyone except the user LLAP cluster is authenticating under)."),
 
     // Hadoop DelegationTokenManager default is 1 week.
     LLAP_DELEGATION_TOKEN_LIFETIME("hive.llap.daemon.delegation.token.lifetime", "14d",
@@ -2725,11 +2731,6 @@ public class HiveConf extends Configuration {
         "RPC port for LLAP daemon management service."),
     LLAP_WEB_AUTO_AUTH("hive.llap.auto.auth", false,
         "Whether or not to set Hadoop configs to enable auth in LLAP web app."),
-    LLAP_CREATE_TOKEN_LOCALLY("hive.llap.create.token.locally", "hs2",
-        new StringSet("true", "hs2", "false"),
-        "Whether to create LLAP tokens locally, saving directly to ZooKeeper SecretManager.\n" +
-        "Requires one to have access to ZK paths; in other words, this should only be used in\n" +
-        "HiveServer2. By default, the value is 'hs2', which means exactly that."),
 
     LLAP_DAEMON_RPC_NUM_HANDLERS("hive.llap.daemon.rpc.num.handlers", 5,
       "Number of RPC handlers for LLAP daemon.", "llap.daemon.rpc.num.handlers"),

http://git-wip-us.apache.org/repos/asf/hive/blob/9fe3dab7/llap-client/src/java/org/apache/hadoop/hive/llap/security/LlapTokenClient.java
----------------------------------------------------------------------
diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/security/LlapTokenClient.java b/llap-client/src/java/org/apache/hadoop/hive/llap/security/LlapTokenClient.java
new file mode 100644
index 0000000..921e050
--- /dev/null
+++ b/llap-client/src/java/org/apache/hadoop/hive/llap/security/LlapTokenClient.java
@@ -0,0 +1,148 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.llap.security;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import javax.net.SocketFactory;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.llap.impl.LlapManagementProtocolClientImpl;
+import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GetTokenRequestProto;
+import org.apache.hadoop.hive.llap.registry.impl.LlapRegistryService;
+import org.apache.hadoop.hive.llap.registry.ServiceInstance;
+import org.apache.hadoop.hive.llap.registry.ServiceInstanceSet;
+import org.apache.hadoop.hive.llap.security.LlapTokenIdentifier;
+import org.apache.hadoop.io.DataInputByteBuffer;
+import org.apache.hadoop.io.retry.RetryPolicies;
+import org.apache.hadoop.io.retry.RetryPolicy;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.protobuf.ByteString;
+import com.google.protobuf.ServiceException;
+
+public class LlapTokenClient {
+  private static final Logger LOG = LoggerFactory.getLogger(LlapTokenClient.class);
+
+  private final LlapRegistryService registry;
+  private final SocketFactory socketFactory;
+  private final RetryPolicy retryPolicy;
+  private final Configuration conf;
+  private ServiceInstanceSet activeInstances;
+  private Collection<ServiceInstance> lastKnownInstances;
+  private LlapManagementProtocolClientImpl client;
+  private ServiceInstance clientInstance;
+
+  public LlapTokenClient(Configuration conf) {
+    this.conf = conf;
+    registry = new LlapRegistryService(false);
+    registry.init(conf);
+    socketFactory = NetUtils.getDefaultSocketFactory(conf);
+    retryPolicy = RetryPolicies.retryUpToMaximumTimeWithFixedSleep(
+        16000, 2000l, TimeUnit.MILLISECONDS);
+  }
+
+  public Token<LlapTokenIdentifier> getDelegationToken(String appId) throws IOException {
+    if (!UserGroupInformation.isSecurityEnabled()) return null;
+    Iterator<ServiceInstance> llaps = null;
+    if (clientInstance == null) {
+      assert client == null;
+      llaps = getLlapServices(false).iterator();
+      clientInstance = llaps.next();
+    }
+
+    ByteString tokenBytes = null;
+    boolean hasRefreshed = false;
+    while (true) {
+      try {
+        tokenBytes = getTokenBytes(appId);
+        break;
+      } catch (IOException | ServiceException ex) {
+        LOG.error("Cannot get a token, trying a different instance", ex);
+        client = null;
+        clientInstance = null;
+      }
+      if (llaps == null || !llaps.hasNext()) {
+        if (hasRefreshed) { // Only refresh once.
+          throw new RuntimeException("Cannot find any LLAPs to get the token from");
+        }
+        llaps = getLlapServices(true).iterator();
+        hasRefreshed = true;
+      }
+      clientInstance = llaps.next();
+    }
+
+    Token<LlapTokenIdentifier> token = extractToken(tokenBytes);
+    if (LOG.isInfoEnabled()) {
+      LOG.info("Obtained a LLAP delegation token from " + clientInstance + ": " + token);
+    }
+    return token;
+  }
+
+  private Token<LlapTokenIdentifier> extractToken(ByteString tokenBytes) throws IOException {
+    Token<LlapTokenIdentifier> token = new Token<>();
+    DataInputByteBuffer in = new DataInputByteBuffer();
+    in.reset(tokenBytes.asReadOnlyByteBuffer());
+    token.readFields(in);
+    return token;
+  }
+
+  private ByteString getTokenBytes(final String appId) throws IOException, ServiceException {
+    assert clientInstance != null;
+    if (client == null) {
+      client = new LlapManagementProtocolClientImpl(conf, clientInstance.getHost(),
+          clientInstance.getManagementPort(), retryPolicy, socketFactory);
+    }
+    GetTokenRequestProto.Builder req = GetTokenRequestProto.newBuilder();
+    if (!StringUtils.isBlank(appId)) {
+      req.setAppId(appId);
+    }
+    return client.getDelegationToken(null, req.build()).getToken();
+  }
+
+  /** Synchronized - LLAP registry and instance set are not thread safe. */
+  private synchronized List<ServiceInstance> getLlapServices(
+      boolean doForceRefresh) throws IOException {
+    if (!doForceRefresh && lastKnownInstances != null) {
+      return new ArrayList<>(lastKnownInstances);
+    }
+    if (activeInstances == null) {
+      registry.start();
+      activeInstances = registry.getInstances();
+    }
+    Map<String, ServiceInstance> daemons = activeInstances.getAll();
+    if (daemons == null || daemons.isEmpty()) {
+      throw new RuntimeException("No LLAPs found");
+    }
+    lastKnownInstances = daemons.values();
+    return new ArrayList<ServiceInstance>(lastKnownInstances);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/9fe3dab7/llap-client/src/java/org/apache/hadoop/hive/llap/security/LlapTokenClientFactory.java
----------------------------------------------------------------------
diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/security/LlapTokenClientFactory.java b/llap-client/src/java/org/apache/hadoop/hive/llap/security/LlapTokenClientFactory.java
deleted file mode 100644
index ebc91b1..0000000
--- a/llap-client/src/java/org/apache/hadoop/hive/llap/security/LlapTokenClientFactory.java
+++ /dev/null
@@ -1,160 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hive.llap.security;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
-
-import javax.net.SocketFactory;
-
-import org.apache.commons.lang3.StringUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hive.llap.impl.LlapManagementProtocolClientImpl;
-import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GetTokenRequestProto;
-import org.apache.hadoop.hive.llap.registry.impl.LlapRegistryService;
-import org.apache.hadoop.hive.llap.registry.ServiceInstance;
-import org.apache.hadoop.hive.llap.registry.ServiceInstanceSet;
-import org.apache.hadoop.hive.llap.security.LlapTokenIdentifier;
-import org.apache.hadoop.io.DataInputByteBuffer;
-import org.apache.hadoop.io.retry.RetryPolicies;
-import org.apache.hadoop.io.retry.RetryPolicy;
-import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.token.Token;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.protobuf.ByteString;
-import com.google.protobuf.ServiceException;
-
-public class LlapTokenClientFactory {
-  private static final Logger LOG = LoggerFactory.getLogger(LlapTokenClientFactory.class);
-
-  private final LlapRegistryService registry;
-  private final SocketFactory socketFactory;
-  private final RetryPolicy retryPolicy;
-  private final Configuration conf;
-  private ServiceInstanceSet activeInstances;
-  private Collection<ServiceInstance> lastKnownInstances;
-
-  public LlapTokenClientFactory(Configuration conf) {
-    this.conf = conf;
-    registry = new LlapRegistryService(false);
-    registry.init(conf);
-    socketFactory = NetUtils.getDefaultSocketFactory(conf);
-    retryPolicy = RetryPolicies.retryUpToMaximumTimeWithFixedSleep(
-        16000, 2000l, TimeUnit.MILLISECONDS);
-  }
-
-  public interface Client {
-    Token<LlapTokenIdentifier> getDelegationToken(String appId) throws IOException;
-  }
-
-  public Client createClient() {
-    return new ClientImpl(); // Client is separate from factory mostly for thread-safety reasons.
-  }
-
-  private class ClientImpl implements Client {
-    private LlapManagementProtocolClientImpl client;
-    private ServiceInstance clientInstance;
-
-    @Override
-    public Token<LlapTokenIdentifier> getDelegationToken(String appId) throws IOException {
-      if (!UserGroupInformation.isSecurityEnabled()) return null;
-      Iterator<ServiceInstance> llaps = null;
-      if (clientInstance == null) {
-        assert client == null;
-        llaps = getLlapServices(false).iterator();
-        clientInstance = llaps.next();
-      }
-
-      ByteString tokenBytes = null;
-      boolean hasRefreshed = false;
-      while (true) {
-        try {
-          tokenBytes = getTokenBytes(appId);
-          break;
-        } catch (IOException | ServiceException ex) {
-          LOG.error("Cannot get a token, trying a different instance", ex);
-          client = null;
-          clientInstance = null;
-        }
-        if (llaps == null || !llaps.hasNext()) {
-          if (hasRefreshed) { // Only refresh once.
-            throw new RuntimeException("Cannot find any LLAPs to get the token from");
-          }
-          llaps = getLlapServices(true).iterator();
-          hasRefreshed = true;
-        }
-        clientInstance = llaps.next();
-      }
-
-      Token<LlapTokenIdentifier> token = extractToken(tokenBytes);
-      if (LOG.isInfoEnabled()) {
-        LOG.info("Obtained a LLAP delegation token from " + clientInstance + ": " + token);
-      }
-      return token;
-    }
-
-    private Token<LlapTokenIdentifier> extractToken(ByteString tokenBytes) throws IOException {
-      Token<LlapTokenIdentifier> token = new Token<>();
-      DataInputByteBuffer in = new DataInputByteBuffer();
-      in.reset(tokenBytes.asReadOnlyByteBuffer());
-      token.readFields(in);
-      return token;
-    }
-
-    private ByteString getTokenBytes(final String appId) throws IOException, ServiceException {
-      assert clientInstance != null;
-      if (client == null) {
-        client = new LlapManagementProtocolClientImpl(conf, clientInstance.getHost(),
-            clientInstance.getManagementPort(), retryPolicy, socketFactory);
-      }
-      GetTokenRequestProto.Builder req = GetTokenRequestProto.newBuilder();
-      if (!StringUtils.isBlank(appId)) {
-        req.setAppId(appId);
-      }
-      return client.getDelegationToken(null, req.build()).getToken();
-    }
-  }
-
-  /** Synchronized - LLAP registry and instance set are not thread safe. */
-  private synchronized List<ServiceInstance> getLlapServices(
-      boolean doForceRefresh) throws IOException {
-    if (!doForceRefresh && lastKnownInstances != null) {
-      return new ArrayList<>(lastKnownInstances);
-    }
-    if (activeInstances == null) {
-      registry.start();
-      activeInstances = registry.getInstances();
-    }
-    Map<String, ServiceInstance> daemons = activeInstances.getAll();
-    if (daemons == null || daemons.isEmpty()) {
-      throw new RuntimeException("No LLAPs found");
-    }
-    lastKnownInstances = daemons.values();
-    return new ArrayList<ServiceInstance>(lastKnownInstances);
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/hive/blob/9fe3dab7/llap-client/src/java/org/apache/hadoop/hive/llap/security/LlapTokenLocalClient.java
----------------------------------------------------------------------
diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/security/LlapTokenLocalClient.java b/llap-client/src/java/org/apache/hadoop/hive/llap/security/LlapTokenLocalClient.java
index f10351b..af889b6 100644
--- a/llap-client/src/java/org/apache/hadoop/hive/llap/security/LlapTokenLocalClient.java
+++ b/llap-client/src/java/org/apache/hadoop/hive/llap/security/LlapTokenLocalClient.java
@@ -22,7 +22,6 @@ import java.io.IOException;
 
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.security.token.Token;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -32,20 +31,21 @@ public class LlapTokenLocalClient {
   private final SecretManager secretManager;
 
   public LlapTokenLocalClient(Configuration conf, String clusterId) {
+    // TODO: create this centrally in HS2 case
     secretManager = SecretManager.createSecretManager(conf, clusterId);
   }
 
-  public Token<LlapTokenIdentifier> createToken(String appId, String user) throws IOException {
+  public Token<LlapTokenIdentifier> createToken(
+      String appId, String user, boolean isSignatureRequired) throws IOException {
     try {
-      Token<LlapTokenIdentifier> token = secretManager.createLlapToken(appId, user);
+      Token<LlapTokenIdentifier> token = secretManager.createLlapToken(
+          appId, user, isSignatureRequired);
       if (LOG.isInfoEnabled()) {
         LOG.info("Created a LLAP delegation token locally: " + token);
       }
       return token;
     } catch (Exception ex) {
-      throw new IOException("Failed to create LLAP token locally. You might need to set "
-          + ConfVars.LLAP_CREATE_TOKEN_LOCALLY.varname
-          + " to false, or make sure you can access secure ZK paths.", ex);
+      throw (ex instanceof IOException) ? (IOException)ex : new IOException(ex);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hive/blob/9fe3dab7/llap-common/src/java/org/apache/hadoop/hive/llap/security/LlapSigner.java
----------------------------------------------------------------------
diff --git a/llap-common/src/java/org/apache/hadoop/hive/llap/security/LlapSigner.java b/llap-common/src/java/org/apache/hadoop/hive/llap/security/LlapSigner.java
new file mode 100644
index 0000000..478a40a
--- /dev/null
+++ b/llap-common/src/java/org/apache/hadoop/hive/llap/security/LlapSigner.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.llap.security;
+
+import java.io.IOException;
+
+public interface LlapSigner {
+  /** An object signable by a signer. */
+  public interface Signable {
+    /** Called by the signer to record key information as part of the message to be signed. */
+    void setSignInfo(int masterKeyId, String user);
+    /** Called by the signer to get the serialized representation of the message to be signed. */
+    byte[] serialize() throws IOException;
+  }
+
+  /** Message with the signature. */
+  public static final class SignedMessage {
+    public byte[] message, signature;
+  }
+
+  /** Serializes and signs the message. */
+  SignedMessage serializeAndSign(Signable message) throws IOException;
+
+  void checkSignature(byte[] message, byte[] signature, int keyId);
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hive/blob/9fe3dab7/llap-common/src/java/org/apache/hadoop/hive/llap/security/LlapTokenIdentifier.java
----------------------------------------------------------------------
diff --git a/llap-common/src/java/org/apache/hadoop/hive/llap/security/LlapTokenIdentifier.java b/llap-common/src/java/org/apache/hadoop/hive/llap/security/LlapTokenIdentifier.java
index 7c47f0b..08c141f 100644
--- a/llap-common/src/java/org/apache/hadoop/hive/llap/security/LlapTokenIdentifier.java
+++ b/llap-common/src/java/org/apache/hadoop/hive/llap/security/LlapTokenIdentifier.java
@@ -30,23 +30,24 @@ import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdenti
 
 import com.google.common.base.Preconditions;
 
-/** For now, a LLAP token gives access to any LLAP server. */
 public class LlapTokenIdentifier extends AbstractDelegationTokenIdentifier {
   private static final String KIND = "LLAP_TOKEN";
   public static final Text KIND_NAME = new Text(KIND);
   private String clusterId;
   private String appId;
+  private boolean isSigningRequired;
 
   public LlapTokenIdentifier() {
     super();
   }
 
   public LlapTokenIdentifier(Text owner, Text renewer, Text realUser,
-      String clusterId, String appId) {
+      String clusterId, String appId, boolean isSigningRequired) {
     super(owner, renewer, realUser);
     Preconditions.checkNotNull(clusterId);
     this.clusterId = clusterId;
     this.appId = appId == null ? "" : appId;
+    this.isSigningRequired = isSigningRequired;
   }
 
   @Override
@@ -54,6 +55,7 @@ public class LlapTokenIdentifier extends AbstractDelegationTokenIdentifier {
     super.write(out);
     out.writeUTF(clusterId);
     out.writeUTF(appId);
+    out.writeBoolean(isSigningRequired);
   }
 
   @Override
@@ -62,6 +64,7 @@ public class LlapTokenIdentifier extends AbstractDelegationTokenIdentifier {
     clusterId = in.readUTF();
     Preconditions.checkNotNull(clusterId);
     appId = in.readUTF();
+    isSigningRequired = in.readBoolean();
     appId = appId == null ? "" : appId;
   }
 
@@ -78,10 +81,15 @@ public class LlapTokenIdentifier extends AbstractDelegationTokenIdentifier {
     return clusterId;
   }
 
+  public boolean isSigningRequired() {
+    return isSigningRequired;
+  }
+
   @Override
   public int hashCode() {
     final int prime = 31;
     int result = prime * super.hashCode() + (StringUtils.isBlank(appId) ? 0 : appId.hashCode());
+    result = prime * result + (isSigningRequired ? 1231 : 1237);
     return prime * result + ((clusterId == null) ? 0 : clusterId.hashCode());
   }
 
@@ -90,7 +98,7 @@ public class LlapTokenIdentifier extends AbstractDelegationTokenIdentifier {
     if (this == obj) return true;
     if (!(obj instanceof LlapTokenIdentifier) || !super.equals(obj)) return false;
     LlapTokenIdentifier other = (LlapTokenIdentifier) obj;
-    return (StringUtils.isBlank(appId)
+    return isSigningRequired == other.isSigningRequired && (StringUtils.isBlank(appId)
         ? StringUtils.isBlank(other.appId) : appId.equals(other.appId))
         && (clusterId == null ? other.clusterId == null : clusterId.equals(other.clusterId));
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/9fe3dab7/llap-common/src/java/org/apache/hadoop/hive/llap/security/SecretManager.java
----------------------------------------------------------------------
diff --git a/llap-common/src/java/org/apache/hadoop/hive/llap/security/SecretManager.java b/llap-common/src/java/org/apache/hadoop/hive/llap/security/SecretManager.java
index 465b204..5aa4b84 100644
--- a/llap-common/src/java/org/apache/hadoop/hive/llap/security/SecretManager.java
+++ b/llap-common/src/java/org/apache/hadoop/hive/llap/security/SecretManager.java
@@ -1,7 +1,11 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
  *
  *     http://www.apache.org/licenses/LICENSE-2.0
  *
@@ -27,12 +31,14 @@ import org.apache.hadoop.hive.llap.security.LlapTokenIdentifier;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.delegation.DelegationKey;
 import org.apache.hadoop.security.token.delegation.ZKDelegationTokenSecretManager;
 import org.apache.hadoop.security.token.delegation.web.DelegationTokenManager;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class SecretManager extends ZKDelegationTokenSecretManager<LlapTokenIdentifier> {
+public class SecretManager extends ZKDelegationTokenSecretManager<LlapTokenIdentifier>
+  implements SigningSecretManager {
   private static final Logger LOG = LoggerFactory.getLogger(SecretManager.class);
   private final String clusterId;
 
@@ -86,43 +92,76 @@ public class SecretManager extends ZKDelegationTokenSecretManager<LlapTokenIdent
     return id;
   }
 
-  public static SecretManager createSecretManager(final Configuration conf, String clusterId) {
-    String llapPrincipal = HiveConf.getVar(conf, ConfVars.LLAP_KERBEROS_PRINCIPAL),
-        llapKeytab = HiveConf.getVar(conf, ConfVars.LLAP_KERBEROS_KEYTAB_FILE);
-    return SecretManager.createSecretManager(conf, llapPrincipal, llapKeytab, clusterId);
+  @Override
+  public synchronized DelegationKey getCurrentKey() {
+    return allKeys.get(getCurrentKeyId());
   }
 
+  @Override
+  public byte[] signWithKey(byte[] message, DelegationKey key) {
+    return createPassword(message, key.getKey());
+  }
 
-  public static SecretManager createSecretManager(final Configuration conf,
-      String llapPrincipal, String llapKeytab, final String clusterId) {
-    // Create ZK connection under a separate ugi (if specified) - ZK works in mysterious ways.
-    UserGroupInformation zkUgi = null;
-    String principal = HiveConf.getVar(conf, ConfVars.LLAP_ZKSM_KERBEROS_PRINCIPAL, llapPrincipal);
-    String keyTab = HiveConf.getVar(conf, ConfVars.LLAP_ZKSM_KERBEROS_KEYTAB_FILE, llapKeytab);
-    try {
-      zkUgi = LlapUtil.loginWithKerberos(principal, keyTab);
-    } catch (IOException e) {
-      throw new RuntimeException(e);
+  @Override
+  public byte[] signWithKey(byte[] message, int keyId) throws SecurityException {
+    DelegationKey key = getDelegationKey(keyId);
+    if (key == null) {
+      throw new SecurityException("The key ID " + keyId + " was not found");
     }
-    // Override the default delegation token lifetime for LLAP.
-    // Also set all the necessary ZK settings to defaults and LLAP configs, if not set.
-    final Configuration zkConf = new Configuration(conf);
+    return createPassword(message, key.getKey());
+  }
+
+  static final class LlapZkConf {
+    public Configuration zkConf;
+    public UserGroupInformation zkUgi;
+    public LlapZkConf(Configuration zkConf, UserGroupInformation zkUgi) {
+      this.zkConf = zkConf;
+      this.zkUgi = zkUgi;
+    }
+  }
+
+  private static LlapZkConf createLlapZkConf(
+      Configuration conf, String llapPrincipal, String llapKeytab, String clusterId) {
+     String principal = HiveConf.getVar(conf, ConfVars.LLAP_ZKSM_KERBEROS_PRINCIPAL, llapPrincipal);
+     String keyTab = HiveConf.getVar(conf, ConfVars.LLAP_ZKSM_KERBEROS_KEYTAB_FILE, llapKeytab);
+     // Override the default delegation token lifetime for LLAP.
+     // Also set all the necessary ZK settings to defaults and LLAP configs, if not set.
+     final Configuration zkConf = new Configuration(conf);
     long tokenLifetime = HiveConf.getTimeVar(
         conf, ConfVars.LLAP_DELEGATION_TOKEN_LIFETIME, TimeUnit.SECONDS);
     zkConf.setLong(DelegationTokenManager.MAX_LIFETIME, tokenLifetime);
     zkConf.setLong(DelegationTokenManager.RENEW_INTERVAL, tokenLifetime);
     zkConf.set(SecretManager.ZK_DTSM_ZK_KERBEROS_PRINCIPAL, principal);
     zkConf.set(SecretManager.ZK_DTSM_ZK_KERBEROS_KEYTAB, keyTab);
-    String zkPath = clusterId;
+    String zkPath = "zkdtsm_" + clusterId;
     LOG.info("Using {} as ZK secret manager path", zkPath);
-    zkConf.set(SecretManager.ZK_DTSM_ZNODE_WORKING_PATH, "zkdtsm_" + zkPath);
+    zkConf.set(SecretManager.ZK_DTSM_ZNODE_WORKING_PATH, zkPath);
     setZkConfIfNotSet(zkConf, SecretManager.ZK_DTSM_ZK_AUTH_TYPE, "sasl");
     setZkConfIfNotSet(zkConf, SecretManager.ZK_DTSM_ZK_CONNECTION_STRING,
         HiveConf.getVar(zkConf, ConfVars.LLAP_ZKSM_ZK_CONNECTION_STRING));
-    return zkUgi.doAs(new PrivilegedAction<SecretManager>() {
+
+    UserGroupInformation zkUgi = null;
+    try {
+      zkUgi = LlapUtil.loginWithKerberos(principal, keyTab);
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+    return new LlapZkConf(zkConf, zkUgi);
+  }
+
+  public static SecretManager createSecretManager(final Configuration conf, String clusterId) {
+    String llapPrincipal = HiveConf.getVar(conf, ConfVars.LLAP_KERBEROS_PRINCIPAL),
+        llapKeytab = HiveConf.getVar(conf, ConfVars.LLAP_KERBEROS_KEYTAB_FILE);
+    return SecretManager.createSecretManager(conf, llapPrincipal, llapKeytab, clusterId);
+  }
+
+  public static SecretManager createSecretManager(
+      final Configuration conf, String llapPrincipal, String llapKeytab, final String clusterId) {
+    final LlapZkConf c = createLlapZkConf(conf, llapPrincipal, llapKeytab, clusterId);
+    return c.zkUgi.doAs(new PrivilegedAction<SecretManager>() {
       @Override
       public SecretManager run() {
-        SecretManager zkSecretManager = new SecretManager(zkConf, clusterId);
+        SecretManager zkSecretManager = new SecretManager(c.zkConf, clusterId);
         try {
           zkSecretManager.startThreads();
         } catch (IOException e) {
@@ -138,7 +177,8 @@ public class SecretManager extends ZKDelegationTokenSecretManager<LlapTokenIdent
     zkConf.set(name, value);
   }
 
-  public Token<LlapTokenIdentifier> createLlapToken(String appId, String user) throws IOException {
+  public Token<LlapTokenIdentifier> createLlapToken(
+      String appId, String user, boolean isSignatureRequired) throws IOException {
     Text realUser = null, renewer = null;
     if (user == null) {
       UserGroupInformation ugi  = UserGroupInformation.getCurrentUser();
@@ -151,7 +191,7 @@ public class SecretManager extends ZKDelegationTokenSecretManager<LlapTokenIdent
       renewer = new Text(user);
     }
     LlapTokenIdentifier llapId = new LlapTokenIdentifier(
-        new Text(user), renewer, realUser, clusterId, appId);
+        new Text(user), renewer, realUser, clusterId, appId, isSignatureRequired);
     // TODO: note that the token is not renewable right now and will last for 2 weeks by default.
     Token<LlapTokenIdentifier> token = new Token<LlapTokenIdentifier>(llapId, this);
     if (LOG.isInfoEnabled()) {

http://git-wip-us.apache.org/repos/asf/hive/blob/9fe3dab7/llap-common/src/java/org/apache/hadoop/hive/llap/security/SigningSecretManager.java
----------------------------------------------------------------------
diff --git a/llap-common/src/java/org/apache/hadoop/hive/llap/security/SigningSecretManager.java b/llap-common/src/java/org/apache/hadoop/hive/llap/security/SigningSecretManager.java
new file mode 100644
index 0000000..067a98e
--- /dev/null
+++ b/llap-common/src/java/org/apache/hadoop/hive/llap/security/SigningSecretManager.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.llap.security;
+
+import org.apache.hadoop.security.token.delegation.DelegationKey;
+
+public interface SigningSecretManager { // Signing operations keyed by DelegationKey (added by HIVE-13444, HMAC signatures for LLAP).
+  DelegationKey getCurrentKey(); // presumably the key currently used for new signatures — confirm against the implementation
+  byte[] signWithKey(byte[] message, DelegationKey key); // signs message with the given key; presumably returns the signature bytes
+  byte[] signWithKey(byte[] message, int keyId) throws SecurityException; // same, key chosen by id; presumably throws if the id is unknown — confirm
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hive/blob/9fe3dab7/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
index 2524dc2..d439c07 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
@@ -27,11 +27,13 @@ import java.util.concurrent.atomic.AtomicReference;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.llap.DaemonId;
 import org.apache.hadoop.hive.llap.daemon.ContainerRunner;
 import org.apache.hadoop.hive.llap.daemon.FragmentCompletionHandler;
 import org.apache.hadoop.hive.llap.daemon.HistoryLogger;
 import org.apache.hadoop.hive.llap.daemon.KilledTaskHandler;
 import org.apache.hadoop.hive.llap.daemon.QueryFailedHandler;
+import org.apache.hadoop.hive.llap.daemon.impl.LlapTokenChecker.LlapTokenInfo;
 import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.FragmentRuntimeInfo;
 import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GroupInputSpecProto;
 import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.IOSpecProto;
@@ -46,11 +48,14 @@ import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWor
 import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.TerminateFragmentRequestProto;
 import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.TerminateFragmentResponseProto;
 import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.VertexIdentifier;
+import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.VertexOrBinary;
 import org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorMetrics;
+import org.apache.hadoop.hive.llap.security.LlapSignerImpl;
 import org.apache.hadoop.hive.llap.tez.Converters;
 import org.apache.hadoop.hive.ql.exec.tez.TezProcessor;
 import org.apache.hadoop.io.DataInputBuffer;
 import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.service.CompositeService;
 import org.apache.hadoop.yarn.api.ApplicationConstants;
@@ -68,7 +73,8 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Preconditions;
-// TODO Convert this to a CompositeService
+import com.google.protobuf.ByteString;
+
 public class ContainerRunnerImpl extends CompositeService implements ContainerRunner, FragmentCompletionHandler, QueryFailedHandler {
 
   // TODO Setup a set of threads to process incoming requests.
@@ -89,12 +95,14 @@ public class ContainerRunnerImpl extends CompositeService implements ContainerRu
   private final TaskRunnerCallable.ConfParams confParams;
   private final KilledTaskHandler killedTaskHandler = new KilledTaskHandlerImpl();
   private final HadoopShim tezHadoopShim;
+  private final LlapSignerImpl signer;
+  private final String clusterId;
 
   public ContainerRunnerImpl(Configuration conf, int numExecutors, int waitQueueSize,
       boolean enablePreemption, String[] localDirsBase, AtomicReference<Integer> localShufflePort,
       AtomicReference<InetSocketAddress> localAddress,
       long totalMemoryAvailableBytes, LlapDaemonExecutorMetrics metrics,
-      AMReporter amReporter, ClassLoader classLoader, String clusterId) {
+      AMReporter amReporter, ClassLoader classLoader, DaemonId daemonId) {
     super("ContainerRunnerImpl");
     this.conf = conf;
     Preconditions.checkState(numExecutors > 0,
@@ -102,7 +110,10 @@ public class ContainerRunnerImpl extends CompositeService implements ContainerRu
     this.localAddress = localAddress;
     this.localShufflePort = localShufflePort;
     this.amReporter = amReporter;
+    this.signer = UserGroupInformation.isSecurityEnabled()
+        ? new LlapSignerImpl(conf, daemonId) : null;
 
+    this.clusterId = daemonId.getClusterString();
     this.queryTracker = new QueryTracker(conf, localDirsBase, clusterId);
     addIfService(queryTracker);
     String waitQueueSchedulerClassName = HiveConf.getVar(
@@ -153,9 +164,22 @@ public class ContainerRunnerImpl extends CompositeService implements ContainerRu
 
   @Override
   public SubmitWorkResponseProto submitWork(SubmitWorkRequestProto request) throws IOException {
-    // TODO: also support binary. Actually, we should figure out the binary stuff here and 
-    //       stop passing the protobuf around. We should pass around some plain objects/values.
-    SignableVertexSpec vertex = request.getWorkSpec().getVertex();
+    VertexOrBinary vob = request.getWorkSpec();
+    SignableVertexSpec vertex = vob.hasVertex() ? vob.getVertex() : null;
+    ByteString vertexBinary = vob.hasVertexBinary() ? vob.getVertexBinary() : null;
+    if (vertex != null && vertexBinary != null) {
+      throw new IOException(
+          "Vertex and vertexBinary in VertexOrBinary cannot be set at the same time");
+    }
+    if (vertexBinary != null) {
+      vertex = SignableVertexSpec.parseFrom(vob.getVertexBinary());
+    }
+
+    LlapTokenInfo tokenInfo = LlapTokenChecker.getTokenInfo(clusterId);
+    if (tokenInfo.isSigningRequired) {
+      checkSignature(vertex, vertexBinary, request, tokenInfo.userName);
+    }
+
     if (LOG.isInfoEnabled()) {
       LOG.info("Queueing container for execution: " + stringifySubmitRequest(request, vertex));
     }
@@ -166,6 +190,7 @@ public class ContainerRunnerImpl extends CompositeService implements ContainerRu
     HistoryLogger.logFragmentStart(vId.getApplicationIdString(), request.getContainerIdString(),
         localAddress.get().getHostName(), vertex.getDagName(), vId.getDagId(),
         vertex.getVertexName(), request.getFragmentNumber(), request.getAttemptNumber());
+
     // This is the start of container-annotated logging.
     // TODO Reduce the length of this string. Way too verbose at the moment.
     NDC.push(fragmentIdString);
@@ -194,7 +219,7 @@ public class ContainerRunnerImpl extends CompositeService implements ContainerRu
       QueryFragmentInfo fragmentInfo = queryTracker.registerFragment(
           queryIdentifier, vId.getApplicationIdString(), vertex.getDagName(), dagIdentifier,
           vertex.getVertexName(), request.getFragmentNumber(), request.getAttemptNumber(),
-          vertex.getUser(), vertex, jobToken, fragmentIdString);
+          vertex.getUser(), vertex, jobToken, fragmentIdString, tokenInfo);
 
       String[] localDirs = fragmentInfo.getLocalDirs();
       Preconditions.checkNotNull(localDirs);
@@ -208,7 +233,7 @@ public class ContainerRunnerImpl extends CompositeService implements ContainerRu
       TaskRunnerCallable callable = new TaskRunnerCallable(request, fragmentInfo, callableConf,
           new LlapExecutionContext(localAddress.get().getHostName(), queryTracker), env,
           credentials, memoryPerExecutor, amReporter, confParams, metrics, killedTaskHandler,
-          this, tezHadoopShim, attemptId);
+          this, tezHadoopShim, attemptId, vertex);
       submissionState = executorService.schedule(callable);
 
       if (LOG.isInfoEnabled()) {
@@ -233,6 +258,24 @@ public class ContainerRunnerImpl extends CompositeService implements ContainerRu
     return responseBuilder.build();
   }
 
+  private void checkSignature(SignableVertexSpec vertex, ByteString vertexBinary, // Checks the work-spec signature and the fragment's user.
+      SubmitWorkRequestProto request, String tokenUserName) throws SecurityException, IOException {
+    if (!request.hasWorkSpecSignature()) { // a signature must be present on the request
+      throw new SecurityException("Unsigned fragment not allowed");
+    }
+    if (vertexBinary == null) { // only the parsed spec was sent, so serialize it to obtain the bytes to check
+      ByteString.Output os = ByteString.newOutput();
+      vertex.writeTo(os); // NOTE(review): assumes this reproduces exactly the bytes that were signed — confirm
+      vertexBinary = os.toByteString();
+    }
+    signer.checkSignature(vertexBinary.toByteArray(), // NOTE(review): signer is null when security is disabled (see constructor) — confirm unreachable then
+        request.getWorkSpecSignature().toByteArray(), (int)vertex.getSignatureKeyId()); // key id is narrowed to int
+    if (!vertex.hasUser() || !vertex.getUser().equals(tokenUserName)) { // the spec must name a user, and it must equal the token's user
+      throw new SecurityException("LLAP token is for " + tokenUserName
+          + " but the fragment is for " + (vertex.hasUser() ? vertex.getUser() : null));
+    }
+  }
+
   private static class LlapExecutionContext extends ExecutionContextImpl
       implements TezProcessor.Hook {
     private final QueryTracker queryTracker;

http://git-wip-us.apache.org/repos/asf/hive/blob/9fe3dab7/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
index 5ab7b3c..2faedcd 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
@@ -248,7 +248,7 @@ public class LlapDaemon extends CompositeService implements ContainerRunner, Lla
 
     this.containerRunner = new ContainerRunnerImpl(daemonConf, numExecutors, waitQueueSize,
         enablePreemption, localDirs, this.shufflePort, srvAddress, executorMemoryBytes, metrics,
-        amReporter, executorClassLoader, daemonId.getClusterString());
+        amReporter, executorClassLoader, daemonId);
     addIfService(containerRunner);
 
     // Not adding the registry as a service, since we need to control when it is initialized - conf used to pickup properties.

http://git-wip-us.apache.org/repos/asf/hive/blob/9fe3dab7/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapProtocolServerImpl.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapProtocolServerImpl.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapProtocolServerImpl.java
index b94fc2e..7ccd28f 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapProtocolServerImpl.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapProtocolServerImpl.java
@@ -45,7 +45,6 @@ import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWor
 import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkResponseProto;
 import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.TerminateFragmentRequestProto;
 import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.TerminateFragmentResponseProto;
-import org.apache.hadoop.io.Text;
 import org.apache.hadoop.ipc.ProtobufRpcEngine;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.net.NetUtils;
@@ -64,6 +63,9 @@ public class LlapProtocolServerImpl extends AbstractService
     implements LlapProtocolBlockingPB, LlapManagementProtocolPB {
 
   private static final Logger LOG = LoggerFactory.getLogger(LlapProtocolServerImpl.class);
+  private enum TokenRequiresSigning { // Parsed form of LLAP_REMOTE_TOKEN_REQUIRES_SIGNING (produced by getSigningConfig).
+    TRUE, FALSE, EXCEPT_OWNER // EXCEPT_OWNER: required unless the caller's short name equals clusterUser
+  }
 
   private final int numHandlers;
   private final ContainerRunner containerRunner;
@@ -71,8 +73,10 @@ public class LlapProtocolServerImpl extends AbstractService
   private RPC.Server server, mngServer;
   private final AtomicReference<InetSocketAddress> srvAddress, mngAddress;
   private SecretManager zkSecretManager;
-  private String restrictedToUser = null;
+  private String clusterUser = null;
+  private boolean isRestrictedToClusterUser = false;
   private final DaemonId daemonId;
+  private TokenRequiresSigning isSigningRequiredConfig = TokenRequiresSigning.TRUE;
 
   public LlapProtocolServerImpl(int numHandlers, ContainerRunner containerRunner,
       AtomicReference<InetSocketAddress> srvAddress, AtomicReference<InetSocketAddress> mngAddress,
@@ -132,6 +136,7 @@ public class LlapProtocolServerImpl extends AbstractService
   @Override
   public void serviceStart() {
     final Configuration conf = getConfig();
+    isSigningRequiredConfig = getSigningConfig(conf);
     final BlockingService daemonImpl =
         LlapDaemonProtocolProtos.LlapDaemonProtocol.newReflectiveBlockingService(this);
     final BlockingService managementImpl =
@@ -140,13 +145,14 @@ public class LlapProtocolServerImpl extends AbstractService
       startProtocolServers(conf, daemonImpl, managementImpl);
       return;
     }
+    try {
+      this.clusterUser = UserGroupInformation.getCurrentUser().getShortUserName();
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
     if (isPermissiveManagementAcl(conf)) {
       LOG.warn("Management protocol has a '*' ACL.");
-      try {
-        this.restrictedToUser = UserGroupInformation.getCurrentUser().getShortUserName();
-      } catch (IOException e) {
-        throw new RuntimeException(e);
-      }
+      isRestrictedToClusterUser = true;
     }
     String llapPrincipal = HiveConf.getVar(conf, ConfVars.LLAP_KERBEROS_PRINCIPAL),
         llapKeytab = HiveConf.getVar(conf, ConfVars.LLAP_KERBEROS_KEYTAB_FILE);
@@ -169,6 +175,20 @@ public class LlapProtocolServerImpl extends AbstractService
     });
   }
 
+  private static TokenRequiresSigning getSigningConfig(final Configuration conf) { // Parses the signing setting from conf.
+    String signSetting = HiveConf.getVar(
+        conf, ConfVars.LLAP_REMOTE_TOKEN_REQUIRES_SIGNING).toLowerCase(); // NOTE(review): default-locale lowercasing; consider Locale.ROOT
+    switch (signSetting) { // the match is effectively case-insensitive because of the lowercasing above
+    case "true": return TokenRequiresSigning.TRUE;
+    case "except_llap_owner": return TokenRequiresSigning.EXCEPT_OWNER;
+    case "false": return TokenRequiresSigning.FALSE;
+    default: { // any other value is rejected with a RuntimeException naming the config key
+      throw new RuntimeException("Invalid value for "
+          + ConfVars.LLAP_REMOTE_TOKEN_REQUIRES_SIGNING.varname + ": " + signSetting);
+    }
+    }
+  }
+
   private static boolean isPermissiveManagementAcl(Configuration conf) {
     return HiveConf.getBoolVar(conf, ConfVars.LLAP_VALIDATE_ACLS)
         && AccessControlList.WILDCARD_ACL_VALUE.equals(
@@ -266,17 +286,20 @@ public class LlapProtocolServerImpl extends AbstractService
     if (zkSecretManager == null) {
       throw new ServiceException("Operation not supported on unsecure cluster");
     }
-    UserGroupInformation ugi = null;
+    UserGroupInformation callingUser = null;
     Token<LlapTokenIdentifier> token = null;
     try {
-      ugi = UserGroupInformation.getCurrentUser();
-      token = zkSecretManager.createLlapToken(request.getAppId(), null);
+      callingUser = UserGroupInformation.getCurrentUser();
+      // Determine if the user would need to sign fragments.
+      boolean isSigningRequired = determineIfSigningIsRequired(callingUser);
+      token = zkSecretManager.createLlapToken(
+          request.hasAppId() ? request.getAppId() : null, null, isSigningRequired);
     } catch (IOException e) {
       throw new ServiceException(e);
     }
-    if (restrictedToUser != null && !restrictedToUser.equals(ugi.getShortUserName())) {
+    if (isRestrictedToClusterUser && !clusterUser.equals(callingUser.getShortUserName())) {
       throw new ServiceException("Management protocol ACL is too permissive. The access has been"
-          + " automatically restricted to " + restrictedToUser + "; " + ugi.getShortUserName()
+          + " automatically restricted to " + clusterUser + "; " + callingUser.getShortUserName()
           + " is denied acccess. Please set " + ConfVars.LLAP_VALIDATE_ACLS.varname + " to false,"
           + " or adjust " + ConfVars.LLAP_MANAGEMENT_ACL.varname + " and "
           + ConfVars.LLAP_MANAGEMENT_ACL_DENY.varname + " to a more restrictive ACL.");
@@ -292,4 +315,16 @@ public class LlapProtocolServerImpl extends AbstractService
     GetTokenResponseProto response = GetTokenResponseProto.newBuilder().setToken(bs).build();
     return response;
   }
+
+  private boolean determineIfSigningIsRequired(UserGroupInformation callingUser) { // Applies isSigningRequiredConfig to this caller.
+    switch (isSigningRequiredConfig) {
+    case FALSE: return false; // never required
+    case TRUE: return true; // always required
+    // Note that this uses short user name without consideration for Kerberos realm.
+    // This seems to be the common approach (e.g. for HDFS permissions), but it may be
+    // better to consider the realm (although not the host, so not the full name).
+    case EXCEPT_OWNER: return !clusterUser.equals(callingUser.getShortUserName()); // required for everyone except clusterUser
+    default: throw new AssertionError("Unknown value " + isSigningRequiredConfig); // guards against an unhandled enum constant
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/9fe3dab7/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapTokenChecker.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapTokenChecker.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapTokenChecker.java
index 04df929..24a7737 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapTokenChecker.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapTokenChecker.java
@@ -23,8 +23,6 @@ import java.util.List;
 import java.io.IOException;
 
 import org.apache.commons.lang3.StringUtils;
-import org.apache.commons.lang3.tuple.ImmutablePair;
-import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hadoop.hive.llap.security.LlapTokenIdentifier;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.TokenIdentifier;
@@ -34,8 +32,20 @@ import org.slf4j.LoggerFactory;
 public final class LlapTokenChecker {
   private static final Logger LOG = LoggerFactory.getLogger(LlapTokenChecker.class);
 
-  private static final ImmutablePair<String, String> NO_SECURITY = new ImmutablePair<>(null, null);
-  public static Pair<String, String> getTokenInfo(String clusterId) throws IOException {
+  public static final class LlapTokenInfo { // Immutable result of the token check; replaces the former Pair<String, String>.
+    public final String userName; // user taken from Kerberos and/or the LLAP tokens; null in NO_SECURITY
+    public final String appId; // app id taken from the LLAP tokens; may be null
+    public final boolean isSigningRequired; // whether fragments submitted under these credentials must be signed
+
+    public LlapTokenInfo(String userName, String appId, boolean isSigningRequired) { // plain field initialization, no validation
+      this.userName = userName;
+      this.appId = appId;
+      this.isSigningRequired = isSigningRequired;
+    }
+  }
+
+  private static final LlapTokenInfo NO_SECURITY = new LlapTokenInfo(null, null, false);
+  public static LlapTokenInfo getTokenInfo(String clusterId) throws IOException {
     if (!UserGroupInformation.isSecurityEnabled()) return NO_SECURITY;
     UserGroupInformation current = UserGroupInformation.getCurrentUser();
     String kerberosName = current.hasKerberosCredentials() ? current.getShortUserName() : null;
@@ -65,13 +75,14 @@ public final class LlapTokenChecker {
   }
 
   @VisibleForTesting
-  static Pair<String, String> getTokenInfoInternal(
+  static LlapTokenInfo getTokenInfoInternal(
       String kerberosName, List<LlapTokenIdentifier> tokens) {
     assert (tokens != null && !tokens.isEmpty()) || kerberosName != null;
     if (tokens == null) {
-      return new ImmutablePair<String, String>(kerberosName, null);
+      return new LlapTokenInfo(kerberosName, null, true);
     }
     String userName = kerberosName, appId = null;
+    boolean isSigningRequired = false;
     for (LlapTokenIdentifier llapId : tokens) {
       String newUserName = llapId.getRealUser().toString();
       if (userName != null && !userName.equals(newUserName)) {
@@ -88,9 +99,10 @@ public final class LlapTokenChecker {
         }
         appId = newAppId;
       }
+      isSigningRequired = isSigningRequired || llapId.isSigningRequired();
     }
     assert userName != null;
-    return new ImmutablePair<String, String>(userName, appId);
+    return new LlapTokenInfo(userName, appId, isSigningRequired);
   }
 
   public static void checkPermissions(
@@ -120,12 +132,12 @@ public final class LlapTokenChecker {
   }
 
   public static void checkPermissions(
-      Pair<String, String> prm, String userName, String appId, Object hint) {
+      LlapTokenInfo prm, String userName, String appId, Object hint) {
     if (userName == null) {
       assert StringUtils.isEmpty(appId);
       return;
     }
-    if (!checkTokenPermissions(userName, appId, prm.getLeft(), prm.getRight())) {
+    if (!checkTokenPermissions(userName, appId, prm.userName, prm.appId)) {
       throw new SecurityException("Unauthorized to access "
           + userName + ", " + appId.hashCode() + " (" + hint + ")");
     }

http://git-wip-us.apache.org/repos/asf/hive/blob/9fe3dab7/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java
index c55436b..a965872 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java
@@ -20,7 +20,6 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 
-import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
@@ -31,6 +30,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.llap.daemon.impl.LlapTokenChecker.LlapTokenInfo;
 import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SignableVertexSpec;
 import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateProto;
 import org.apache.hadoop.hive.llap.shufflehandler.ShuffleHandler;
@@ -119,7 +119,7 @@ public class QueryTracker extends AbstractService {
   QueryFragmentInfo registerFragment(QueryIdentifier queryIdentifier, String appIdString,
       String dagName, int dagIdentifier, String vertexName, int fragmentNumber, int attemptNumber,
       String user, SignableVertexSpec vertex, Token<JobTokenIdentifier> appToken,
-      String fragmentIdString) throws IOException {
+      String fragmentIdString, LlapTokenInfo tokenInfo) throws IOException {
     ReadWriteLock dagLock = getDagLock(queryIdentifier);
     dagLock.readLock().lock();
     try {
@@ -132,16 +132,18 @@ public class QueryTracker extends AbstractService {
       }
       // TODO: for now, we get the secure username out of UGI... after signing, we can take it
       //       out of the request provided that it's signed.
-      Pair<String, String> tokenInfo = LlapTokenChecker.getTokenInfo(clusterId);
+      if (tokenInfo == null) {
+        tokenInfo = LlapTokenChecker.getTokenInfo(clusterId);
+      }
       boolean isExistingQueryInfo = true;
       QueryInfo queryInfo = queryInfoMap.get(queryIdentifier);
       if (queryInfo == null) {
-        String tokenUser = tokenInfo.getLeft(), tokenAppId = tokenInfo.getRight();
         if (UserGroupInformation.isSecurityEnabled()) {
-          Preconditions.checkNotNull(tokenUser);
+          Preconditions.checkNotNull(tokenInfo.userName);
         }
         queryInfo = new QueryInfo(queryIdentifier, appIdString, dagName, dagIdentifier, user,
-           getSourceCompletionMap(queryIdentifier), localDirsBase, localFs, tokenUser, tokenAppId);
+            getSourceCompletionMap(queryIdentifier), localDirsBase, localFs,
+            tokenInfo.userName, tokenInfo.appId);
         QueryInfo old = queryInfoMap.putIfAbsent(queryIdentifier, queryInfo);
         if (old != null) {
           queryInfo = old;

http://git-wip-us.apache.org/repos/asf/hive/blob/9fe3dab7/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java
index eac0e8f..1e302e8 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java
@@ -628,9 +628,9 @@ public class TaskExecutorService extends AbstractService implements Scheduler<Ta
         String state = reason == null ? "FAILED" : reason.name();
         boolean removed = preemptionQueue.remove(taskWrapper);
         if (removed && isInfoEnabled) {
-          LOG.info(TaskRunnerCallable
-              .getTaskIdentifierString(taskWrapper.getTaskRunnerCallable().getRequest())
-              + " request " + state + "! Removed from preemption list.");
+          TaskRunnerCallable trc = taskWrapper.getTaskRunnerCallable();
+          LOG.info(TaskRunnerCallable.getTaskIdentifierString(trc.getRequest(),
+              trc.getVertexSpec()) + " request " + state + "! Removed from preemption list.");
         }
         if (metrics != null) {
           metrics.setExecutorNumPreemptableRequests(preemptionQueue.size());

http://git-wip-us.apache.org/repos/asf/hive/blob/9fe3dab7/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
index 74359fa..0d9882b 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
@@ -124,7 +124,8 @@ public class TaskRunnerCallable extends CallableWithNdc<TaskRunner2Result> {
                      ConfParams confParams, LlapDaemonExecutorMetrics metrics,
                      KilledTaskHandler killedTaskHandler,
                      FragmentCompletionHandler fragmentCompleteHandler,
-                     HadoopShim tezHadoopShim, TezTaskAttemptID attemptId) {
+                     HadoopShim tezHadoopShim, TezTaskAttemptID attemptId,
+                     SignableVertexSpec vertex) {
     this.request = request;
     this.fragmentInfo = fragmentInfo;
     this.conf = conf;
@@ -135,8 +136,7 @@ public class TaskRunnerCallable extends CallableWithNdc<TaskRunner2Result> {
     this.memoryAvailable = memoryAvailable;
     this.confParams = confParams;
     this.jobToken = TokenCache.getSessionToken(credentials);
-    // TODO: support binary spec here or above
-    this.vertex = request.getWorkSpec().getVertex();
+    this.vertex = vertex;
     this.taskSpec = Converters.getTaskSpecfromProto(
         vertex, request.getFragmentNumber(), request.getAttemptNumber(), attemptId);
     this.amReporter = amReporter;
@@ -389,7 +389,7 @@ public class TaskRunnerCallable extends CallableWithNdc<TaskRunner2Result> {
   }
 
   public TaskRunnerCallback getCallback() {
-    return new TaskRunnerCallback(request, this);
+    return new TaskRunnerCallback(request, vertex, this);
   }
 
   public SubmitWorkRequestProto getRequest() {
@@ -399,11 +399,13 @@ public class TaskRunnerCallable extends CallableWithNdc<TaskRunner2Result> {
   final class TaskRunnerCallback implements FutureCallback<TaskRunner2Result> {
 
     private final SubmitWorkRequestProto request;
+    private final SignableVertexSpec vertex;
     private final TaskRunnerCallable taskRunnerCallable;
 
-    TaskRunnerCallback(SubmitWorkRequestProto request,
+    TaskRunnerCallback(SubmitWorkRequestProto request, SignableVertexSpec vertex,
         TaskRunnerCallable taskRunnerCallable) {
       this.request = request;
+      this.vertex = vertex;
       this.taskRunnerCallable = taskRunnerCallable;
     }
 
@@ -463,7 +465,8 @@ public class TaskRunnerCallable extends CallableWithNdc<TaskRunner2Result> {
 
     @Override
     public void onFailure(Throwable t) {
-      LOG.error("TezTaskRunner execution failed for : " + getTaskIdentifierString(request), t);
+      LOG.error("TezTaskRunner execution failed for : "
+          + getTaskIdentifierString(request, vertex), t);
       isCompleted.set(true);
       fragmentCompletionHanler.fragmentComplete(fragmentInfo);
       // TODO HIVE-10236 Report a fatal error over the umbilical
@@ -494,10 +497,8 @@ public class TaskRunnerCallable extends CallableWithNdc<TaskRunner2Result> {
   }
 
   public static String getTaskIdentifierString(
-      SubmitWorkRequestProto request) {
+      SubmitWorkRequestProto request, SignableVertexSpec vertex) {
     StringBuilder sb = new StringBuilder();
-    // TODO: also support the binary version
-    SignableVertexSpec vertex = request.getWorkSpec().getVertex();
     sb.append("AppId=").append(vertex.getVertexIdentifier().getApplicationIdString())
         .append(", containerId=").append(request.getContainerIdString())
         .append(", Dag=").append(vertex.getDagName())

http://git-wip-us.apache.org/repos/asf/hive/blob/9fe3dab7/llap-server/src/java/org/apache/hadoop/hive/llap/security/LlapSignerImpl.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/security/LlapSignerImpl.java b/llap-server/src/java/org/apache/hadoop/hive/llap/security/LlapSignerImpl.java
new file mode 100644
index 0000000..4174593
--- /dev/null
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/security/LlapSignerImpl.java
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.llap.security;
+
+import java.io.IOException;
+import java.security.MessageDigest;
+import java.util.Arrays;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.llap.DaemonId;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.delegation.DelegationKey;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * {@link LlapSigner} implementation that signs and verifies messages using the keys held by a
+ * {@link SigningSecretManager}.
+ */
+public class LlapSignerImpl implements LlapSigner {
+  private final SigningSecretManager secretManager;
+
+  /**
+   * Creates a signer backed by a secret manager built from the configuration.
+   * @param conf configuration passed to {@code SecretManager.createSecretManager}
+   * @param daemonId daemon identity; its cluster string is passed to the secret manager factory
+   */
+  public LlapSignerImpl(Configuration conf, DaemonId daemonId) {
+    // TODO: create this centrally in HS2 case
+    secretManager = SecretManager.createSecretManager(conf, daemonId.getClusterString());
+  }
+
+  /** Creates a signer on top of an existing secret manager; intended for tests. */
+  @VisibleForTesting
+  public LlapSignerImpl(SigningSecretManager sm) {
+    secretManager = sm;
+  }
+
+  /**
+   * Stamps the message with the current key ID and current user name, serializes it, and signs
+   * the serialized bytes with that same key.
+   * @param message the message to stamp, serialize and sign
+   * @return the serialized message together with its signature
+   * @throws IOException if the current user cannot be determined or serialization fails
+   */
+  @Override
+  public SignedMessage serializeAndSign(Signable message) throws IOException {
+    SignedMessage result = new SignedMessage();
+    // Fetch the key once so that the ID stored in the message and the signing key always match.
+    DelegationKey key = secretManager.getCurrentKey();
+    message.setSignInfo(key.getKeyId(), UserGroupInformation.getCurrentUser().getUserName());
+    result.message = message.serialize();
+    result.signature = secretManager.signWithKey(result.message, key);
+    return result;
+  }
+
+  /**
+   * Recomputes the signature of the message with the given key and compares it to the one
+   * supplied by the caller.
+   * @param message the serialized message bytes
+   * @param signature the signature received with the message
+   * @param keyId ID of the key the message claims to be signed with
+   * @throws SecurityException if the signature is missing or does not match
+   */
+  @Override
+  public void checkSignature(byte[] message, byte[] signature, int keyId)
+      throws SecurityException {
+    byte[] expectedSignature = secretManager.signWithKey(message, keyId);
+    // MessageDigest.isEqual instead of Arrays.equals: the latter returns at the first differing
+    // byte, which lets the time taken leak how much of a forged signature was correct.
+    if (signature != null && expectedSignature != null
+        && MessageDigest.isEqual(signature, expectedSignature)) {
+      return;
+    }
+    throw new SecurityException("Message signature does not match");
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/9fe3dab7/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java
----------------------------------------------------------------------
diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java
index e0f0676..96d626a 100644
--- a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java
+++ b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java
@@ -150,7 +150,8 @@ public class TaskExecutorTestHelpers {
           new ExecutionContextImpl("localhost"), null, new Credentials(), 0, mock(AMReporter.class), null, mock(
               LlapDaemonExecutorMetrics.class),
           mock(KilledTaskHandler.class), mock(
-              FragmentCompletionHandler.class), new DefaultHadoopShim(), null);
+              FragmentCompletionHandler.class), new DefaultHadoopShim(), null,
+              requestProto.getWorkSpec().getVertex());
       this.workTime = workTime;
       this.canFinish = canFinish;
     }

http://git-wip-us.apache.org/repos/asf/hive/blob/9fe3dab7/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestLlapTokenChecker.java
----------------------------------------------------------------------
diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestLlapTokenChecker.java b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestLlapTokenChecker.java
index aaaa762..d4ded23 100644
--- a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestLlapTokenChecker.java
+++ b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestLlapTokenChecker.java
@@ -65,7 +65,7 @@ public class TestLlapTokenChecker {
   private List<LlapTokenIdentifier> createTokens(String... args) {
     List<LlapTokenIdentifier> tokens = new ArrayList<>();
     for (int i = 0; i < args.length; i += 2) {
-      tokens.add(new LlapTokenIdentifier(null, null, new Text(args[i]), "c", args[i + 1]));
+      tokens.add(new LlapTokenIdentifier(null, null, new Text(args[i]), "c", args[i + 1], false));
     }
     return tokens;
   }
@@ -89,8 +89,8 @@ public class TestLlapTokenChecker {
     }
   }
 
-  private void check(Pair<String, String> p, String user, String appId) {
-    assertEquals(user, p.getLeft());
-    assertEquals(appId, p.getRight());
+  private void check(LlapTokenChecker.LlapTokenInfo p, String user, String appId) {
+    assertEquals(user, p.userName);
+    assertEquals(appId, p.appId);
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hive/blob/9fe3dab7/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/comparator/TestFirstInFirstOutComparator.java
----------------------------------------------------------------------
diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/comparator/TestFirstInFirstOutComparator.java b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/comparator/TestFirstInFirstOutComparator.java
index a250882..ac48a3a 100644
--- a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/comparator/TestFirstInFirstOutComparator.java
+++ b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/comparator/TestFirstInFirstOutComparator.java
@@ -53,37 +53,6 @@ public class TestFirstInFirstOutComparator {
   private static Configuration conf;
   private static Credentials cred = new Credentials();
 
-  private static class MockRequest extends TaskRunnerCallable {
-    private int workTime;
-    private boolean canFinish;
-
-    public MockRequest(SubmitWorkRequestProto requestProto,
-        boolean canFinish, int workTime) {
-      super(requestProto, mock(QueryFragmentInfo.class), conf,
-          new ExecutionContextImpl("localhost"), null, cred, 0, null, null, null,
-          mock(KilledTaskHandler.class), mock(
-          FragmentCompletionHandler.class), new DefaultHadoopShim(), null);
-      this.workTime = workTime;
-      this.canFinish = canFinish;
-    }
-
-    @Override
-    protected TaskRunner2Result callInternal() {
-      System.out.println(super.getRequestId() + " is executing..");
-      try {
-        Thread.sleep(workTime);
-      } catch (InterruptedException e) {
-        return new TaskRunner2Result(EndReason.KILL_REQUESTED, null, null, false);
-      }
-      return new TaskRunner2Result(EndReason.SUCCESS, null, null, false);
-    }
-
-    @Override
-    public boolean canFinish() {
-      return canFinish;
-    }
-  }
-
   @Before
   public void setup() {
     conf = new Configuration();

http://git-wip-us.apache.org/repos/asf/hive/blob/9fe3dab7/llap-server/src/test/org/apache/hadoop/hive/llap/security/TestLlapSignerImpl.java
----------------------------------------------------------------------
diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/security/TestLlapSignerImpl.java b/llap-server/src/test/org/apache/hadoop/hive/llap/security/TestLlapSignerImpl.java
new file mode 100644
index 0000000..0420225
--- /dev/null
+++ b/llap-server/src/test/org/apache/hadoop/hive/llap/security/TestLlapSignerImpl.java
@@ -0,0 +1,214 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.llap.security;
+
+import static org.junit.Assert.*;
+
+import java.io.IOException;
+import java.util.Arrays;
+
+import org.apache.hadoop.hive.llap.security.LlapSigner.Signable;
+import org.apache.hadoop.hive.llap.security.LlapSigner.SignedMessage;
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier;
+import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager;
+import org.apache.hadoop.security.token.delegation.DelegationKey;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Tests signing and signature verification in {@link LlapSignerImpl}. */
+public class TestLlapSignerImpl {
+  private static final Logger LOG = LoggerFactory.getLogger(TestLlapSignerImpl.class);
+
+  /**
+   * Covers the round trip with one key, swapped and tampered signatures, signing with a
+   * different key while the first one is still known, and verification once it is gone.
+   */
+  @Test(timeout = 10000)
+  public void testSigning() throws Exception {
+    FakeSecretManager fsm = new FakeSecretManager();
+    fsm.startThreads();
+
+    // Make sure the signature works.
+    LlapSignerImpl signer = new LlapSignerImpl(fsm);
+    byte theByte = 1;
+    TestSignable in = new TestSignable(theByte);
+    TestSignable in2 = new TestSignable(++theByte);
+    SignedMessage sm2 = signer.serializeAndSign(in2);
+    SignedMessage sm = signer.serializeAndSign(in);
+    TestSignable out = TestSignable.deserialize(sm.message);
+    TestSignable out2 = TestSignable.deserialize(sm2.message);
+    assertEquals(in, out);
+    assertEquals(in2, out2);
+    signer.checkSignature(sm.message, sm.signature, out.masterKeyId);
+    signer.checkSignature(sm2.message, sm2.signature, out2.masterKeyId);
+
+    // Make sure the broken signature doesn't work.
+    try {
+      signer.checkSignature(sm.message, sm2.signature, out.masterKeyId);
+      fail("Didn't throw");
+    } catch (SecurityException ex) {
+      // Expected.
+    }
+
+    int index = sm.signature.length / 2;
+    sm.signature[index] = (byte)(sm.signature[index] + 1);
+    try {
+      signer.checkSignature(sm.message, sm.signature, out.masterKeyId);
+      fail("Didn't throw");
+    } catch (SecurityException ex) {
+      // Expected.
+    }
+    sm.signature[index] = (byte)(sm.signature[index] - 1);
+
+    // Adding keys is PITA - there's no way to plug into timed rolling; just create a new fsm.
+    DelegationKey dk = fsm.getCurrentKey();
+    fsm.stopThreads();
+    fsm = new FakeSecretManager();
+    fsm.addKey(dk);
+    fsm.startThreads();
+    signer = new LlapSignerImpl(fsm);
+    // Sign in2 with a different key.
+    sm2 = signer.serializeAndSign(in2);
+    out2 = TestSignable.deserialize(sm2.message);
+    assertNotEquals(out.masterKeyId, out2.masterKeyId);
+    assertEquals(in2, out2);
+    signer.checkSignature(sm2.message, sm2.signature, out2.masterKeyId);
+    signer.checkSignature(sm.message, sm.signature, out.masterKeyId);
+    // Make sure the key ID mismatch causes error.
+    try {
+      signer.checkSignature(sm2.message, sm2.signature, out.masterKeyId);
+      fail("Didn't throw");
+    } catch (SecurityException ex) {
+      // Expected.
+    }
+
+    // The same for rolling the key; re-create the fsm with only the key #2.
+    dk = fsm.getCurrentKey();
+    fsm.stopThreads();
+
+    fsm = new FakeSecretManager();
+    fsm.addKey(dk);
+    fsm.startThreads();
+    signer = new LlapSignerImpl(fsm);
+    signer.checkSignature(sm2.message, sm2.signature, out2.masterKeyId);
+    // The key is missing - shouldn't be able to verify.
+    try {
+      signer.checkSignature(sm.message, sm.signature, out.masterKeyId);
+      fail("Didn't throw");
+    } catch (SecurityException ex) {
+      // Expected.
+    }
+    fsm.stopThreads();
+  }
+
+  /** Minimal {@link Signable} that carries the signing key ID and a single payload byte. */
+  private static class TestSignable implements Signable {
+    public int masterKeyId;
+    public byte index;
+
+    public TestSignable(byte i) {
+      index = i;
+    }
+
+    public TestSignable(int keyId, byte b) {
+      masterKeyId = keyId;
+      index = b;
+    }
+
+    /** Records the key ID; the user name is not used by this test message. */
+    @Override
+    public void setSignInfo(int masterKeyId, String user) {
+      this.masterKeyId = masterKeyId;
+    }
+
+    /** Writes the key ID as an int followed by the payload byte. */
+    @Override
+    public byte[] serialize() throws IOException {
+      DataOutputBuffer dob = new DataOutputBuffer(5);
+      dob.writeInt(masterKeyId);
+      dob.write(index);
+      // getData() exposes the whole backing array; only the first getLength() bytes are valid,
+      // so copy exactly those instead of relying on the initial capacity matching the payload.
+      byte[] b = Arrays.copyOf(dob.getData(), dob.getLength());
+      dob.close();
+      return b;
+    }
+
+    /** Reads back a message written by {@link #serialize()}. */
+    public static TestSignable deserialize(byte[] bytes) throws IOException {
+      DataInputBuffer db = new DataInputBuffer();
+      db.reset(bytes, bytes.length);
+      int keyId = db.readInt();
+      byte b = db.readByte();
+      db.close();
+      return new TestSignable(keyId, b);
+    }
+
+    @Override
+    public int hashCode() {
+      return 31 * index + masterKeyId;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+      if (this == obj) return true;
+      if (!(obj instanceof TestSignable)) return false;
+      TestSignable other = (TestSignable) obj;
+      return (index == other.index) && (masterKeyId == other.masterKeyId);
+    }
+  }
+
+  /** Secret manager that exposes its delegation keys for signing; it cannot issue tokens. */
+  private static class FakeSecretManager
+    extends AbstractDelegationTokenSecretManager<AbstractDelegationTokenIdentifier>
+    implements SigningSecretManager {
+
+    public FakeSecretManager() {
+      // All intervals are 10000000 ms, far longer than the test timeout, so no key should be
+      // rolled or expire on its own while the test runs; the test swaps managers instead.
+      super(10000000, 10000000, 10000000, 10000000);
+    }
+
+    @Override
+    public DelegationKey getCurrentKey() {
+      return getDelegationKey(getCurrentKeyId());
+    }
+
+    @Override
+    public byte[] signWithKey(byte[] message, DelegationKey key) {
+      return createPassword(message, key.getKey());
+    }
+
+    @Override
+    public byte[] signWithKey(byte[] message, int keyId) throws SecurityException {
+      DelegationKey key = getDelegationKey(keyId);
+      if (key == null) {
+        throw new SecurityException("The key ID " + keyId + " was not found");
+      }
+      return createPassword(message, key.getKey());
+    }
+
+    @Override
+    public AbstractDelegationTokenIdentifier createIdentifier() {
+      throw new UnsupportedOperationException();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/9fe3dab7/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
index c9b912b..d04cfa3 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
@@ -54,7 +54,7 @@ import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.llap.DaemonId;
 import org.apache.hadoop.hive.llap.LlapUtil;
 import org.apache.hadoop.hive.llap.impl.LlapProtocolClientImpl;
-import org.apache.hadoop.hive.llap.security.LlapTokenClientFactory;
+import org.apache.hadoop.hive.llap.security.LlapTokenClient;
 import org.apache.hadoop.hive.llap.security.LlapTokenIdentifier;
 import org.apache.hadoop.hive.llap.security.LlapTokenLocalClient;
 import org.apache.hadoop.hive.llap.tez.LlapProtocolClientProxy;
@@ -355,10 +355,12 @@ public class TezSessionState {
       String user, final Configuration conf) throws IOException {
     // TODO: parts of this should be moved out of TezSession to reuse the clients, but there's
     //       no good place for that right now (HIVE-13698).
-    boolean useLocalTokenClient = isUsingLocalClient(conf);
+    SessionState session = SessionState.get();
+    boolean isInHs2 = session != null && session.isHiveServerQuery();
     Token<LlapTokenIdentifier> token = null;
     // For Tez, we don't use appId to distinguish the tokens.
-    if (useLocalTokenClient) {
+    if (isInHs2) {
+      // We are in HS2, get the token locally.
       String clusterName = LlapUtil.generateClusterName(conf);
       // This assumes that the LLAP cluster and session are both running under HS2 user.
       final String clusterId = DaemonId.createClusterString(user, clusterName);
@@ -368,12 +370,13 @@ public class TezSessionState {
           public LlapTokenLocalClient call() throws Exception {
             return new LlapTokenLocalClient(conf, clusterId);
           }
-        }).createToken(null, null);
+        }).createToken(null, null, false); // Signature is not required for Tez.
       } catch (ExecutionException e) {
         throw new IOException(e);
       }
     } else {
-      token = new LlapTokenClientFactory(conf).createClient().getDelegationToken(null);
+      // We are not in HS2; always create a new client for now.
+      token = new LlapTokenClient(conf).getDelegationToken(null);
     }
     if (LOG.isInfoEnabled()) {
       LOG.info("Obtained a LLAP token: " + token);
@@ -381,19 +384,6 @@ public class TezSessionState {
     return token;
   }
 
-  private static boolean isUsingLocalClient(Configuration conf) {
-    String mode = HiveConf.getVar(conf, ConfVars.LLAP_CREATE_TOKEN_LOCALLY).toLowerCase();
-    boolean isHs2Only = "hs2".equals(mode);
-    // We are initialized on first use inside TezSessionState::openInternal; assume the session
-    // should be available.
-    if (!isHs2Only) return "true".equals(mode);
-    SessionState session = SessionState.get();
-    if (session == null && LOG.isInfoEnabled()) {
-      LOG.warn("There's no session to check if we are in HS2");
-    }
-    return session != null && session.isHiveServerQuery();
-  }
-
   private TezClient startSessionAndContainers(TezClient session, HiveConf conf,
       Map<String, LocalResource> commonLocalResources, TezConfiguration tezConfig,
       boolean isOnThread) throws TezException, IOException {