You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by sp...@apache.org on 2016/05/06 20:42:34 UTC

[03/50] [abbrv] hive git commit: HIVE-13445 : LLAP: token should encode application and cluster ids (Sergey Shelukhin, reviewed by Siddharth Seth)

HIVE-13445 : LLAP: token should encode application and cluster ids (Sergey Shelukhin, reviewed by Siddharth Seth)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/868e5e14
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/868e5e14
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/868e5e14

Branch: refs/heads/java8
Commit: 868e5e141856ce75af48d854d9e3eb13372d11f4
Parents: b621827
Author: Sergey Shelukhin <se...@apache.org>
Authored: Tue May 3 12:01:32 2016 -0700
Committer: Sergey Shelukhin <se...@apache.org>
Committed: Tue May 3 13:38:03 2016 -0700

----------------------------------------------------------------------
 .../daemon/rpc/LlapDaemonProtocolProtos.java    | 209 +++++++++++++++++--
 .../org/apache/hadoop/hive/llap/DaemonId.java   |  41 ++++
 .../hive/llap/security/LlapTokenIdentifier.java |  39 +++-
 .../hive/llap/security/LlapTokenProvider.java   |   2 +-
 .../src/protobuf/LlapDaemonProtocol.proto       |   1 +
 .../hive/llap/daemon/ContainerRunner.java       |   9 +-
 .../llap/daemon/impl/ContainerRunnerImpl.java   |  47 +++--
 .../hive/llap/daemon/impl/LlapDaemon.java       |  52 ++++-
 .../daemon/impl/LlapProtocolServerImpl.java     |  41 ++--
 .../hive/llap/daemon/impl/LlapTokenChecker.java | 137 ++++++++++++
 .../hadoop/hive/llap/daemon/impl/QueryInfo.java |  17 +-
 .../hive/llap/daemon/impl/QueryTracker.java     |  85 +++++---
 .../hadoop/hive/llap/daemon/impl/Scheduler.java |   2 +
 .../llap/daemon/impl/TaskExecutorService.java   |   9 +
 .../hive/llap/security/LlapSecurityHelper.java  |  15 +-
 .../hive/llap/security/SecretManager.java       |  19 +-
 .../hive/llap/daemon/MiniLlapCluster.java       |   2 +-
 .../daemon/impl/TaskExecutorTestHelpers.java    |   2 +-
 .../impl/TestLlapDaemonProtocolServerImpl.java  |   2 +-
 .../llap/daemon/impl/TestLlapTokenChecker.java  |  96 +++++++++
 .../hive/ql/exec/tez/TezSessionState.java       |   3 +-
 21 files changed, 702 insertions(+), 128 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/868e5e14/llap-common/src/gen/protobuf/gen-java/org/apache/hadoop/hive/llap/daemon/rpc/LlapDaemonProtocolProtos.java
----------------------------------------------------------------------
diff --git a/llap-common/src/gen/protobuf/gen-java/org/apache/hadoop/hive/llap/daemon/rpc/LlapDaemonProtocolProtos.java b/llap-common/src/gen/protobuf/gen-java/org/apache/hadoop/hive/llap/daemon/rpc/LlapDaemonProtocolProtos.java
index 4ab7b32..820f6be 100644
--- a/llap-common/src/gen/protobuf/gen-java/org/apache/hadoop/hive/llap/daemon/rpc/LlapDaemonProtocolProtos.java
+++ b/llap-common/src/gen/protobuf/gen-java/org/apache/hadoop/hive/llap/daemon/rpc/LlapDaemonProtocolProtos.java
@@ -12821,6 +12821,21 @@ public final class LlapDaemonProtocolProtos {
 
   public interface GetTokenRequestProtoOrBuilder
       extends com.google.protobuf.MessageOrBuilder {
+
+    // optional string app_id = 1;
+    /**
+     * <code>optional string app_id = 1;</code>
+     */
+    boolean hasAppId();
+    /**
+     * <code>optional string app_id = 1;</code>
+     */
+    java.lang.String getAppId();
+    /**
+     * <code>optional string app_id = 1;</code>
+     */
+    com.google.protobuf.ByteString
+        getAppIdBytes();
   }
   /**
    * Protobuf type {@code GetTokenRequestProto}
@@ -12855,6 +12870,7 @@ public final class LlapDaemonProtocolProtos {
         com.google.protobuf.ExtensionRegistryLite extensionRegistry)
         throws com.google.protobuf.InvalidProtocolBufferException {
       initFields();
+      int mutable_bitField0_ = 0;
       com.google.protobuf.UnknownFieldSet.Builder unknownFields =
           com.google.protobuf.UnknownFieldSet.newBuilder();
       try {
@@ -12872,6 +12888,11 @@ public final class LlapDaemonProtocolProtos {
               }
               break;
             }
+            case 10: {
+              bitField0_ |= 0x00000001;
+              appId_ = input.readBytes();
+              break;
+            }
           }
         }
       } catch (com.google.protobuf.InvalidProtocolBufferException e) {
@@ -12911,7 +12932,52 @@ public final class LlapDaemonProtocolProtos {
       return PARSER;
     }
 
+    private int bitField0_;
+    // optional string app_id = 1;
+    public static final int APP_ID_FIELD_NUMBER = 1;
+    private java.lang.Object appId_;
+    /**
+     * <code>optional string app_id = 1;</code>
+     */
+    public boolean hasAppId() {
+      return ((bitField0_ & 0x00000001) == 0x00000001);
+    }
+    /**
+     * <code>optional string app_id = 1;</code>
+     */
+    public java.lang.String getAppId() {
+      java.lang.Object ref = appId_;
+      if (ref instanceof java.lang.String) {
+        return (java.lang.String) ref;
+      } else {
+        com.google.protobuf.ByteString bs = 
+            (com.google.protobuf.ByteString) ref;
+        java.lang.String s = bs.toStringUtf8();
+        if (bs.isValidUtf8()) {
+          appId_ = s;
+        }
+        return s;
+      }
+    }
+    /**
+     * <code>optional string app_id = 1;</code>
+     */
+    public com.google.protobuf.ByteString
+        getAppIdBytes() {
+      java.lang.Object ref = appId_;
+      if (ref instanceof java.lang.String) {
+        com.google.protobuf.ByteString b = 
+            com.google.protobuf.ByteString.copyFromUtf8(
+                (java.lang.String) ref);
+        appId_ = b;
+        return b;
+      } else {
+        return (com.google.protobuf.ByteString) ref;
+      }
+    }
+
     private void initFields() {
+      appId_ = "";
     }
     private byte memoizedIsInitialized = -1;
     public final boolean isInitialized() {
@@ -12925,6 +12991,9 @@ public final class LlapDaemonProtocolProtos {
     public void writeTo(com.google.protobuf.CodedOutputStream output)
                         throws java.io.IOException {
       getSerializedSize();
+      if (((bitField0_ & 0x00000001) == 0x00000001)) {
+        output.writeBytes(1, getAppIdBytes());
+      }
       getUnknownFields().writeTo(output);
     }
 
@@ -12934,6 +13003,10 @@ public final class LlapDaemonProtocolProtos {
       if (size != -1) return size;
 
       size = 0;
+      if (((bitField0_ & 0x00000001) == 0x00000001)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeBytesSize(1, getAppIdBytes());
+      }
       size += getUnknownFields().getSerializedSize();
       memoizedSerializedSize = size;
       return size;
@@ -12957,6 +13030,11 @@ public final class LlapDaemonProtocolProtos {
       org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GetTokenRequestProto other = (org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GetTokenRequestProto) obj;
 
       boolean result = true;
+      result = result && (hasAppId() == other.hasAppId());
+      if (hasAppId()) {
+        result = result && getAppId()
+            .equals(other.getAppId());
+      }
       result = result &&
           getUnknownFields().equals(other.getUnknownFields());
       return result;
@@ -12970,6 +13048,10 @@ public final class LlapDaemonProtocolProtos {
       }
       int hash = 41;
       hash = (19 * hash) + getDescriptorForType().hashCode();
+      if (hasAppId()) {
+        hash = (37 * hash) + APP_ID_FIELD_NUMBER;
+        hash = (53 * hash) + getAppId().hashCode();
+      }
       hash = (29 * hash) + getUnknownFields().hashCode();
       memoizedHashCode = hash;
       return hash;
@@ -13079,6 +13161,8 @@ public final class LlapDaemonProtocolProtos {
 
       public Builder clear() {
         super.clear();
+        appId_ = "";
+        bitField0_ = (bitField0_ & ~0x00000001);
         return this;
       }
 
@@ -13105,6 +13189,13 @@ public final class LlapDaemonProtocolProtos {
 
       public org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GetTokenRequestProto buildPartial() {
         org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GetTokenRequestProto result = new org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GetTokenRequestProto(this);
+        int from_bitField0_ = bitField0_;
+        int to_bitField0_ = 0;
+        if (((from_bitField0_ & 0x00000001) == 0x00000001)) {
+          to_bitField0_ |= 0x00000001;
+        }
+        result.appId_ = appId_;
+        result.bitField0_ = to_bitField0_;
         onBuilt();
         return result;
       }
@@ -13120,6 +13211,11 @@ public final class LlapDaemonProtocolProtos {
 
       public Builder mergeFrom(org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GetTokenRequestProto other) {
         if (other == org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GetTokenRequestProto.getDefaultInstance()) return this;
+        if (other.hasAppId()) {
+          bitField0_ |= 0x00000001;
+          appId_ = other.appId_;
+          onChanged();
+        }
         this.mergeUnknownFields(other.getUnknownFields());
         return this;
       }
@@ -13145,6 +13241,81 @@ public final class LlapDaemonProtocolProtos {
         }
         return this;
       }
+      private int bitField0_;
+
+      // optional string app_id = 1;
+      private java.lang.Object appId_ = "";
+      /**
+       * <code>optional string app_id = 1;</code>
+       */
+      public boolean hasAppId() {
+        return ((bitField0_ & 0x00000001) == 0x00000001);
+      }
+      /**
+       * <code>optional string app_id = 1;</code>
+       */
+      public java.lang.String getAppId() {
+        java.lang.Object ref = appId_;
+        if (!(ref instanceof java.lang.String)) {
+          java.lang.String s = ((com.google.protobuf.ByteString) ref)
+              .toStringUtf8();
+          appId_ = s;
+          return s;
+        } else {
+          return (java.lang.String) ref;
+        }
+      }
+      /**
+       * <code>optional string app_id = 1;</code>
+       */
+      public com.google.protobuf.ByteString
+          getAppIdBytes() {
+        java.lang.Object ref = appId_;
+        if (ref instanceof String) {
+          com.google.protobuf.ByteString b = 
+              com.google.protobuf.ByteString.copyFromUtf8(
+                  (java.lang.String) ref);
+          appId_ = b;
+          return b;
+        } else {
+          return (com.google.protobuf.ByteString) ref;
+        }
+      }
+      /**
+       * <code>optional string app_id = 1;</code>
+       */
+      public Builder setAppId(
+          java.lang.String value) {
+        if (value == null) {
+    throw new NullPointerException();
+  }
+  bitField0_ |= 0x00000001;
+        appId_ = value;
+        onChanged();
+        return this;
+      }
+      /**
+       * <code>optional string app_id = 1;</code>
+       */
+      public Builder clearAppId() {
+        bitField0_ = (bitField0_ & ~0x00000001);
+        appId_ = getDefaultInstance().getAppId();
+        onChanged();
+        return this;
+      }
+      /**
+       * <code>optional string app_id = 1;</code>
+       */
+      public Builder setAppIdBytes(
+          com.google.protobuf.ByteString value) {
+        if (value == null) {
+    throw new NullPointerException();
+  }
+  bitField0_ |= 0x00000001;
+        appId_ = value;
+        onChanged();
+        return this;
+      }
 
       // @@protoc_insertion_point(builder_scope:GetTokenRequestProto)
     }
@@ -14414,24 +14585,24 @@ public final class LlapDaemonProtocolProtos {
       "RequestProto\022/\n\020query_identifier\030\001 \001(\0132\025" +
       ".QueryIdentifierProto\022\"\n\032fragment_identi" +
       "fier_string\030\002 \001(\t\" \n\036TerminateFragmentRe" +
-      "sponseProto\"\026\n\024GetTokenRequestProto\"&\n\025G",
-      "etTokenResponseProto\022\r\n\005token\030\001 \001(\014*2\n\020S" +
-      "ourceStateProto\022\017\n\013S_SUCCEEDED\020\001\022\r\n\tS_RU" +
-      "NNING\020\002*E\n\024SubmissionStateProto\022\014\n\010ACCEP" +
-      "TED\020\001\022\014\n\010REJECTED\020\002\022\021\n\rEVICTED_OTHER\020\0032\316" +
-      "\002\n\022LlapDaemonProtocol\022?\n\nsubmitWork\022\027.Su" +
-      "bmitWorkRequestProto\032\030.SubmitWorkRespons" +
-      "eProto\022W\n\022sourceStateUpdated\022\037.SourceSta" +
-      "teUpdatedRequestProto\032 .SourceStateUpdat" +
-      "edResponseProto\022H\n\rqueryComplete\022\032.Query" +
-      "CompleteRequestProto\032\033.QueryCompleteResp",
-      "onseProto\022T\n\021terminateFragment\022\036.Termina" +
-      "teFragmentRequestProto\032\037.TerminateFragme" +
-      "ntResponseProto2]\n\026LlapManagementProtoco" +
-      "l\022C\n\022getDelegationToken\022\025.GetTokenReques" +
-      "tProto\032\026.GetTokenResponseProtoBH\n&org.ap" +
-      "ache.hadoop.hive.llap.daemon.rpcB\030LlapDa" +
-      "emonProtocolProtos\210\001\001\240\001\001"
+      "sponseProto\"&\n\024GetTokenRequestProto\022\016\n\006a",
+      "pp_id\030\001 \001(\t\"&\n\025GetTokenResponseProto\022\r\n\005" +
+      "token\030\001 \001(\014*2\n\020SourceStateProto\022\017\n\013S_SUC" +
+      "CEEDED\020\001\022\r\n\tS_RUNNING\020\002*E\n\024SubmissionSta" +
+      "teProto\022\014\n\010ACCEPTED\020\001\022\014\n\010REJECTED\020\002\022\021\n\rE" +
+      "VICTED_OTHER\020\0032\316\002\n\022LlapDaemonProtocol\022?\n" +
+      "\nsubmitWork\022\027.SubmitWorkRequestProto\032\030.S" +
+      "ubmitWorkResponseProto\022W\n\022sourceStateUpd" +
+      "ated\022\037.SourceStateUpdatedRequestProto\032 ." +
+      "SourceStateUpdatedResponseProto\022H\n\rquery" +
+      "Complete\022\032.QueryCompleteRequestProto\032\033.Q",
+      "ueryCompleteResponseProto\022T\n\021terminateFr" +
+      "agment\022\036.TerminateFragmentRequestProto\032\037" +
+      ".TerminateFragmentResponseProto2]\n\026LlapM" +
+      "anagementProtocol\022C\n\022getDelegationToken\022" +
+      "\025.GetTokenRequestProto\032\026.GetTokenRespons" +
+      "eProtoBH\n&org.apache.hadoop.hive.llap.da" +
+      "emon.rpcB\030LlapDaemonProtocolProtos\210\001\001\240\001\001"
     };
     com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
       new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
@@ -14533,7 +14704,7 @@ public final class LlapDaemonProtocolProtos {
           internal_static_GetTokenRequestProto_fieldAccessorTable = new
             com.google.protobuf.GeneratedMessage.FieldAccessorTable(
               internal_static_GetTokenRequestProto_descriptor,
-              new java.lang.String[] { });
+              new java.lang.String[] { "AppId", });
           internal_static_GetTokenResponseProto_descriptor =
             getDescriptor().getMessageTypes().get(16);
           internal_static_GetTokenResponseProto_fieldAccessorTable = new

http://git-wip-us.apache.org/repos/asf/hive/blob/868e5e14/llap-common/src/java/org/apache/hadoop/hive/llap/DaemonId.java
----------------------------------------------------------------------
diff --git a/llap-common/src/java/org/apache/hadoop/hive/llap/DaemonId.java b/llap-common/src/java/org/apache/hadoop/hive/llap/DaemonId.java
new file mode 100644
index 0000000..18355e6
--- /dev/null
+++ b/llap-common/src/java/org/apache/hadoop/hive/llap/DaemonId.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.llap;
+
+public class DaemonId {
+  private final String userName;
+  private final String clusterName;
+  private final String appId;
+  private final String hostName;
+  private final long startTime;
+
+  public DaemonId(String userName, String clusterName, String hostName, String appId,
+      long startTime) {
+    this.userName = userName;
+    this.clusterName = clusterName;
+    this.appId = appId;
+    this.hostName = hostName;
+    this.startTime = startTime;
+    // TODO: we could also get a unique number per daemon.
+  }
+
+  public String getClusterString() {
+    return userName + "_" + clusterName + "_" + appId;
+  }
+
+  public String getApplicationId() {
+    return appId;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hive/blob/868e5e14/llap-common/src/java/org/apache/hadoop/hive/llap/security/LlapTokenIdentifier.java
----------------------------------------------------------------------
diff --git a/llap-common/src/java/org/apache/hadoop/hive/llap/security/LlapTokenIdentifier.java b/llap-common/src/java/org/apache/hadoop/hive/llap/security/LlapTokenIdentifier.java
index 23980d0..e28eddd 100644
--- a/llap-common/src/java/org/apache/hadoop/hive/llap/security/LlapTokenIdentifier.java
+++ b/llap-common/src/java/org/apache/hadoop/hive/llap/security/LlapTokenIdentifier.java
@@ -22,6 +22,7 @@ import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
 
+import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.security.token.Token;
@@ -31,25 +32,32 @@ import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdenti
 public class LlapTokenIdentifier extends AbstractDelegationTokenIdentifier {
   private static final String KIND = "LLAP_TOKEN";
   public static final Text KIND_NAME = new Text(KIND);
+  private String clusterId;
+  private String appId;
 
   public LlapTokenIdentifier() {
     super();
   }
 
-  public LlapTokenIdentifier(Text owner, Text renewer, Text realUser) {
+  public LlapTokenIdentifier(Text owner, Text renewer, Text realUser,
+      String clusterId, String appId) {
     super(owner, renewer, realUser);
+    this.clusterId = clusterId;
+    this.appId = appId == null ? "" : appId;
   }
 
   @Override
   public void write(DataOutput out) throws IOException {
     super.write(out);
-    // Nothing right now.
+    out.writeUTF(clusterId);
+    out.writeUTF(appId);
   }
 
   @Override
   public void readFields(DataInput in) throws IOException {
     super.readFields(in);
-    // Nothing right now.
+    clusterId = in.readUTF();
+    appId = in.readUTF();
   }
 
   @Override
@@ -57,21 +65,34 @@ public class LlapTokenIdentifier extends AbstractDelegationTokenIdentifier {
     return KIND_NAME;
   }
 
+  public String getAppId() {
+    return appId;
+  }
+
+  public String getClusterId() {
+    return clusterId;
+  }
+
   @Override
   public int hashCode() {
-    // Nothing else right now.
-    return super.hashCode();
+    final int prime = 31;
+    int result = prime * super.hashCode() + ((appId == null) ? 0 : appId.hashCode());
+    return prime * result + ((clusterId == null) ? 0 : clusterId.hashCode());
   }
 
   @Override
-  public boolean equals(Object other) {
-    // Nothing else right now.
-    return super.equals(other);
+  public boolean equals(Object obj) {
+    if (this == obj) return true;
+    if (!(obj instanceof LlapTokenIdentifier) || !super.equals(obj)) return false;
+    LlapTokenIdentifier other = (LlapTokenIdentifier) obj;
+    return (appId == null ? other.appId == null : appId.equals(other.appId))
+        && (clusterId == null ? other.clusterId == null : clusterId.equals(other.clusterId));
   }
 
   @Override
   public String toString() {
-    return KIND + "; " + super.toString();
+    return KIND + "; " + super.toString() + ", cluster " + clusterId + ", app secret hash "
+        + (StringUtils.isBlank(appId) ? 0 : appId.hashCode());
   }
 
   @InterfaceAudience.Private

http://git-wip-us.apache.org/repos/asf/hive/blob/868e5e14/llap-common/src/java/org/apache/hadoop/hive/llap/security/LlapTokenProvider.java
----------------------------------------------------------------------
diff --git a/llap-common/src/java/org/apache/hadoop/hive/llap/security/LlapTokenProvider.java b/llap-common/src/java/org/apache/hadoop/hive/llap/security/LlapTokenProvider.java
index 2e99a28..edf9b18 100644
--- a/llap-common/src/java/org/apache/hadoop/hive/llap/security/LlapTokenProvider.java
+++ b/llap-common/src/java/org/apache/hadoop/hive/llap/security/LlapTokenProvider.java
@@ -23,5 +23,5 @@ import java.io.IOException;
 import org.apache.hadoop.security.token.Token;
 
 public interface LlapTokenProvider {
-  Token<LlapTokenIdentifier> getDelegationToken() throws IOException;
+  Token<LlapTokenIdentifier> getDelegationToken(String appId) throws IOException;
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hive/blob/868e5e14/llap-common/src/protobuf/LlapDaemonProtocol.proto
----------------------------------------------------------------------
diff --git a/llap-common/src/protobuf/LlapDaemonProtocol.proto b/llap-common/src/protobuf/LlapDaemonProtocol.proto
index 944c96c..5cdc02e 100644
--- a/llap-common/src/protobuf/LlapDaemonProtocol.proto
+++ b/llap-common/src/protobuf/LlapDaemonProtocol.proto
@@ -130,6 +130,7 @@ message TerminateFragmentResponseProto {
 }
 
 message GetTokenRequestProto {
+  optional string app_id = 1;
 }
 
 message GetTokenResponseProto {

http://git-wip-us.apache.org/repos/asf/hive/blob/868e5e14/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/ContainerRunner.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/ContainerRunner.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/ContainerRunner.java
index fc29371..c346aed 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/ContainerRunner.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/ContainerRunner.java
@@ -29,9 +29,12 @@ public interface ContainerRunner {
 
   SubmitWorkResponseProto submitWork(SubmitWorkRequestProto request) throws IOException;
 
-  SourceStateUpdatedResponseProto sourceStateUpdated(SourceStateUpdatedRequestProto request);
+  SourceStateUpdatedResponseProto sourceStateUpdated(
+      SourceStateUpdatedRequestProto request) throws IOException;
 
-  QueryCompleteResponseProto queryComplete(QueryCompleteRequestProto request);
+  QueryCompleteResponseProto queryComplete(
+      QueryCompleteRequestProto request) throws IOException;
 
-  TerminateFragmentResponseProto terminateFragment(TerminateFragmentRequestProto request);
+  TerminateFragmentResponseProto terminateFragment(
+      TerminateFragmentRequestProto request)  throws IOException;
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/868e5e14/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
index 3d45c7a..78b37f7 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
@@ -92,7 +92,7 @@ public class ContainerRunnerImpl extends CompositeService implements ContainerRu
       boolean enablePreemption, String[] localDirsBase, AtomicReference<Integer> localShufflePort,
       AtomicReference<InetSocketAddress> localAddress,
       long totalMemoryAvailableBytes, LlapDaemonExecutorMetrics metrics,
-      AMReporter amReporter, ClassLoader classLoader) {
+      AMReporter amReporter, ClassLoader classLoader, String clusterId) {
     super("ContainerRunnerImpl");
     this.conf = conf;
     Preconditions.checkState(numExecutors > 0,
@@ -101,7 +101,7 @@ public class ContainerRunnerImpl extends CompositeService implements ContainerRu
     this.localShufflePort = localShufflePort;
     this.amReporter = amReporter;
 
-    this.queryTracker = new QueryTracker(conf, localDirsBase);
+    this.queryTracker = new QueryTracker(conf, localDirsBase, clusterId);
     addIfService(queryTracker);
     String waitQueueSchedulerClassName = HiveConf.getVar(
         conf, ConfVars.LLAP_DAEMON_WAIT_QUEUE_COMPARATOR_CLASS_NAME);
@@ -175,7 +175,8 @@ public class ContainerRunnerImpl extends CompositeService implements ContainerRu
           fragmentSpec.getFragmentIdentifierString());
       int dagIdentifier = taskAttemptId.getTaskID().getVertexID().getDAGId().getId();
 
-      QueryIdentifier queryIdentifier = new QueryIdentifier(request.getApplicationIdString(), dagIdentifier);
+      QueryIdentifier queryIdentifier = new QueryIdentifier(
+          request.getApplicationIdString(), dagIdentifier);
 
       Credentials credentials = new Credentials();
       DataInputBuffer dib = new DataInputBuffer();
@@ -193,6 +194,7 @@ public class ContainerRunnerImpl extends CompositeService implements ContainerRu
               fragmentSpec.getAttemptNumber(), request.getUser(), request.getFragmentSpec(),
               jobToken);
 
+
       String[] localDirs = fragmentInfo.getLocalDirs();
       Preconditions.checkNotNull(localDirs);
       if (LOG.isDebugEnabled()) {
@@ -200,7 +202,6 @@ public class ContainerRunnerImpl extends CompositeService implements ContainerRu
       }
       // May need to setup localDir for re-localization, which is usually setup as Environment.PWD.
       // Used for re-localization, to add the user specified configuration (conf_pb_binary_stream)
-
       TaskRunnerCallable callable = new TaskRunnerCallable(request, fragmentInfo, new Configuration(getConfig()),
           new LlapExecutionContext(localAddress.get().getHostName(), queryTracker), env,
           credentials, memoryPerExecutor, amReporter, confParams, metrics, killedTaskHandler,
@@ -248,24 +249,23 @@ public class ContainerRunnerImpl extends CompositeService implements ContainerRu
 
   @Override
   public SourceStateUpdatedResponseProto sourceStateUpdated(
-      SourceStateUpdatedRequestProto request) {
+      SourceStateUpdatedRequestProto request) throws IOException {
     LOG.info("Processing state update: " + stringifySourceStateUpdateRequest(request));
-    queryTracker.registerSourceStateChange(
-        new QueryIdentifier(request.getQueryIdentifier().getAppIdentifier(),
-            request.getQueryIdentifier().getDagIdentifier()), request.getSrcName(),
-        request.getState());
+    QueryIdentifier queryId = new QueryIdentifier(request.getQueryIdentifier().getAppIdentifier(),
+        request.getQueryIdentifier().getDagIdentifier());
+    queryTracker.registerSourceStateChange(queryId, request.getSrcName(), request.getState());
     return SourceStateUpdatedResponseProto.getDefaultInstance();
   }
 
   @Override
-  public QueryCompleteResponseProto queryComplete(QueryCompleteRequestProto request) {
+  public QueryCompleteResponseProto queryComplete(
+      QueryCompleteRequestProto request) throws IOException {
     QueryIdentifier queryIdentifier =
         new QueryIdentifier(request.getQueryIdentifier().getAppIdentifier(),
             request.getQueryIdentifier().getDagIdentifier());
     LOG.info("Processing queryComplete notification for {}", queryIdentifier);
-    List<QueryFragmentInfo> knownFragments =
-        queryTracker
-            .queryComplete(queryIdentifier, request.getDeleteDelay());
+    List<QueryFragmentInfo> knownFragments = queryTracker.queryComplete(
+        queryIdentifier, request.getDeleteDelay(), false);
     LOG.info("DBG: Pending fragment count for completed query {} = {}", queryIdentifier,
         knownFragments.size());
     for (QueryFragmentInfo fragmentInfo : knownFragments) {
@@ -277,9 +277,16 @@ public class ContainerRunnerImpl extends CompositeService implements ContainerRu
   }
 
   @Override
-  public TerminateFragmentResponseProto terminateFragment(TerminateFragmentRequestProto request) {
-    LOG.info("DBG: Received terminateFragment request for {}", request.getFragmentIdentifierString());
-    executorService.killFragment(request.getFragmentIdentifierString());
+  public TerminateFragmentResponseProto terminateFragment(
+      TerminateFragmentRequestProto request) throws IOException {
+    String fragmentId = request.getFragmentIdentifierString();
+    LOG.info("DBG: Received terminateFragment request for {}", fragmentId);
+    // TODO: ideally, QueryTracker should have fragment-to-query mapping.
+    QueryIdentifier queryId = executorService.findQueryByFragment(fragmentId);
+    // checkPermissions returns false if query is not found, throws on failure.
+    if (queryId != null && queryTracker.checkPermissionsForQuery(queryId)) {
+      executorService.killFragment(fragmentId);
+    }
     return TerminateFragmentResponseProto.getDefaultInstance();
   }
 
@@ -355,8 +362,12 @@ public class ContainerRunnerImpl extends CompositeService implements ContainerRu
   @Override
   public void queryFailed(QueryIdentifier queryIdentifier) {
     LOG.info("Processing query failed notification for {}", queryIdentifier);
-    List<QueryFragmentInfo> knownFragments =
-        queryTracker.queryComplete(queryIdentifier, -1);
+    List<QueryFragmentInfo> knownFragments;
+    try {
+      knownFragments = queryTracker.queryComplete(queryIdentifier, -1, true);
+    } catch (IOException e) {
+      throw new RuntimeException(e); // Should never happen here, no permission check.
+    }
     LOG.info("DBG: Pending fragment count for failed query {} = {}", queryIdentifier,
         knownFragments.size());
     for (QueryFragmentInfo fragmentInfo : knownFragments) {

http://git-wip-us.apache.org/repos/asf/hive/blob/868e5e14/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
index 63cb16b..d23a44a 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
@@ -26,12 +26,14 @@ import java.util.List;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.regex.Pattern;
 
 import javax.management.ObjectName;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.llap.DaemonId;
 import org.apache.hadoop.hive.llap.LlapUtil;
 import org.apache.hadoop.hive.llap.configuration.LlapDaemonConfiguration;
 import org.apache.hadoop.hive.llap.daemon.ContainerRunner;
@@ -57,11 +59,13 @@ import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDFBridge;
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDFBridge.UdfWhitelistChecker;
 import org.apache.hadoop.metrics2.util.MBeans;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.service.CompositeService;
 import org.apache.hadoop.util.ExitUtil;
 import org.apache.hadoop.util.JvmPauseMonitor;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.apache.hive.common.util.ShutdownHookManager;
 import org.apache.logging.log4j.core.config.Configurator;
 import org.slf4j.Logger;
@@ -97,6 +101,13 @@ public class LlapDaemon extends CompositeService implements ContainerRunner, Lla
   private final int numExecutors;
   private final long maxJvmMemory;
   private final String[] localDirs;
+  private final DaemonId daemonId;
+
+  private final static Pattern hostsRe = Pattern.compile("[^A-Za-z0-9_-]");
+  private static String generateClusterName(Configuration conf) {
+    String hosts = HiveConf.getTrimmedVar(conf, ConfVars.LLAP_DAEMON_SERVICE_HOSTS);
+    return hostsRe.matcher(hosts.startsWith("@") ? hosts.substring(1) : hosts).replaceAll("_");
+  }
 
   // TODO Not the best way to share the address
   private final AtomicReference<InetSocketAddress> srvAddress = new AtomicReference<>(),
@@ -105,11 +116,10 @@ public class LlapDaemon extends CompositeService implements ContainerRunner, Lla
 
   public LlapDaemon(Configuration daemonConf, int numExecutors, long executorMemoryBytes,
       boolean ioEnabled, boolean isDirectCache, long ioMemoryBytes, String[] localDirs, int srvPort,
-      int mngPort, int shufflePort, int webPort) {
+      int mngPort, int shufflePort, int webPort, String appName) {
     super("LlapDaemon");
 
     initializeLogging();
-
     printAsciiArt();
 
     Preconditions.checkArgument(numExecutors > 0);
@@ -129,6 +139,14 @@ public class LlapDaemon extends CompositeService implements ContainerRunner, Lla
           "LLAP service hosts startswith '@' but hive.zookeeper.quorum is not set." +
               " hive.zookeeper.quorum must be set.");
     }
+    String hostName = MetricsUtils.getHostName();
+    try {
+      daemonId = new DaemonId(UserGroupInformation.getCurrentUser().getUserName(),
+          generateClusterName(daemonConf), hostName, appName, System.currentTimeMillis());
+    } catch (IOException ex) {
+      throw new RuntimeException(ex);
+    }
+
 
     this.maxJvmMemory = getTotalHeapSize();
     this.llapIoEnabled = ioEnabled;
@@ -193,7 +211,7 @@ public class LlapDaemon extends CompositeService implements ContainerRunner, Lla
     LlapMetricsSystem.initialize("LlapDaemon");
     this.pauseMonitor = new JvmPauseMonitor(daemonConf);
     pauseMonitor.start();
-    String displayName = "LlapDaemonExecutorMetrics-" + MetricsUtils.getHostName();
+    String displayName = "LlapDaemonExecutorMetrics-" + hostName;
     String sessionId = MetricsUtils.getUUID();
     daemonConf.set("llap.daemon.metrics.sessionid", sessionId);
     String[] strIntervals = HiveConf.getTrimmedStringsVar(daemonConf,
@@ -223,11 +241,11 @@ public class LlapDaemon extends CompositeService implements ContainerRunner, Lla
     this.amReporter = new AMReporter(srvAddress, new QueryFailedHandlerProxy(), daemonConf);
 
     this.server = new LlapProtocolServerImpl(
-        numHandlers, this, srvAddress, mngAddress, srvPort, mngPort);
+        numHandlers, this, srvAddress, mngAddress, srvPort, mngPort, daemonId);
 
     this.containerRunner = new ContainerRunnerImpl(daemonConf, numExecutors, waitQueueSize,
         enablePreemption, localDirs, this.shufflePort, srvAddress, executorMemoryBytes, metrics,
-        amReporter, executorClassLoader);
+        amReporter, executorClassLoader, daemonId.getClusterString());
     addIfService(containerRunner);
 
     // Not adding the registry as a service, since we need to control when it is initialized - conf used to pickup properties.
@@ -377,10 +395,18 @@ public class LlapDaemon extends CompositeService implements ContainerRunner, Lla
       LlapDaemonConfiguration daemonConf = new LlapDaemonConfiguration();
 
       String containerIdStr = System.getenv(ApplicationConstants.Environment.CONTAINER_ID.name());
+      String appName = null;
       if (containerIdStr != null && !containerIdStr.isEmpty()) {
         daemonConf.set(ConfVars.LLAP_DAEMON_CONTAINER_ID.varname, containerIdStr);
+        appName = ConverterUtils.toContainerId(containerIdStr)
+            .getApplicationAttemptId().getApplicationId().toString();
       } else {
         daemonConf.unset(ConfVars.LLAP_DAEMON_CONTAINER_ID.varname);
+        // Note, we assume production LLAP always runs under YARN.
+        LOG.error("Cannot find " + ApplicationConstants.Environment.CONTAINER_ID.toString()
+            + "; LLAP tokens may grant access to subsequent instances of the cluster with"
+            + " the same name");
+        appName = null;
       }
 
       int numExecutors = HiveConf.getIntVar(daemonConf, ConfVars.LLAP_DAEMON_NUM_EXECUTORS);
@@ -400,7 +426,8 @@ public class LlapDaemon extends CompositeService implements ContainerRunner, Lla
       boolean isDirectCache = HiveConf.getBoolVar(daemonConf, ConfVars.LLAP_ALLOCATOR_DIRECT);
       boolean isLlapIo = HiveConf.getBoolVar(daemonConf, HiveConf.ConfVars.LLAP_IO_ENABLED, true);
       llapDaemon = new LlapDaemon(daemonConf, numExecutors, executorMemoryBytes, isLlapIo,
-              isDirectCache, ioMemoryBytes, localDirs, rpcPort, mngPort, shufflePort, webPort);
+          isDirectCache, ioMemoryBytes, localDirs, rpcPort, mngPort, shufflePort, webPort,
+          appName);
 
       LOG.info("Adding shutdown hook for LlapDaemon");
       ShutdownHookManager.addShutdownHook(new CompositeServiceShutdownHook(llapDaemon), 1);
@@ -420,24 +447,27 @@ public class LlapDaemon extends CompositeService implements ContainerRunner, Lla
   }
 
   @Override
-  public SubmitWorkResponseProto submitWork(SubmitWorkRequestProto request) throws
-      IOException {
+  public SubmitWorkResponseProto submitWork(
+      SubmitWorkRequestProto request) throws IOException {
     numSubmissions.incrementAndGet();
     return containerRunner.submitWork(request);
   }
 
   @Override
-  public SourceStateUpdatedResponseProto sourceStateUpdated(SourceStateUpdatedRequestProto request) {
+  public SourceStateUpdatedResponseProto sourceStateUpdated(
+      SourceStateUpdatedRequestProto request) throws IOException {
     return containerRunner.sourceStateUpdated(request);
   }
 
   @Override
-  public QueryCompleteResponseProto queryComplete(QueryCompleteRequestProto request) {
+  public QueryCompleteResponseProto queryComplete(
+      QueryCompleteRequestProto request) throws IOException {
     return containerRunner.queryComplete(request);
   }
 
   @Override
-  public TerminateFragmentResponseProto terminateFragment(TerminateFragmentRequestProto request) {
+  public TerminateFragmentResponseProto terminateFragment(
+      TerminateFragmentRequestProto request) throws IOException {
     return containerRunner.terminateFragment(request);
   }
 

http://git-wip-us.apache.org/repos/asf/hive/blob/868e5e14/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapProtocolServerImpl.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapProtocolServerImpl.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapProtocolServerImpl.java
index dae1a3a..db8bfa6 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapProtocolServerImpl.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapProtocolServerImpl.java
@@ -32,6 +32,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.llap.DaemonId;
 import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos;
 import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GetTokenRequestProto;
 import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GetTokenResponseProto;
@@ -71,13 +72,11 @@ public class LlapProtocolServerImpl extends AbstractService
   private final AtomicReference<InetSocketAddress> srvAddress, mngAddress;
   private SecretManager zkSecretManager;
   private String restrictedToUser = null;
+  private final DaemonId daemonId;
 
-  public LlapProtocolServerImpl(int numHandlers,
-                                ContainerRunner containerRunner,
-                                AtomicReference<InetSocketAddress> srvAddress,
-                                AtomicReference<InetSocketAddress> mngAddress,
-                                int srvPort,
-                                int mngPort) {
+  public LlapProtocolServerImpl(int numHandlers, ContainerRunner containerRunner,
+      AtomicReference<InetSocketAddress> srvAddress, AtomicReference<InetSocketAddress> mngAddress,
+      int srvPort, int mngPort, DaemonId daemonId) {
     super("LlapDaemonProtocolServerImpl");
     this.numHandlers = numHandlers;
     this.containerRunner = containerRunner;
@@ -85,14 +84,14 @@ public class LlapProtocolServerImpl extends AbstractService
     this.srvPort = srvPort;
     this.mngAddress = mngAddress;
     this.mngPort = mngPort;
+    this.daemonId = daemonId;
     LOG.info("Creating: " + LlapProtocolServerImpl.class.getSimpleName() +
         " with port configured to: " + srvPort);
   }
 
   @Override
   public SubmitWorkResponseProto submitWork(RpcController controller,
-                                            SubmitWorkRequestProto request) throws
-      ServiceException {
+      SubmitWorkRequestProto request) throws ServiceException {
     try {
       return containerRunner.submitWork(request);
     } catch (IOException e) {
@@ -103,20 +102,31 @@ public class LlapProtocolServerImpl extends AbstractService
   @Override
   public SourceStateUpdatedResponseProto sourceStateUpdated(RpcController controller,
       SourceStateUpdatedRequestProto request) throws ServiceException {
-    return containerRunner.sourceStateUpdated(request);
+    try {
+      return containerRunner.sourceStateUpdated(request);
+    } catch (IOException e) {
+      throw new ServiceException(e);
+    }
   }
 
   @Override
   public QueryCompleteResponseProto queryComplete(RpcController controller,
       QueryCompleteRequestProto request) throws ServiceException {
-    return containerRunner.queryComplete(request);
+    try {
+      return containerRunner.queryComplete(request);
+    } catch (IOException e) {
+      throw new ServiceException(e);
+    }
   }
 
   @Override
   public TerminateFragmentResponseProto terminateFragment(
-      RpcController controller,
-      TerminateFragmentRequestProto request) throws ServiceException {
-    return containerRunner.terminateFragment(request);
+      RpcController controller, TerminateFragmentRequestProto request) throws ServiceException {
+    try {
+      return containerRunner.terminateFragment(request);
+    } catch (IOException e) {
+      throw new ServiceException(e);
+    }
   }
 
   @Override
@@ -140,7 +150,7 @@ public class LlapProtocolServerImpl extends AbstractService
     }
     String llapPrincipal = HiveConf.getVar(conf, ConfVars.LLAP_KERBEROS_PRINCIPAL),
         llapKeytab = HiveConf.getVar(conf, ConfVars.LLAP_KERBEROS_KEYTAB_FILE);
-    zkSecretManager = SecretManager.createSecretManager(conf, llapPrincipal, llapKeytab);
+    zkSecretManager = SecretManager.createSecretManager(conf, llapPrincipal, llapKeytab, daemonId);
 
     // Start the protocol server after properly authenticating with daemon keytab.
     UserGroupInformation daemonUgi = null;
@@ -275,7 +285,8 @@ public class LlapProtocolServerImpl extends AbstractService
       realUser = new Text(ugi.getRealUser().getUserName());
     }
     Text renewer = new Text(ugi.getShortUserName());
-    LlapTokenIdentifier llapId = new LlapTokenIdentifier(owner, renewer, realUser);
+    LlapTokenIdentifier llapId = new LlapTokenIdentifier(owner, renewer, realUser,
+        daemonId.getClusterString(), request.hasAppId() ? request.getAppId() : null);
     // TODO: note that the token is not renewable right now and will last for 2 weeks by default.
     Token<LlapTokenIdentifier> token = new Token<LlapTokenIdentifier>(llapId, zkSecretManager);
     if (LOG.isInfoEnabled()) {

http://git-wip-us.apache.org/repos/asf/hive/blob/868e5e14/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapTokenChecker.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapTokenChecker.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapTokenChecker.java
new file mode 100644
index 0000000..03ee055
--- /dev/null
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapTokenChecker.java
@@ -0,0 +1,209 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.llap.daemon.impl;
+
+import com.google.common.annotations.VisibleForTesting;
+
+import java.util.ArrayList;
+
+import java.util.List;
+
+import java.io.IOException;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hadoop.hive.llap.security.LlapTokenIdentifier;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Static helpers that decide whether the caller of an LLAP daemon request may act on a query,
+ * using the kerberos credentials and the LLAP tokens of the current user.
+ */
+public final class LlapTokenChecker {
+  private static final Logger LOG = LoggerFactory.getLogger(LlapTokenChecker.class);
+
+  /** Returned by {@link #getTokenInfo(String)} when security is disabled. */
+  private static final ImmutablePair<String, String> NO_SECURITY = new ImmutablePair<>(null, null);
+
+  private LlapTokenChecker() {
+    // Static utility class; not meant to be instantiated.
+  }
+
+  /**
+   * Derives the (user name, app ID) pair from the credentials of the current user.
+   *
+   * @param clusterId cluster ID; LLAP tokens carrying a different cluster ID are ignored
+   * @return a (null, null) pair if security is disabled; otherwise the user name and the
+   *         app ID (possibly null) taken from the credentials
+   * @throws IOException if thrown by {@link UserGroupInformation#getCurrentUser()}
+   * @throws SecurityException if the user has neither matching LLAP tokens nor kerberos
+   *         credentials, or if the credentials disagree on the user name or app ID
+   */
+  public static Pair<String, String> getTokenInfo(String clusterId) throws IOException {
+    if (!UserGroupInformation.isSecurityEnabled()) return NO_SECURITY;
+    UserGroupInformation current = UserGroupInformation.getCurrentUser();
+    String kerberosName = current.hasKerberosCredentials() ? current.getShortUserName() : null;
+    List<LlapTokenIdentifier> tokens = getLlapTokens(current, clusterId);
+    if ((tokens == null || tokens.isEmpty()) && kerberosName == null) {
+      throw new SecurityException("No tokens or kerberos for " + current);
+    }
+    return getTokenInfoInternal(kerberosName, tokens);
+  }
+
+  /**
+   * Collects the LLAP token identifiers of the given user whose cluster ID equals clusterId.
+   *
+   * @return the matching identifiers, or null (never an empty list) if there are none
+   */
+  private static List<LlapTokenIdentifier> getLlapTokens(
+      UserGroupInformation ugi, String clusterId) {
+    List<LlapTokenIdentifier> tokens = null;
+    for (TokenIdentifier id : ugi.getTokenIdentifiers()) {
+      if (!LlapTokenIdentifier.KIND_NAME.equals(id.getKind())) continue;
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Token {}", id);
+      }
+      LlapTokenIdentifier llapId = (LlapTokenIdentifier)id;
+      if (!clusterId.equals(llapId.getClusterId())) continue;
+      if (tokens == null) {
+        tokens = new ArrayList<>();
+      }
+      tokens.add(llapId);
+    }
+    return tokens;
+  }
+
+  /**
+   * Combines the kerberos user name and the LLAP tokens into one (user name, app ID) pair.
+   *
+   * @param kerberosName short user name from kerberos credentials, or null if there are none
+   * @param tokens LLAP token identifiers to consider; may be null
+   * @return the user name, and the app ID if any token carries one (null otherwise)
+   * @throws SecurityException if the credentials disagree on the user name or on the app ID
+   */
+  @VisibleForTesting
+  static Pair<String, String> getTokenInfoInternal(
+      String kerberosName, List<LlapTokenIdentifier> tokens) {
+    assert (tokens != null && !tokens.isEmpty()) || kerberosName != null;
+    if (tokens == null) {
+      return new ImmutablePair<String, String>(kerberosName, null);
+    }
+    String userName = kerberosName, appId = null;
+    for (LlapTokenIdentifier llapId : tokens) {
+      String newUserName = llapId.getRealUser().toString();
+      if (userName != null && !userName.equals(newUserName)) {
+        // Mention the kerberos name only when there actually is one.
+        throw new SecurityException("Ambiguous user name from credentials - " + userName
+            + " and " + newUserName + " from " + llapId
+            + ((kerberosName != null) ? ("; has kerberos credentials for " + kerberosName) : ""));
+      }
+      userName = newUserName;
+      String newAppId = llapId.getAppId();
+      if (!StringUtils.isEmpty(newAppId)) {
+        if (!StringUtils.isEmpty(appId) && !appId.equals(newAppId)) {
+          throw new SecurityException("Ambiguous app ID from credentials - " + appId
+              + " and " + newAppId + " from " + llapId);
+        }
+        appId = newAppId;
+      }
+    }
+    assert userName != null;
+    return new ImmutablePair<String, String>(userName, appId);
+  }
+
+  /**
+   * Verifies that the current user may access an entity owned by the given user and app.
+   * Does nothing if security is disabled.
+   *
+   * @param clusterId cluster ID; LLAP tokens carrying a different cluster ID are ignored
+   * @param userName user name recorded for the entity being accessed
+   * @param appId app ID recorded for the entity being accessed; may be null or empty
+   * @param hint description of the entity; only used in the error message
+   * @throws IOException if thrown by {@link UserGroupInformation#getCurrentUser()}
+   * @throws SecurityException if none of the caller's credentials grant access
+   */
+  public static void checkPermissions(
+      String clusterId, String userName, String appId, Object hint) throws IOException {
+    if (!UserGroupInformation.isSecurityEnabled()) return;
+    UserGroupInformation current = UserGroupInformation.getCurrentUser();
+    String kerberosName = current.hasKerberosCredentials() ? current.getShortUserName() : null;
+    List<LlapTokenIdentifier> tokens = getLlapTokens(current, clusterId);
+    checkPermissionsInternal(kerberosName, tokens, userName, appId, hint);
+  }
+
+  /**
+   * Checks the given kerberos name and tokens against the required user name and app ID.
+   *
+   * @throws SecurityException if neither the kerberos name nor any token grants access
+   */
+  @VisibleForTesting
+  static void checkPermissionsInternal(String kerberosName, List<LlapTokenIdentifier> tokens,
+      String userName, String appId, Object hint) {
+    // Kerberos credentials alone are only sufficient when no app ID is required.
+    if (kerberosName != null && StringUtils.isEmpty(appId) && kerberosName.equals(userName)) {
+      return;
+    }
+    if (tokens != null) {
+      for (LlapTokenIdentifier llapId : tokens) {
+        String tokenUser = llapId.getRealUser().toString(), tokenAppId = llapId.getAppId();
+        if (checkTokenPermissions(userName, appId, tokenUser, tokenAppId)) return;
+      }
+    }
+    throw unauthorized(userName, appId, hint);
+  }
+
+  /**
+   * Verifies previously retrieved credentials (see {@link #getTokenInfo(String)}) against the
+   * user name and app ID recorded for an entity, without consulting the current user again.
+   *
+   * @param prm (user name, app ID) pair describing the caller
+   * @param userName recorded user name; if null, access is allowed
+   * @param appId recorded app ID; may be null or empty
+   * @param hint description of the entity; only used in the error message
+   * @throws SecurityException if the caller's pair does not match the recorded values
+   */
+  public static void checkPermissions(
+      Pair<String, String> prm, String userName, String appId, Object hint) {
+    if (userName == null) {
+      assert StringUtils.isEmpty(appId);
+      return;
+    }
+    if (!checkTokenPermissions(userName, appId, prm.getLeft(), prm.getRight())) {
+      throw unauthorized(userName, appId, hint);
+    }
+  }
+
+  /** Returns true if the token's user matches and the app ID, when required, matches too. */
+  private static boolean checkTokenPermissions(
+      String userName, String appId, String tokenUser, String tokenAppId) {
+    // A null user name never matches, so the request is rejected rather than failing on it.
+    return userName != null && userName.equals(tokenUser)
+        && (StringUtils.isEmpty(appId) || appId.equals(tokenAppId));
+  }
+
+  /**
+   * Builds the exception for a rejected request. Only the hash code of the app ID is put in
+   * the message; a null app ID is reported as "null" instead of causing a
+   * NullPointerException that would mask the authorization failure.
+   */
+  private static SecurityException unauthorized(String userName, String appId, Object hint) {
+    String appIdHash = (appId == null) ? "null" : Integer.toString(appId.hashCode());
+    return new SecurityException("Unauthorized to access "
+        + userName + ", " + appIdHash + " (" + hint + ")");
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hive/blob/868e5e14/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryInfo.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryInfo.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryInfo.java
index 64c2b58..8daef9e 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryInfo.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryInfo.java
@@ -55,10 +55,11 @@ public class QueryInfo {
   private final ConcurrentMap<String, SourceStateProto> sourceStateMap;
 
   private final FinishableStateTracker finishableStateTracker = new FinishableStateTracker();
+  private final String tokenUserName, appId;
 
-  public QueryInfo(QueryIdentifier queryIdentifier, String appIdString, String dagName, int dagIdentifier,
-                   String user, ConcurrentMap<String, SourceStateProto> sourceStateMap,
-                   String[] localDirsBase, FileSystem localFs) {
+  public QueryInfo(QueryIdentifier queryIdentifier, String appIdString, String dagName,
+      int dagIdentifier, String user, ConcurrentMap<String, SourceStateProto> sourceStateMap,
+      String[] localDirsBase, FileSystem localFs, String tokenUserName, String tokenAppId) {
     this.queryIdentifier = queryIdentifier;
     this.appIdString = appIdString;
     this.dagName = dagName;
@@ -67,6 +68,8 @@ public class QueryInfo {
     this.user = user;
     this.localDirsBase = localDirsBase;
     this.localFs = localFs;
+    this.tokenUserName = tokenUserName;
+    this.appId = tokenAppId;
   }
 
   public QueryIdentifier getQueryIdentifier() {
@@ -270,4 +273,12 @@ public class QueryInfo {
       this.lastFinishableState = lastFinishableState;
     }
   }
+
+  public String getTokenUserName() {
+    return tokenUserName;
+  }
+
+  public String getTokenAppId() {
+    return appId;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/868e5e14/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java
index 14657e6..cb3be2b 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java
@@ -18,6 +18,8 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.security.token.Token;
 import org.apache.tez.common.CallableWithNdc;
@@ -60,6 +62,7 @@ public class QueryTracker extends AbstractService {
 
   private final String[] localDirsBase;
   private final FileSystem localFs;
+  private final String clusterId;
   private final long defaultDeleteDelaySeconds;
 
   // TODO At the moment there's no way of knowing whether a query is running or not.
@@ -89,9 +92,10 @@ public class QueryTracker extends AbstractService {
   private final ConcurrentHashMap<QueryIdentifier, String> queryIdentifierToHiveQueryId =
       new ConcurrentHashMap<>();
 
-  public QueryTracker(Configuration conf, String[] localDirsBase) {
+  public QueryTracker(Configuration conf, String[] localDirsBase, String clusterId) {
     super("QueryTracker");
     this.localDirsBase = localDirsBase;
+    this.clusterId = clusterId;
     try {
       localFs = FileSystem.getLocal(conf);
     } catch (IOException e) {
@@ -119,35 +123,50 @@ public class QueryTracker extends AbstractService {
    * @param user
    * @throws IOException
    */
-  QueryFragmentInfo registerFragment(QueryIdentifier queryIdentifier, String appIdString, String dagName,
-                                     int dagIdentifier, String vertexName, int fragmentNumber, int attemptNumber, String user,
-                                     FragmentSpecProto fragmentSpec, Token<JobTokenIdentifier> appToken) throws IOException {
+  QueryFragmentInfo registerFragment(QueryIdentifier queryIdentifier, String appIdString,
+      String dagName, int dagIdentifier, String vertexName, int fragmentNumber, int attemptNumber,
+      String user, FragmentSpecProto fragmentSpec, Token<JobTokenIdentifier> appToken)
+          throws IOException {
     ReadWriteLock dagLock = getDagLock(queryIdentifier);
     dagLock.readLock().lock();
     try {
-      if (!completedDagMap.contains(queryIdentifier)) {
-        QueryInfo queryInfo = queryInfoMap.get(queryIdentifier);
-        if (queryInfo == null) {
-          queryInfo = new QueryInfo(queryIdentifier, appIdString, dagName, dagIdentifier, user,
-              getSourceCompletionMap(queryIdentifier), localDirsBase, localFs);
-          queryInfoMap.putIfAbsent(queryIdentifier, queryInfo);
-        }
-
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Registering request for {} with the ShuffleHandler", queryIdentifier);
-        }
-        ShuffleHandler.get()
-            .registerDag(appIdString, dagIdentifier, appToken,
-                user, queryInfo.getLocalDirs());
-
-        return queryInfo.registerFragment(vertexName, fragmentNumber, attemptNumber, fragmentSpec);
-      } else {
+      if (completedDagMap.contains(queryIdentifier)) {
         // Cleanup the dag lock here, since it may have been created after the query completed
         dagSpecificLocks.remove(queryIdentifier);
         throw new RuntimeException(
             "Dag " + dagName + " already complete. Rejecting fragment ["
                 + vertexName + ", " + fragmentNumber + ", " + attemptNumber + "]");
       }
+      // TODO: for now, we get the secure username out of UGI... after signing, we can take it
+      //       out of the request provided that it's signed.
+      Pair<String, String> tokenInfo = LlapTokenChecker.getTokenInfo(clusterId);
+      boolean isExistingQueryInfo = true;
+      QueryInfo queryInfo = queryInfoMap.get(queryIdentifier);
+      if (queryInfo == null) {
+        queryInfo = new QueryInfo(queryIdentifier, appIdString, dagName, dagIdentifier, user,
+            getSourceCompletionMap(queryIdentifier), localDirsBase, localFs,
+            tokenInfo.getLeft(), tokenInfo.getRight());
+        QueryInfo old = queryInfoMap.putIfAbsent(queryIdentifier, queryInfo);
+        if (old != null) {
+          queryInfo = old;
+        } else {
+          isExistingQueryInfo = false;
+        }
+      }
+      if (isExistingQueryInfo) {
+        // We already retrieved the incoming info, check without UGI.
+        LlapTokenChecker.checkPermissions(tokenInfo, queryInfo.getTokenUserName(),
+            queryInfo.getTokenAppId(), queryInfo.getQueryIdentifier());
+      }
+
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Registering request for {} with the ShuffleHandler", queryIdentifier);
+      }
+      ShuffleHandler.get()
+          .registerDag(appIdString, dagIdentifier, appToken,
+              user, queryInfo.getLocalDirs());
+
+      return queryInfo.registerFragment(vertexName, fragmentNumber, attemptNumber, fragmentSpec);
     } finally {
       dagLock.readLock().unlock();
     }
@@ -174,17 +193,20 @@ public class QueryTracker extends AbstractService {
    * @param queryIdentifier
    * @param deleteDelay
    */
-  List<QueryFragmentInfo> queryComplete(QueryIdentifier queryIdentifier, long deleteDelay) {
+  List<QueryFragmentInfo> queryComplete(QueryIdentifier queryIdentifier, long deleteDelay,
+      boolean isInternal) throws IOException {
     if (deleteDelay == -1) {
       deleteDelay = defaultDeleteDelaySeconds;
     }
     ReadWriteLock dagLock = getDagLock(queryIdentifier);
     dagLock.writeLock().lock();
     try {
+      QueryInfo queryInfo = isInternal
+          ? queryInfoMap.get(queryIdentifier) : checkPermissionsAndGetQuery(queryIdentifier);
       rememberCompletedDag(queryIdentifier);
       LOG.info("Processing queryComplete for queryIdentifier={} with deleteDelay={} seconds", queryIdentifier,
           deleteDelay);
-      QueryInfo queryInfo = queryInfoMap.remove(queryIdentifier);
+      queryInfoMap.remove(queryIdentifier);
       if (queryInfo == null) {
         LOG.warn("Ignoring query complete for unknown dag: {}", queryIdentifier);
         return Collections.emptyList();
@@ -229,9 +251,10 @@ public class QueryTracker extends AbstractService {
    * @param sourceName
    * @param sourceState
    */
-  void registerSourceStateChange(QueryIdentifier queryIdentifier, String sourceName, SourceStateProto sourceState) {
+  void registerSourceStateChange(QueryIdentifier queryIdentifier, String sourceName,
+      SourceStateProto sourceState) throws IOException {
     getSourceCompletionMap(queryIdentifier).put(sourceName, sourceState);
-    QueryInfo queryInfo = queryInfoMap.get(queryIdentifier);
+    QueryInfo queryInfo = checkPermissionsAndGetQuery(queryIdentifier);
     if (queryInfo != null) {
       queryInfo.sourceStateUpdated(sourceName);
     } else {
@@ -322,4 +345,32 @@ public class QueryTracker extends AbstractService {
       return null;
     }
   }
+
+  /**
+   * Looks up a tracked query and verifies that the current caller may access it.
+   *
+   * @param queryId identifier of the query
+   * @return the query info, or null if the query is not tracked
+   * @throws IOException if thrown by the credential lookup in LlapTokenChecker
+   * @throws SecurityException if the caller's credentials do not match the token user name
+   *         and token app ID stored in the query info
+   */
+  private QueryInfo checkPermissionsAndGetQuery(QueryIdentifier queryId) throws IOException {
+    QueryInfo queryInfo = queryInfoMap.get(queryId);
+    if (queryInfo == null) return null;
+    // LlapTokenChecker.checkPermissions expects (clusterId, userName, appId, hint).
+    LlapTokenChecker.checkPermissions(clusterId, queryInfo.getTokenUserName(),
+        queryInfo.getTokenAppId(), queryInfo.getQueryIdentifier());
+    return queryInfo;
+  }
+
+  /**
+   * Checks whether the given query is tracked and accessible to the current caller.
+   *
+   * @return true if the query is tracked and the check passed; false if it is not tracked
+   * @throws SecurityException if the query is tracked but the caller may not access it
+   */
+  public boolean checkPermissionsForQuery(QueryIdentifier queryId) throws IOException {
+    return checkPermissionsAndGetQuery(queryId) != null;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/868e5e14/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/Scheduler.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/Scheduler.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/Scheduler.java
index 26c8e55..fd6234a 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/Scheduler.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/Scheduler.java
@@ -44,4 +44,6 @@ public interface Scheduler<T> {
   void killFragment(String fragmentId);
 
   Set<String> getExecutorsStatus();
+
+  QueryIdentifier findQueryByFragment(String fragmentId);
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/868e5e14/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java
index f621af2..1933eb1 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java
@@ -420,6 +420,15 @@ public class TaskExecutorService extends AbstractService implements Scheduler<Ta
   }
 
   @Override
+  public QueryIdentifier findQueryByFragment(String fragmentId) {
+    synchronized (lock) {
+      TaskWrapper taskWrapper = knownTasks.get(fragmentId);
+      return taskWrapper == null ? null : taskWrapper.getTaskRunnerCallable()
+          .getFragmentInfo().getQueryInfo().getQueryIdentifier();
+    }
+  }
+
+  @Override
   public void killFragment(String fragmentId) {
     synchronized (lock) {
       TaskWrapper taskWrapper = knownTasks.remove(fragmentId);

http://git-wip-us.apache.org/repos/asf/hive/blob/868e5e14/llap-server/src/java/org/apache/hadoop/hive/llap/security/LlapSecurityHelper.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/security/LlapSecurityHelper.java b/llap-server/src/java/org/apache/hadoop/hive/llap/security/LlapSecurityHelper.java
index 76ba225..f958bc4 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/security/LlapSecurityHelper.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/security/LlapSecurityHelper.java
@@ -26,6 +26,7 @@ import java.util.concurrent.TimeUnit;
 
 import javax.net.SocketFactory;
 
+import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.llap.impl.LlapManagementProtocolClientImpl;
 import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GetTokenRequestProto;
@@ -81,7 +82,7 @@ public class LlapSecurityHelper implements LlapTokenProvider {
   }
 
   @Override
-  public Token<LlapTokenIdentifier> getDelegationToken() throws IOException {
+  public Token<LlapTokenIdentifier> getDelegationToken(String appId) throws IOException {
     if (!UserGroupInformation.isSecurityEnabled()) return null;
     if (llapUgi == null) {
       llapUgi = UserGroupInformation.getCurrentUser();
@@ -98,7 +99,7 @@ public class LlapSecurityHelper implements LlapTokenProvider {
     boolean hasRefreshed = false;
     while (true) {
       try {
-        tokenBytes = getTokenBytes();
+        tokenBytes = getTokenBytes(appId);
         break;
       } catch (InterruptedException ie) {
         throw new RuntimeException(ie);
@@ -128,7 +129,8 @@ public class LlapSecurityHelper implements LlapTokenProvider {
     return token;
   }
 
-  private ByteString getTokenBytes() throws InterruptedException, IOException {
+  private ByteString getTokenBytes(
+      final String appId) throws InterruptedException, IOException {
     return llapUgi.doAs(new PrivilegedExceptionAction<ByteString>() {
       @Override
       public ByteString run() throws Exception {
@@ -138,8 +140,11 @@ public class LlapSecurityHelper implements LlapTokenProvider {
               clientInstance.getManagementPort(), retryPolicy, socketFactory);
         }
         // Client only connects on the first call, so this has to be done in doAs.
-        GetTokenRequestProto req = GetTokenRequestProto.newBuilder().build();
-        return client.getDelegationToken(null, req).getToken();
+        GetTokenRequestProto.Builder req = GetTokenRequestProto.newBuilder();
+        if (!StringUtils.isBlank(appId)) {
+          req.setAppId(appId);
+        }
+        return client.getDelegationToken(null, req.build()).getToken();
       }
     });
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/868e5e14/llap-server/src/java/org/apache/hadoop/hive/llap/security/SecretManager.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/security/SecretManager.java b/llap-server/src/java/org/apache/hadoop/hive/llap/security/SecretManager.java
index 8c7a539..c54e726 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/security/SecretManager.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/security/SecretManager.java
@@ -23,6 +23,7 @@ import java.util.regex.Pattern;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.llap.DaemonId;
 import org.apache.hadoop.hive.llap.security.LlapTokenIdentifier;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
@@ -33,6 +34,7 @@ import org.slf4j.LoggerFactory;
 
 public class SecretManager extends ZKDelegationTokenSecretManager<LlapTokenIdentifier> {
   private static final Logger LOG = LoggerFactory.getLogger(SecretManager.class);
+
   public SecretManager(Configuration conf) {
     super(conf);
     checkForZKDTSMBug(conf);
@@ -82,16 +84,8 @@ public class SecretManager extends ZKDelegationTokenSecretManager<LlapTokenIdent
     return id;
   }
 
-  private final static Pattern hostsRe = Pattern.compile("[^A-Za-z0-9_-]");
-  private static String deriveZkPath(Configuration conf) throws IOException {
-    String hosts = HiveConf.getTrimmedVar(conf, ConfVars.LLAP_DAEMON_SERVICE_HOSTS);
-    String clusterName = hosts.startsWith("@") ? hosts.substring(1) : hosts;
-    String userName = UserGroupInformation.getCurrentUser().getShortUserName();
-    return hostsRe.matcher(userName + "_" + clusterName).replaceAll("_") ;
-  }
-
   public static SecretManager createSecretManager(
-      final Configuration conf, String llapPrincipal, String llapKeytab) {
+      final Configuration conf, String llapPrincipal, String llapKeytab, DaemonId daemonId) {
     // Create ZK connection under a separate ugi (if specified) - ZK works in mysterious ways.
     UserGroupInformation zkUgi = null;
     String principal = HiveConf.getVar(conf, ConfVars.LLAP_ZKSM_KERBEROS_PRINCIPAL, llapPrincipal);
@@ -110,12 +104,7 @@ public class SecretManager extends ZKDelegationTokenSecretManager<LlapTokenIdent
     zkConf.setLong(DelegationTokenManager.RENEW_INTERVAL, tokenLifetime);
     zkConf.set(SecretManager.ZK_DTSM_ZK_KERBEROS_PRINCIPAL, principal);
     zkConf.set(SecretManager.ZK_DTSM_ZK_KERBEROS_KEYTAB, keyTab);
-    String zkPath;
-    try {
-      zkPath = deriveZkPath(conf);
-    } catch (IOException e) {
-      throw new RuntimeException(e);
-    }
+    String zkPath = daemonId.getClusterString();
     LOG.info("Using {} as ZK secret manager path", zkPath);
     zkConf.set(SecretManager.ZK_DTSM_ZNODE_WORKING_PATH, "zkdtsm_" + zkPath);
     setZkConfIfNotSet(zkConf, SecretManager.ZK_DTSM_ZK_AUTH_TYPE, "sasl");

http://git-wip-us.apache.org/repos/asf/hive/blob/868e5e14/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/MiniLlapCluster.java
----------------------------------------------------------------------
diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/MiniLlapCluster.java b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/MiniLlapCluster.java
index 610f266..dde5be0 100644
--- a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/MiniLlapCluster.java
+++ b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/MiniLlapCluster.java
@@ -193,7 +193,7 @@ public class MiniLlapCluster extends AbstractService {
     LOG.info("Initializing {} llap instances for MiniLlapCluster with name={}", numInstances, clusterNameTrimmed);
     for (int i = 0 ;i < numInstances ; i++) {
       llapDaemons[i] = new LlapDaemon(conf, numExecutorsPerService, execBytesPerService, llapIoEnabled,
-          ioIsDirect, ioBytesPerService, localDirs, rpcPort, mngPort, shufflePort, webPort);
+          ioIsDirect, ioBytesPerService, localDirs, rpcPort, mngPort, shufflePort, webPort, clusterNameTrimmed);
       llapDaemons[i].init(new Configuration(conf));
     }
     LOG.info("Initialized {} llap instances for MiniLlapCluster with name={}", numInstances, clusterNameTrimmed);

http://git-wip-us.apache.org/repos/asf/hive/blob/868e5e14/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java
----------------------------------------------------------------------
diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java
index 24f4442..c6ba14e 100644
--- a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java
+++ b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java
@@ -78,7 +78,7 @@ public class TaskExecutorTestHelpers {
     QueryInfo queryInfo =
         new QueryInfo(queryIdentifier, "fake_app_id_string", "fake_dag_name", 1, "fakeUser",
             new ConcurrentHashMap<String, LlapDaemonProtocolProtos.SourceStateProto>(),
-            new String[0], null);
+            new String[0], null, null, null);
     return queryInfo;
   }
 

http://git-wip-us.apache.org/repos/asf/hive/blob/868e5e14/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestLlapDaemonProtocolServerImpl.java
----------------------------------------------------------------------
diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestLlapDaemonProtocolServerImpl.java b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestLlapDaemonProtocolServerImpl.java
index a65bf5c..fd37a06 100644
--- a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestLlapDaemonProtocolServerImpl.java
+++ b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestLlapDaemonProtocolServerImpl.java
@@ -46,7 +46,7 @@ public class TestLlapDaemonProtocolServerImpl {
     LlapProtocolServerImpl server =
         new LlapProtocolServerImpl(numHandlers, containerRunnerMock,
            new AtomicReference<InetSocketAddress>(), new AtomicReference<InetSocketAddress>(),
-           rpcPort, rpcPort + 1);
+           rpcPort, rpcPort + 1, null);
     when(containerRunnerMock.submitWork(any(SubmitWorkRequestProto.class))).thenReturn(
         SubmitWorkResponseProto
             .newBuilder()

http://git-wip-us.apache.org/repos/asf/hive/blob/868e5e14/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestLlapTokenChecker.java
----------------------------------------------------------------------
diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestLlapTokenChecker.java b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestLlapTokenChecker.java
new file mode 100644
index 0000000..aaaa762
--- /dev/null
+++ b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestLlapTokenChecker.java
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.llap.daemon.impl;
+
+import static org.junit.Assert.*;
+
+import org.apache.hadoop.io.Text;
+
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hadoop.hive.llap.security.LlapTokenIdentifier;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.junit.Test;
+
+public class TestLlapTokenChecker { // Exercises LlapTokenChecker's static *Internal helpers.
+
+  @Test
+  public void testGetToken() { // Each check() compares the returned Pair to (user, appId).
+    check(LlapTokenChecker.getTokenInfoInternal("u", null), "u", null); // No tokens at all.
+    check(LlapTokenChecker.getTokenInfoInternal(null, createTokens("u", null)), "u", null);
+    check(LlapTokenChecker.getTokenInfoInternal(null, createTokens("u", "a")), "u", "a");
+    check(LlapTokenChecker.getTokenInfoInternal("u", createTokens("u", "a")), "u", "a");
+    check(LlapTokenChecker.getTokenInfoInternal("u", createTokens("u", "a", "u", null)),
+        "u", "a");
+    // Conflicting inputs are expected to throw SecurityException. Some of these scenarios could
+    // be resolved, but that is not supported right now: a query is bound to app/user using the
+    // signed token information, and w/o a use case we don't pick one identity over another.
+    checkGetThrows("u", createTokens("u2", "a")); // Ambiguous user: first arg u vs token u2.
+    checkGetThrows("u2", createTokens("u2", "a", "u3", "a")); // Ambiguous user: u2 vs u3.
+    checkGetThrows(null, createTokens("u2", "a", "u3", "a")); // Ambiguous user: u2 vs u3.
+    checkGetThrows(null, createTokens("u2", "a", "u2", "a1")); // Ambiguous app: a vs a1.
+  }
+
+  @Test
+  public void testCheckPermissions() { // The first three calls must return without throwing.
+    LlapTokenChecker.checkPermissionsInternal("u", null, "u", null, null);
+    LlapTokenChecker.checkPermissionsInternal(null, createTokens("u", null) , "u", null, null);
+    LlapTokenChecker.checkPermissionsInternal("u", createTokens("u", "a") , "u", "a", null);
+    // No access: every call below is expected to throw SecurityException.
+    checkPrmThrows("u2", null, "u", "a");
+    checkPrmThrows("u", null, "u", "a"); // Note - Kerberos user w/o appId doesn't have access.
+    checkPrmThrows(null, createTokens("u2", "a"), "u", "a");
+    checkPrmThrows(null, createTokens("u", "a2"), "u", "a");
+    checkPrmThrows(null, createTokens("u", null), "u", "a");
+  }
+
+  private List<LlapTokenIdentifier> createTokens(String... args) { // One token per arg pair.
+    List<LlapTokenIdentifier> tokens = new ArrayList<>();
+    for (int i = 0; i < args.length; i += 2) { // Reads args[i], args[i + 1]: length must be even.
+      tokens.add(new LlapTokenIdentifier(null, null, new Text(args[i]), "c", args[i + 1]));
+    }
+    return tokens;
+  }
+
+  private void checkGetThrows(String kerberosName, List<LlapTokenIdentifier> tokens) {
+    try {
+      LlapTokenChecker.getTokenInfoInternal(kerberosName, tokens);
+      fail("Didn't throw"); // Reached only when the call above returned normally.
+    } catch (SecurityException ex) {
+      // Expected - SecurityException is the only exception type accepted here.
+    }
+  }
+
+  private void checkPrmThrows(
+      String kerberosName, List<LlapTokenIdentifier> tokens, String userName, String appId) {
+    try {
+      LlapTokenChecker.checkPermissionsInternal(kerberosName, tokens, userName, appId, null);
+      fail("Didn't throw"); // Reached only when the call above returned normally.
+    } catch (SecurityException ex) {
+      // Expected - SecurityException is the only exception type accepted here.
+    }
+  }
+
+  private void check(Pair<String, String> p, String user, String appId) {
+    assertEquals(user, p.getLeft()); // Left element is compared to the expected user.
+    assertEquals(appId, p.getRight()); // Right element is compared to the expected appId.
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hive/blob/868e5e14/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
index 3ea5ef9..fd6465a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
@@ -275,7 +275,8 @@ public class TezSessionState {
     if (llapMode) {
       if (UserGroupInformation.isSecurityEnabled()) {
         LlapTokenProvider tp = LlapProxy.getOrInitTokenProvider(conf);
-        Token<LlapTokenIdentifier> token = tp.getDelegationToken();
+        // For Tez, we don't use appId to distinguish the tokens; security scope is the user.
+        Token<LlapTokenIdentifier> token = tp.getDelegationToken(null);
         if (LOG.isInfoEnabled()) {
           LOG.info("Obtained a LLAP token: " + token);
         }