You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by se...@apache.org on 2016/05/03 22:38:12 UTC
hive git commit: HIVE-13445 : LLAP: token should encode application
and cluster ids (Sergey Shelukhin, reviewed by Siddharth Seth)
Repository: hive
Updated Branches:
refs/heads/master b6218275b -> 868e5e141
HIVE-13445 : LLAP: token should encode application and cluster ids (Sergey Shelukhin, reviewed by Siddharth Seth)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/868e5e14
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/868e5e14
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/868e5e14
Branch: refs/heads/master
Commit: 868e5e141856ce75af48d854d9e3eb13372d11f4
Parents: b621827
Author: Sergey Shelukhin <se...@apache.org>
Authored: Tue May 3 12:01:32 2016 -0700
Committer: Sergey Shelukhin <se...@apache.org>
Committed: Tue May 3 13:38:03 2016 -0700
----------------------------------------------------------------------
.../daemon/rpc/LlapDaemonProtocolProtos.java | 209 +++++++++++++++++--
.../org/apache/hadoop/hive/llap/DaemonId.java | 41 ++++
.../hive/llap/security/LlapTokenIdentifier.java | 39 +++-
.../hive/llap/security/LlapTokenProvider.java | 2 +-
.../src/protobuf/LlapDaemonProtocol.proto | 1 +
.../hive/llap/daemon/ContainerRunner.java | 9 +-
.../llap/daemon/impl/ContainerRunnerImpl.java | 47 +++--
.../hive/llap/daemon/impl/LlapDaemon.java | 52 ++++-
.../daemon/impl/LlapProtocolServerImpl.java | 41 ++--
.../hive/llap/daemon/impl/LlapTokenChecker.java | 137 ++++++++++++
.../hadoop/hive/llap/daemon/impl/QueryInfo.java | 17 +-
.../hive/llap/daemon/impl/QueryTracker.java | 85 +++++---
.../hadoop/hive/llap/daemon/impl/Scheduler.java | 2 +
.../llap/daemon/impl/TaskExecutorService.java | 9 +
.../hive/llap/security/LlapSecurityHelper.java | 15 +-
.../hive/llap/security/SecretManager.java | 19 +-
.../hive/llap/daemon/MiniLlapCluster.java | 2 +-
.../daemon/impl/TaskExecutorTestHelpers.java | 2 +-
.../impl/TestLlapDaemonProtocolServerImpl.java | 2 +-
.../llap/daemon/impl/TestLlapTokenChecker.java | 96 +++++++++
.../hive/ql/exec/tez/TezSessionState.java | 3 +-
21 files changed, 702 insertions(+), 128 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/868e5e14/llap-common/src/gen/protobuf/gen-java/org/apache/hadoop/hive/llap/daemon/rpc/LlapDaemonProtocolProtos.java
----------------------------------------------------------------------
diff --git a/llap-common/src/gen/protobuf/gen-java/org/apache/hadoop/hive/llap/daemon/rpc/LlapDaemonProtocolProtos.java b/llap-common/src/gen/protobuf/gen-java/org/apache/hadoop/hive/llap/daemon/rpc/LlapDaemonProtocolProtos.java
index 4ab7b32..820f6be 100644
--- a/llap-common/src/gen/protobuf/gen-java/org/apache/hadoop/hive/llap/daemon/rpc/LlapDaemonProtocolProtos.java
+++ b/llap-common/src/gen/protobuf/gen-java/org/apache/hadoop/hive/llap/daemon/rpc/LlapDaemonProtocolProtos.java
@@ -12821,6 +12821,21 @@ public final class LlapDaemonProtocolProtos {
public interface GetTokenRequestProtoOrBuilder
extends com.google.protobuf.MessageOrBuilder {
+
+ // optional string app_id = 1;
+ /**
+ * <code>optional string app_id = 1;</code>
+ */
+ boolean hasAppId();
+ /**
+ * <code>optional string app_id = 1;</code>
+ */
+ java.lang.String getAppId();
+ /**
+ * <code>optional string app_id = 1;</code>
+ */
+ com.google.protobuf.ByteString
+ getAppIdBytes();
}
/**
* Protobuf type {@code GetTokenRequestProto}
@@ -12855,6 +12870,7 @@ public final class LlapDaemonProtocolProtos {
com.google.protobuf.ExtensionRegistryLite extensionRegistry)
throws com.google.protobuf.InvalidProtocolBufferException {
initFields();
+ int mutable_bitField0_ = 0;
com.google.protobuf.UnknownFieldSet.Builder unknownFields =
com.google.protobuf.UnknownFieldSet.newBuilder();
try {
@@ -12872,6 +12888,11 @@ public final class LlapDaemonProtocolProtos {
}
break;
}
+ case 10: {
+ bitField0_ |= 0x00000001;
+ appId_ = input.readBytes();
+ break;
+ }
}
}
} catch (com.google.protobuf.InvalidProtocolBufferException e) {
@@ -12911,7 +12932,52 @@ public final class LlapDaemonProtocolProtos {
return PARSER;
}
+ private int bitField0_;
+ // optional string app_id = 1;
+ public static final int APP_ID_FIELD_NUMBER = 1;
+ private java.lang.Object appId_;
+ /**
+ * <code>optional string app_id = 1;</code>
+ */
+ public boolean hasAppId() {
+ return ((bitField0_ & 0x00000001) == 0x00000001);
+ }
+ /**
+ * <code>optional string app_id = 1;</code>
+ */
+ public java.lang.String getAppId() {
+ java.lang.Object ref = appId_;
+ if (ref instanceof java.lang.String) {
+ return (java.lang.String) ref;
+ } else {
+ com.google.protobuf.ByteString bs =
+ (com.google.protobuf.ByteString) ref;
+ java.lang.String s = bs.toStringUtf8();
+ if (bs.isValidUtf8()) {
+ appId_ = s;
+ }
+ return s;
+ }
+ }
+ /**
+ * <code>optional string app_id = 1;</code>
+ */
+ public com.google.protobuf.ByteString
+ getAppIdBytes() {
+ java.lang.Object ref = appId_;
+ if (ref instanceof java.lang.String) {
+ com.google.protobuf.ByteString b =
+ com.google.protobuf.ByteString.copyFromUtf8(
+ (java.lang.String) ref);
+ appId_ = b;
+ return b;
+ } else {
+ return (com.google.protobuf.ByteString) ref;
+ }
+ }
+
private void initFields() {
+ appId_ = "";
}
private byte memoizedIsInitialized = -1;
public final boolean isInitialized() {
@@ -12925,6 +12991,9 @@ public final class LlapDaemonProtocolProtos {
public void writeTo(com.google.protobuf.CodedOutputStream output)
throws java.io.IOException {
getSerializedSize();
+ if (((bitField0_ & 0x00000001) == 0x00000001)) {
+ output.writeBytes(1, getAppIdBytes());
+ }
getUnknownFields().writeTo(output);
}
@@ -12934,6 +13003,10 @@ public final class LlapDaemonProtocolProtos {
if (size != -1) return size;
size = 0;
+ if (((bitField0_ & 0x00000001) == 0x00000001)) {
+ size += com.google.protobuf.CodedOutputStream
+ .computeBytesSize(1, getAppIdBytes());
+ }
size += getUnknownFields().getSerializedSize();
memoizedSerializedSize = size;
return size;
@@ -12957,6 +13030,11 @@ public final class LlapDaemonProtocolProtos {
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GetTokenRequestProto other = (org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GetTokenRequestProto) obj;
boolean result = true;
+ result = result && (hasAppId() == other.hasAppId());
+ if (hasAppId()) {
+ result = result && getAppId()
+ .equals(other.getAppId());
+ }
result = result &&
getUnknownFields().equals(other.getUnknownFields());
return result;
@@ -12970,6 +13048,10 @@ public final class LlapDaemonProtocolProtos {
}
int hash = 41;
hash = (19 * hash) + getDescriptorForType().hashCode();
+ if (hasAppId()) {
+ hash = (37 * hash) + APP_ID_FIELD_NUMBER;
+ hash = (53 * hash) + getAppId().hashCode();
+ }
hash = (29 * hash) + getUnknownFields().hashCode();
memoizedHashCode = hash;
return hash;
@@ -13079,6 +13161,8 @@ public final class LlapDaemonProtocolProtos {
public Builder clear() {
super.clear();
+ appId_ = "";
+ bitField0_ = (bitField0_ & ~0x00000001);
return this;
}
@@ -13105,6 +13189,13 @@ public final class LlapDaemonProtocolProtos {
public org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GetTokenRequestProto buildPartial() {
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GetTokenRequestProto result = new org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GetTokenRequestProto(this);
+ int from_bitField0_ = bitField0_;
+ int to_bitField0_ = 0;
+ if (((from_bitField0_ & 0x00000001) == 0x00000001)) {
+ to_bitField0_ |= 0x00000001;
+ }
+ result.appId_ = appId_;
+ result.bitField0_ = to_bitField0_;
onBuilt();
return result;
}
@@ -13120,6 +13211,11 @@ public final class LlapDaemonProtocolProtos {
public Builder mergeFrom(org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GetTokenRequestProto other) {
if (other == org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GetTokenRequestProto.getDefaultInstance()) return this;
+ if (other.hasAppId()) {
+ bitField0_ |= 0x00000001;
+ appId_ = other.appId_;
+ onChanged();
+ }
this.mergeUnknownFields(other.getUnknownFields());
return this;
}
@@ -13145,6 +13241,81 @@ public final class LlapDaemonProtocolProtos {
}
return this;
}
+ private int bitField0_;
+
+ // optional string app_id = 1;
+ private java.lang.Object appId_ = "";
+ /**
+ * <code>optional string app_id = 1;</code>
+ */
+ public boolean hasAppId() {
+ return ((bitField0_ & 0x00000001) == 0x00000001);
+ }
+ /**
+ * <code>optional string app_id = 1;</code>
+ */
+ public java.lang.String getAppId() {
+ java.lang.Object ref = appId_;
+ if (!(ref instanceof java.lang.String)) {
+ java.lang.String s = ((com.google.protobuf.ByteString) ref)
+ .toStringUtf8();
+ appId_ = s;
+ return s;
+ } else {
+ return (java.lang.String) ref;
+ }
+ }
+ /**
+ * <code>optional string app_id = 1;</code>
+ */
+ public com.google.protobuf.ByteString
+ getAppIdBytes() {
+ java.lang.Object ref = appId_;
+ if (ref instanceof String) {
+ com.google.protobuf.ByteString b =
+ com.google.protobuf.ByteString.copyFromUtf8(
+ (java.lang.String) ref);
+ appId_ = b;
+ return b;
+ } else {
+ return (com.google.protobuf.ByteString) ref;
+ }
+ }
+ /**
+ * <code>optional string app_id = 1;</code>
+ */
+ public Builder setAppId(
+ java.lang.String value) {
+ if (value == null) {
+ throw new NullPointerException();
+ }
+ bitField0_ |= 0x00000001;
+ appId_ = value;
+ onChanged();
+ return this;
+ }
+ /**
+ * <code>optional string app_id = 1;</code>
+ */
+ public Builder clearAppId() {
+ bitField0_ = (bitField0_ & ~0x00000001);
+ appId_ = getDefaultInstance().getAppId();
+ onChanged();
+ return this;
+ }
+ /**
+ * <code>optional string app_id = 1;</code>
+ */
+ public Builder setAppIdBytes(
+ com.google.protobuf.ByteString value) {
+ if (value == null) {
+ throw new NullPointerException();
+ }
+ bitField0_ |= 0x00000001;
+ appId_ = value;
+ onChanged();
+ return this;
+ }
// @@protoc_insertion_point(builder_scope:GetTokenRequestProto)
}
@@ -14414,24 +14585,24 @@ public final class LlapDaemonProtocolProtos {
"RequestProto\022/\n\020query_identifier\030\001 \001(\0132\025" +
".QueryIdentifierProto\022\"\n\032fragment_identi" +
"fier_string\030\002 \001(\t\" \n\036TerminateFragmentRe" +
- "sponseProto\"\026\n\024GetTokenRequestProto\"&\n\025G",
- "etTokenResponseProto\022\r\n\005token\030\001 \001(\014*2\n\020S" +
- "ourceStateProto\022\017\n\013S_SUCCEEDED\020\001\022\r\n\tS_RU" +
- "NNING\020\002*E\n\024SubmissionStateProto\022\014\n\010ACCEP" +
- "TED\020\001\022\014\n\010REJECTED\020\002\022\021\n\rEVICTED_OTHER\020\0032\316" +
- "\002\n\022LlapDaemonProtocol\022?\n\nsubmitWork\022\027.Su" +
- "bmitWorkRequestProto\032\030.SubmitWorkRespons" +
- "eProto\022W\n\022sourceStateUpdated\022\037.SourceSta" +
- "teUpdatedRequestProto\032 .SourceStateUpdat" +
- "edResponseProto\022H\n\rqueryComplete\022\032.Query" +
- "CompleteRequestProto\032\033.QueryCompleteResp",
- "onseProto\022T\n\021terminateFragment\022\036.Termina" +
- "teFragmentRequestProto\032\037.TerminateFragme" +
- "ntResponseProto2]\n\026LlapManagementProtoco" +
- "l\022C\n\022getDelegationToken\022\025.GetTokenReques" +
- "tProto\032\026.GetTokenResponseProtoBH\n&org.ap" +
- "ache.hadoop.hive.llap.daemon.rpcB\030LlapDa" +
- "emonProtocolProtos\210\001\001\240\001\001"
+ "sponseProto\"&\n\024GetTokenRequestProto\022\016\n\006a",
+ "pp_id\030\001 \001(\t\"&\n\025GetTokenResponseProto\022\r\n\005" +
+ "token\030\001 \001(\014*2\n\020SourceStateProto\022\017\n\013S_SUC" +
+ "CEEDED\020\001\022\r\n\tS_RUNNING\020\002*E\n\024SubmissionSta" +
+ "teProto\022\014\n\010ACCEPTED\020\001\022\014\n\010REJECTED\020\002\022\021\n\rE" +
+ "VICTED_OTHER\020\0032\316\002\n\022LlapDaemonProtocol\022?\n" +
+ "\nsubmitWork\022\027.SubmitWorkRequestProto\032\030.S" +
+ "ubmitWorkResponseProto\022W\n\022sourceStateUpd" +
+ "ated\022\037.SourceStateUpdatedRequestProto\032 ." +
+ "SourceStateUpdatedResponseProto\022H\n\rquery" +
+ "Complete\022\032.QueryCompleteRequestProto\032\033.Q",
+ "ueryCompleteResponseProto\022T\n\021terminateFr" +
+ "agment\022\036.TerminateFragmentRequestProto\032\037" +
+ ".TerminateFragmentResponseProto2]\n\026LlapM" +
+ "anagementProtocol\022C\n\022getDelegationToken\022" +
+ "\025.GetTokenRequestProto\032\026.GetTokenRespons" +
+ "eProtoBH\n&org.apache.hadoop.hive.llap.da" +
+ "emon.rpcB\030LlapDaemonProtocolProtos\210\001\001\240\001\001"
};
com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
@@ -14533,7 +14704,7 @@ public final class LlapDaemonProtocolProtos {
internal_static_GetTokenRequestProto_fieldAccessorTable = new
com.google.protobuf.GeneratedMessage.FieldAccessorTable(
internal_static_GetTokenRequestProto_descriptor,
- new java.lang.String[] { });
+ new java.lang.String[] { "AppId", });
internal_static_GetTokenResponseProto_descriptor =
getDescriptor().getMessageTypes().get(16);
internal_static_GetTokenResponseProto_fieldAccessorTable = new
http://git-wip-us.apache.org/repos/asf/hive/blob/868e5e14/llap-common/src/java/org/apache/hadoop/hive/llap/DaemonId.java
----------------------------------------------------------------------
diff --git a/llap-common/src/java/org/apache/hadoop/hive/llap/DaemonId.java b/llap-common/src/java/org/apache/hadoop/hive/llap/DaemonId.java
new file mode 100644
index 0000000..18355e6
--- /dev/null
+++ b/llap-common/src/java/org/apache/hadoop/hive/llap/DaemonId.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.llap;
+
+public class DaemonId {
+ private final String userName;
+ private final String clusterName;
+ private final String appId;
+ private final String hostName;
+ private final long startTime;
+
+ public DaemonId(String userName, String clusterName, String hostName, String appId,
+ long startTime) {
+ this.userName = userName;
+ this.clusterName = clusterName;
+ this.appId = appId;
+ this.hostName = hostName;
+ this.startTime = startTime;
+ // TODO: we could also get an unique number per daemon.
+ }
+
+ public String getClusterString() {
+ return userName + "_" + clusterName + "_" + appId;
+ }
+
+ public String getApplicationId() {
+ return appId;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hive/blob/868e5e14/llap-common/src/java/org/apache/hadoop/hive/llap/security/LlapTokenIdentifier.java
----------------------------------------------------------------------
diff --git a/llap-common/src/java/org/apache/hadoop/hive/llap/security/LlapTokenIdentifier.java b/llap-common/src/java/org/apache/hadoop/hive/llap/security/LlapTokenIdentifier.java
index 23980d0..e28eddd 100644
--- a/llap-common/src/java/org/apache/hadoop/hive/llap/security/LlapTokenIdentifier.java
+++ b/llap-common/src/java/org/apache/hadoop/hive/llap/security/LlapTokenIdentifier.java
@@ -22,6 +22,7 @@ import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
+import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.token.Token;
@@ -31,25 +32,32 @@ import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdenti
public class LlapTokenIdentifier extends AbstractDelegationTokenIdentifier {
private static final String KIND = "LLAP_TOKEN";
public static final Text KIND_NAME = new Text(KIND);
+ private String clusterId;
+ private String appId;
public LlapTokenIdentifier() {
super();
}
- public LlapTokenIdentifier(Text owner, Text renewer, Text realUser) {
+ public LlapTokenIdentifier(Text owner, Text renewer, Text realUser,
+ String clusterId, String appId) {
super(owner, renewer, realUser);
+ this.clusterId = clusterId;
+ this.appId = appId == null ? "" : appId;
}
@Override
public void write(DataOutput out) throws IOException {
super.write(out);
- // Nothing right now.
+ out.writeUTF(clusterId);
+ out.writeUTF(appId);
}
@Override
public void readFields(DataInput in) throws IOException {
super.readFields(in);
- // Nothing right now.
+ clusterId = in.readUTF();
+ appId = in.readUTF();
}
@Override
@@ -57,21 +65,34 @@ public class LlapTokenIdentifier extends AbstractDelegationTokenIdentifier {
return KIND_NAME;
}
+ public String getAppId() {
+ return appId;
+ }
+
+ public String getClusterId() {
+ return clusterId;
+ }
+
@Override
public int hashCode() {
- // Nothing else right now.
- return super.hashCode();
+ final int prime = 31;
+ int result = prime * super.hashCode() + ((appId == null) ? 0 : appId.hashCode());
+ return prime * result + ((clusterId == null) ? 0 : clusterId.hashCode());
}
@Override
- public boolean equals(Object other) {
- // Nothing else right now.
- return super.equals(other);
+ public boolean equals(Object obj) {
+ if (this == obj) return true;
+ if (!(obj instanceof LlapTokenIdentifier) || !super.equals(obj)) return false;
+ LlapTokenIdentifier other = (LlapTokenIdentifier) obj;
+ return (appId == null ? other.appId == null : appId.equals(other.appId))
+ && (clusterId == null ? other.clusterId == null : clusterId.equals(other.clusterId));
}
@Override
public String toString() {
- return KIND + "; " + super.toString();
+ return KIND + "; " + super.toString() + ", cluster " + clusterId + ", app secret hash "
+ + (StringUtils.isBlank(appId) ? 0 : appId.hashCode());
}
@InterfaceAudience.Private
http://git-wip-us.apache.org/repos/asf/hive/blob/868e5e14/llap-common/src/java/org/apache/hadoop/hive/llap/security/LlapTokenProvider.java
----------------------------------------------------------------------
diff --git a/llap-common/src/java/org/apache/hadoop/hive/llap/security/LlapTokenProvider.java b/llap-common/src/java/org/apache/hadoop/hive/llap/security/LlapTokenProvider.java
index 2e99a28..edf9b18 100644
--- a/llap-common/src/java/org/apache/hadoop/hive/llap/security/LlapTokenProvider.java
+++ b/llap-common/src/java/org/apache/hadoop/hive/llap/security/LlapTokenProvider.java
@@ -23,5 +23,5 @@ import java.io.IOException;
import org.apache.hadoop.security.token.Token;
public interface LlapTokenProvider {
- Token<LlapTokenIdentifier> getDelegationToken() throws IOException;
+ Token<LlapTokenIdentifier> getDelegationToken(String appId) throws IOException;
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hive/blob/868e5e14/llap-common/src/protobuf/LlapDaemonProtocol.proto
----------------------------------------------------------------------
diff --git a/llap-common/src/protobuf/LlapDaemonProtocol.proto b/llap-common/src/protobuf/LlapDaemonProtocol.proto
index 944c96c..5cdc02e 100644
--- a/llap-common/src/protobuf/LlapDaemonProtocol.proto
+++ b/llap-common/src/protobuf/LlapDaemonProtocol.proto
@@ -130,6 +130,7 @@ message TerminateFragmentResponseProto {
}
message GetTokenRequestProto {
+ optional string app_id = 1;
}
message GetTokenResponseProto {
http://git-wip-us.apache.org/repos/asf/hive/blob/868e5e14/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/ContainerRunner.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/ContainerRunner.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/ContainerRunner.java
index fc29371..c346aed 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/ContainerRunner.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/ContainerRunner.java
@@ -29,9 +29,12 @@ public interface ContainerRunner {
SubmitWorkResponseProto submitWork(SubmitWorkRequestProto request) throws IOException;
- SourceStateUpdatedResponseProto sourceStateUpdated(SourceStateUpdatedRequestProto request);
+ SourceStateUpdatedResponseProto sourceStateUpdated(
+ SourceStateUpdatedRequestProto request) throws IOException;
- QueryCompleteResponseProto queryComplete(QueryCompleteRequestProto request);
+ QueryCompleteResponseProto queryComplete(
+ QueryCompleteRequestProto request) throws IOException;
- TerminateFragmentResponseProto terminateFragment(TerminateFragmentRequestProto request);
+ TerminateFragmentResponseProto terminateFragment(
+ TerminateFragmentRequestProto request) throws IOException;
}
http://git-wip-us.apache.org/repos/asf/hive/blob/868e5e14/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
index 3d45c7a..78b37f7 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
@@ -92,7 +92,7 @@ public class ContainerRunnerImpl extends CompositeService implements ContainerRu
boolean enablePreemption, String[] localDirsBase, AtomicReference<Integer> localShufflePort,
AtomicReference<InetSocketAddress> localAddress,
long totalMemoryAvailableBytes, LlapDaemonExecutorMetrics metrics,
- AMReporter amReporter, ClassLoader classLoader) {
+ AMReporter amReporter, ClassLoader classLoader, String clusterId) {
super("ContainerRunnerImpl");
this.conf = conf;
Preconditions.checkState(numExecutors > 0,
@@ -101,7 +101,7 @@ public class ContainerRunnerImpl extends CompositeService implements ContainerRu
this.localShufflePort = localShufflePort;
this.amReporter = amReporter;
- this.queryTracker = new QueryTracker(conf, localDirsBase);
+ this.queryTracker = new QueryTracker(conf, localDirsBase, clusterId);
addIfService(queryTracker);
String waitQueueSchedulerClassName = HiveConf.getVar(
conf, ConfVars.LLAP_DAEMON_WAIT_QUEUE_COMPARATOR_CLASS_NAME);
@@ -175,7 +175,8 @@ public class ContainerRunnerImpl extends CompositeService implements ContainerRu
fragmentSpec.getFragmentIdentifierString());
int dagIdentifier = taskAttemptId.getTaskID().getVertexID().getDAGId().getId();
- QueryIdentifier queryIdentifier = new QueryIdentifier(request.getApplicationIdString(), dagIdentifier);
+ QueryIdentifier queryIdentifier = new QueryIdentifier(
+ request.getApplicationIdString(), dagIdentifier);
Credentials credentials = new Credentials();
DataInputBuffer dib = new DataInputBuffer();
@@ -193,6 +194,7 @@ public class ContainerRunnerImpl extends CompositeService implements ContainerRu
fragmentSpec.getAttemptNumber(), request.getUser(), request.getFragmentSpec(),
jobToken);
+
String[] localDirs = fragmentInfo.getLocalDirs();
Preconditions.checkNotNull(localDirs);
if (LOG.isDebugEnabled()) {
@@ -200,7 +202,6 @@ public class ContainerRunnerImpl extends CompositeService implements ContainerRu
}
// May need to setup localDir for re-localization, which is usually setup as Environment.PWD.
// Used for re-localization, to add the user specified configuration (conf_pb_binary_stream)
-
TaskRunnerCallable callable = new TaskRunnerCallable(request, fragmentInfo, new Configuration(getConfig()),
new LlapExecutionContext(localAddress.get().getHostName(), queryTracker), env,
credentials, memoryPerExecutor, amReporter, confParams, metrics, killedTaskHandler,
@@ -248,24 +249,23 @@ public class ContainerRunnerImpl extends CompositeService implements ContainerRu
@Override
public SourceStateUpdatedResponseProto sourceStateUpdated(
- SourceStateUpdatedRequestProto request) {
+ SourceStateUpdatedRequestProto request) throws IOException {
LOG.info("Processing state update: " + stringifySourceStateUpdateRequest(request));
- queryTracker.registerSourceStateChange(
- new QueryIdentifier(request.getQueryIdentifier().getAppIdentifier(),
- request.getQueryIdentifier().getDagIdentifier()), request.getSrcName(),
- request.getState());
+ QueryIdentifier queryId = new QueryIdentifier(request.getQueryIdentifier().getAppIdentifier(),
+ request.getQueryIdentifier().getDagIdentifier());
+ queryTracker.registerSourceStateChange(queryId, request.getSrcName(), request.getState());
return SourceStateUpdatedResponseProto.getDefaultInstance();
}
@Override
- public QueryCompleteResponseProto queryComplete(QueryCompleteRequestProto request) {
+ public QueryCompleteResponseProto queryComplete(
+ QueryCompleteRequestProto request) throws IOException {
QueryIdentifier queryIdentifier =
new QueryIdentifier(request.getQueryIdentifier().getAppIdentifier(),
request.getQueryIdentifier().getDagIdentifier());
LOG.info("Processing queryComplete notification for {}", queryIdentifier);
- List<QueryFragmentInfo> knownFragments =
- queryTracker
- .queryComplete(queryIdentifier, request.getDeleteDelay());
+ List<QueryFragmentInfo> knownFragments = queryTracker.queryComplete(
+ queryIdentifier, request.getDeleteDelay(), false);
LOG.info("DBG: Pending fragment count for completed query {} = {}", queryIdentifier,
knownFragments.size());
for (QueryFragmentInfo fragmentInfo : knownFragments) {
@@ -277,9 +277,16 @@ public class ContainerRunnerImpl extends CompositeService implements ContainerRu
}
@Override
- public TerminateFragmentResponseProto terminateFragment(TerminateFragmentRequestProto request) {
- LOG.info("DBG: Received terminateFragment request for {}", request.getFragmentIdentifierString());
- executorService.killFragment(request.getFragmentIdentifierString());
+ public TerminateFragmentResponseProto terminateFragment(
+ TerminateFragmentRequestProto request) throws IOException {
+ String fragmentId = request.getFragmentIdentifierString();
+ LOG.info("DBG: Received terminateFragment request for {}", fragmentId);
+ // TODO: ideally, QueryTracker should have fragment-to-query mapping.
+ QueryIdentifier queryId = executorService.findQueryByFragment(fragmentId);
+ // checkPermissions returns false if query is not found, throws on failure.
+ if (queryId != null && queryTracker.checkPermissionsForQuery(queryId)) {
+ executorService.killFragment(fragmentId);
+ }
return TerminateFragmentResponseProto.getDefaultInstance();
}
@@ -355,8 +362,12 @@ public class ContainerRunnerImpl extends CompositeService implements ContainerRu
@Override
public void queryFailed(QueryIdentifier queryIdentifier) {
LOG.info("Processing query failed notification for {}", queryIdentifier);
- List<QueryFragmentInfo> knownFragments =
- queryTracker.queryComplete(queryIdentifier, -1);
+ List<QueryFragmentInfo> knownFragments;
+ try {
+ knownFragments = queryTracker.queryComplete(queryIdentifier, -1, true);
+ } catch (IOException e) {
+ throw new RuntimeException(e); // Should never happen here, no permission check.
+ }
LOG.info("DBG: Pending fragment count for failed query {} = {}", queryIdentifier,
knownFragments.size());
for (QueryFragmentInfo fragmentInfo : knownFragments) {
http://git-wip-us.apache.org/repos/asf/hive/blob/868e5e14/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
index 63cb16b..d23a44a 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
@@ -26,12 +26,14 @@ import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
+import java.util.regex.Pattern;
import javax.management.ObjectName;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.llap.DaemonId;
import org.apache.hadoop.hive.llap.LlapUtil;
import org.apache.hadoop.hive.llap.configuration.LlapDaemonConfiguration;
import org.apache.hadoop.hive.llap.daemon.ContainerRunner;
@@ -57,11 +59,13 @@ import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDFBridge;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDFBridge.UdfWhitelistChecker;
import org.apache.hadoop.metrics2.util.MBeans;
+import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.service.CompositeService;
import org.apache.hadoop.util.ExitUtil;
import org.apache.hadoop.util.JvmPauseMonitor;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hive.common.util.ShutdownHookManager;
import org.apache.logging.log4j.core.config.Configurator;
import org.slf4j.Logger;
@@ -97,6 +101,13 @@ public class LlapDaemon extends CompositeService implements ContainerRunner, Lla
private final int numExecutors;
private final long maxJvmMemory;
private final String[] localDirs;
+ private final DaemonId daemonId;
+
+ private final static Pattern hostsRe = Pattern.compile("[^A-Za-z0-9_-]");
+ private static String generateClusterName(Configuration conf) {
+ String hosts = HiveConf.getTrimmedVar(conf, ConfVars.LLAP_DAEMON_SERVICE_HOSTS);
+ return hostsRe.matcher(hosts.startsWith("@") ? hosts.substring(1) : hosts).replaceAll("_");
+ }
// TODO Not the best way to share the address
private final AtomicReference<InetSocketAddress> srvAddress = new AtomicReference<>(),
@@ -105,11 +116,10 @@ public class LlapDaemon extends CompositeService implements ContainerRunner, Lla
public LlapDaemon(Configuration daemonConf, int numExecutors, long executorMemoryBytes,
boolean ioEnabled, boolean isDirectCache, long ioMemoryBytes, String[] localDirs, int srvPort,
- int mngPort, int shufflePort, int webPort) {
+ int mngPort, int shufflePort, int webPort, String appName) {
super("LlapDaemon");
initializeLogging();
-
printAsciiArt();
Preconditions.checkArgument(numExecutors > 0);
@@ -129,6 +139,14 @@ public class LlapDaemon extends CompositeService implements ContainerRunner, Lla
"LLAP service hosts startswith '@' but hive.zookeeper.quorum is not set." +
" hive.zookeeper.quorum must be set.");
}
+ String hostName = MetricsUtils.getHostName();
+ try {
+ daemonId = new DaemonId(UserGroupInformation.getCurrentUser().getUserName(),
+ generateClusterName(daemonConf), hostName, appName, System.currentTimeMillis());
+ } catch (IOException ex) {
+ throw new RuntimeException(ex);
+ }
+
this.maxJvmMemory = getTotalHeapSize();
this.llapIoEnabled = ioEnabled;
@@ -193,7 +211,7 @@ public class LlapDaemon extends CompositeService implements ContainerRunner, Lla
LlapMetricsSystem.initialize("LlapDaemon");
this.pauseMonitor = new JvmPauseMonitor(daemonConf);
pauseMonitor.start();
- String displayName = "LlapDaemonExecutorMetrics-" + MetricsUtils.getHostName();
+ String displayName = "LlapDaemonExecutorMetrics-" + hostName;
String sessionId = MetricsUtils.getUUID();
daemonConf.set("llap.daemon.metrics.sessionid", sessionId);
String[] strIntervals = HiveConf.getTrimmedStringsVar(daemonConf,
@@ -223,11 +241,11 @@ public class LlapDaemon extends CompositeService implements ContainerRunner, Lla
this.amReporter = new AMReporter(srvAddress, new QueryFailedHandlerProxy(), daemonConf);
this.server = new LlapProtocolServerImpl(
- numHandlers, this, srvAddress, mngAddress, srvPort, mngPort);
+ numHandlers, this, srvAddress, mngAddress, srvPort, mngPort, daemonId);
this.containerRunner = new ContainerRunnerImpl(daemonConf, numExecutors, waitQueueSize,
enablePreemption, localDirs, this.shufflePort, srvAddress, executorMemoryBytes, metrics,
- amReporter, executorClassLoader);
+ amReporter, executorClassLoader, daemonId.getClusterString());
addIfService(containerRunner);
// Not adding the registry as a service, since we need to control when it is initialized - conf used to pickup properties.
@@ -377,10 +395,18 @@ public class LlapDaemon extends CompositeService implements ContainerRunner, Lla
LlapDaemonConfiguration daemonConf = new LlapDaemonConfiguration();
String containerIdStr = System.getenv(ApplicationConstants.Environment.CONTAINER_ID.name());
+ String appName = null;
if (containerIdStr != null && !containerIdStr.isEmpty()) {
daemonConf.set(ConfVars.LLAP_DAEMON_CONTAINER_ID.varname, containerIdStr);
+ appName = ConverterUtils.toContainerId(containerIdStr)
+ .getApplicationAttemptId().getApplicationId().toString();
} else {
daemonConf.unset(ConfVars.LLAP_DAEMON_CONTAINER_ID.varname);
+ // Note, we assume production LLAP always runs under YARN.
+ LOG.error("Cannot find " + ApplicationConstants.Environment.CONTAINER_ID.toString()
+ + "; LLAP tokens may grant access to subsequent instances of the cluster with"
+ + " the same name");
+ appName = null;
}
int numExecutors = HiveConf.getIntVar(daemonConf, ConfVars.LLAP_DAEMON_NUM_EXECUTORS);
@@ -400,7 +426,8 @@ public class LlapDaemon extends CompositeService implements ContainerRunner, Lla
boolean isDirectCache = HiveConf.getBoolVar(daemonConf, ConfVars.LLAP_ALLOCATOR_DIRECT);
boolean isLlapIo = HiveConf.getBoolVar(daemonConf, HiveConf.ConfVars.LLAP_IO_ENABLED, true);
llapDaemon = new LlapDaemon(daemonConf, numExecutors, executorMemoryBytes, isLlapIo,
- isDirectCache, ioMemoryBytes, localDirs, rpcPort, mngPort, shufflePort, webPort);
+ isDirectCache, ioMemoryBytes, localDirs, rpcPort, mngPort, shufflePort, webPort,
+ appName);
LOG.info("Adding shutdown hook for LlapDaemon");
ShutdownHookManager.addShutdownHook(new CompositeServiceShutdownHook(llapDaemon), 1);
@@ -420,24 +447,27 @@ public class LlapDaemon extends CompositeService implements ContainerRunner, Lla
}
@Override
- public SubmitWorkResponseProto submitWork(SubmitWorkRequestProto request) throws
- IOException {
+ public SubmitWorkResponseProto submitWork(
+ SubmitWorkRequestProto request) throws IOException {
numSubmissions.incrementAndGet();
return containerRunner.submitWork(request);
}
@Override
- public SourceStateUpdatedResponseProto sourceStateUpdated(SourceStateUpdatedRequestProto request) {
+ public SourceStateUpdatedResponseProto sourceStateUpdated(
+ SourceStateUpdatedRequestProto request) throws IOException {
return containerRunner.sourceStateUpdated(request);
}
@Override
- public QueryCompleteResponseProto queryComplete(QueryCompleteRequestProto request) {
+ public QueryCompleteResponseProto queryComplete(
+ QueryCompleteRequestProto request) throws IOException {
return containerRunner.queryComplete(request);
}
@Override
- public TerminateFragmentResponseProto terminateFragment(TerminateFragmentRequestProto request) {
+ public TerminateFragmentResponseProto terminateFragment(
+ TerminateFragmentRequestProto request) throws IOException {
return containerRunner.terminateFragment(request);
}
http://git-wip-us.apache.org/repos/asf/hive/blob/868e5e14/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapProtocolServerImpl.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapProtocolServerImpl.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapProtocolServerImpl.java
index dae1a3a..db8bfa6 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapProtocolServerImpl.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapProtocolServerImpl.java
@@ -32,6 +32,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.llap.DaemonId;
import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos;
import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GetTokenRequestProto;
import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GetTokenResponseProto;
@@ -71,13 +72,11 @@ public class LlapProtocolServerImpl extends AbstractService
private final AtomicReference<InetSocketAddress> srvAddress, mngAddress;
private SecretManager zkSecretManager;
private String restrictedToUser = null;
+ private final DaemonId daemonId;
- public LlapProtocolServerImpl(int numHandlers,
- ContainerRunner containerRunner,
- AtomicReference<InetSocketAddress> srvAddress,
- AtomicReference<InetSocketAddress> mngAddress,
- int srvPort,
- int mngPort) {
+ public LlapProtocolServerImpl(int numHandlers, ContainerRunner containerRunner,
+ AtomicReference<InetSocketAddress> srvAddress, AtomicReference<InetSocketAddress> mngAddress,
+ int srvPort, int mngPort, DaemonId daemonId) {
super("LlapDaemonProtocolServerImpl");
this.numHandlers = numHandlers;
this.containerRunner = containerRunner;
@@ -85,14 +84,14 @@ public class LlapProtocolServerImpl extends AbstractService
this.srvPort = srvPort;
this.mngAddress = mngAddress;
this.mngPort = mngPort;
+ this.daemonId = daemonId;
LOG.info("Creating: " + LlapProtocolServerImpl.class.getSimpleName() +
" with port configured to: " + srvPort);
}
@Override
public SubmitWorkResponseProto submitWork(RpcController controller,
- SubmitWorkRequestProto request) throws
- ServiceException {
+ SubmitWorkRequestProto request) throws ServiceException {
try {
return containerRunner.submitWork(request);
} catch (IOException e) {
@@ -103,20 +102,31 @@ public class LlapProtocolServerImpl extends AbstractService
@Override
public SourceStateUpdatedResponseProto sourceStateUpdated(RpcController controller,
SourceStateUpdatedRequestProto request) throws ServiceException {
- return containerRunner.sourceStateUpdated(request);
+ try {
+ return containerRunner.sourceStateUpdated(request);
+ } catch (IOException e) {
+ throw new ServiceException(e);
+ }
}
@Override
public QueryCompleteResponseProto queryComplete(RpcController controller,
QueryCompleteRequestProto request) throws ServiceException {
- return containerRunner.queryComplete(request);
+ try {
+ return containerRunner.queryComplete(request);
+ } catch (IOException e) {
+ throw new ServiceException(e);
+ }
}
@Override
public TerminateFragmentResponseProto terminateFragment(
- RpcController controller,
- TerminateFragmentRequestProto request) throws ServiceException {
- return containerRunner.terminateFragment(request);
+ RpcController controller, TerminateFragmentRequestProto request) throws ServiceException {
+ try {
+ return containerRunner.terminateFragment(request);
+ } catch (IOException e) {
+ throw new ServiceException(e);
+ }
}
@Override
@@ -140,7 +150,7 @@ public class LlapProtocolServerImpl extends AbstractService
}
String llapPrincipal = HiveConf.getVar(conf, ConfVars.LLAP_KERBEROS_PRINCIPAL),
llapKeytab = HiveConf.getVar(conf, ConfVars.LLAP_KERBEROS_KEYTAB_FILE);
- zkSecretManager = SecretManager.createSecretManager(conf, llapPrincipal, llapKeytab);
+ zkSecretManager = SecretManager.createSecretManager(conf, llapPrincipal, llapKeytab, daemonId);
// Start the protocol server after properly authenticating with daemon keytab.
UserGroupInformation daemonUgi = null;
@@ -275,7 +285,8 @@ public class LlapProtocolServerImpl extends AbstractService
realUser = new Text(ugi.getRealUser().getUserName());
}
Text renewer = new Text(ugi.getShortUserName());
- LlapTokenIdentifier llapId = new LlapTokenIdentifier(owner, renewer, realUser);
+ LlapTokenIdentifier llapId = new LlapTokenIdentifier(owner, renewer, realUser,
+ daemonId.getClusterString(), request.hasAppId() ? request.getAppId() : null);
// TODO: note that the token is not renewable right now and will last for 2 weeks by default.
Token<LlapTokenIdentifier> token = new Token<LlapTokenIdentifier>(llapId, zkSecretManager);
if (LOG.isInfoEnabled()) {
http://git-wip-us.apache.org/repos/asf/hive/blob/868e5e14/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapTokenChecker.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapTokenChecker.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapTokenChecker.java
new file mode 100644
index 0000000..03ee055
--- /dev/null
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapTokenChecker.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.llap.daemon.impl;
+
+import com.google.common.annotations.VisibleForTesting;
+
+import java.util.ArrayList;
+
+import java.util.List;
+
+import java.io.IOException;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hadoop.hive.llap.security.LlapTokenIdentifier;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public final class LlapTokenChecker {
+ private static final Logger LOG = LoggerFactory.getLogger(LlapTokenChecker.class);
+
+ private static final ImmutablePair<String, String> NO_SECURITY = new ImmutablePair<>(null, null);
+ public static Pair<String, String> getTokenInfo(String clusterId) throws IOException {
+ if (!UserGroupInformation.isSecurityEnabled()) return NO_SECURITY;
+ UserGroupInformation current = UserGroupInformation.getCurrentUser();
+ String kerberosName = current.hasKerberosCredentials() ? current.getShortUserName() : null;
+ List<LlapTokenIdentifier> tokens = getLlapTokens(current, clusterId);
+ if ((tokens == null || tokens.isEmpty()) && kerberosName == null) {
+ throw new SecurityException("No tokens or kerberos for " + current);
+ }
+ return getTokenInfoInternal(kerberosName, tokens);
+ }
+
+ private static List<LlapTokenIdentifier> getLlapTokens(
+ UserGroupInformation ugi, String clusterId) {
+ List<LlapTokenIdentifier> tokens = null;
+ for (TokenIdentifier id : ugi.getTokenIdentifiers()) {
+ if (!LlapTokenIdentifier.KIND_NAME.equals(id.getKind())) continue;
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Token {}", id);
+ }
+ LlapTokenIdentifier llapId = (LlapTokenIdentifier)id;
+ if (!clusterId.equals(llapId.getClusterId())) continue;
+ if (tokens == null) {
+ tokens = new ArrayList<>();
+ }
+ tokens.add((LlapTokenIdentifier)id);
+ }
+ return tokens;
+ }
+
+ @VisibleForTesting
+ static Pair<String, String> getTokenInfoInternal(
+ String kerberosName, List<LlapTokenIdentifier> tokens) {
+ assert (tokens != null && !tokens.isEmpty()) || kerberosName != null;
+ if (tokens == null) {
+ return new ImmutablePair<String, String>(kerberosName, null);
+ }
+ String userName = kerberosName, appId = null;
+ for (LlapTokenIdentifier llapId : tokens) {
+ String newUserName = llapId.getRealUser().toString();
+ if (userName != null && !userName.equals(newUserName)) {
+ throw new SecurityException("Ambiguous user name from credentials - " + userName
+ + " and " + newUserName + " from " + llapId
+ + ((kerberosName != null) ? ("; has kerberos credentials for " + kerberosName) : ""));
+ }
+ userName = newUserName;
+ String newAppId = llapId.getAppId();
+ if (!StringUtils.isEmpty(newAppId)) {
+ if (!StringUtils.isEmpty(appId) && !appId.equals(newAppId)) {
+ throw new SecurityException("Ambiguous app ID from credentials - " + appId
+ + " and " + newAppId + " from " + llapId);
+ }
+ appId = newAppId;
+ }
+ }
+ assert userName != null;
+ return new ImmutablePair<String, String>(userName, appId);
+ }
+
+ public static void checkPermissions(
+ String clusterId, String userName, String appId, Object hint) throws IOException {
+ if (!UserGroupInformation.isSecurityEnabled()) return;
+ UserGroupInformation current = UserGroupInformation.getCurrentUser();
+ String kerberosName = current.hasKerberosCredentials() ? current.getShortUserName() : null;
+ List<LlapTokenIdentifier> tokens = getLlapTokens(current, clusterId);
+ checkPermissionsInternal(kerberosName, tokens, userName, appId, hint);
+ }
+
+ @VisibleForTesting
+ static void checkPermissionsInternal(String kerberosName, List<LlapTokenIdentifier> tokens,
+ String userName, String appId, Object hint) {
+ if (kerberosName != null && StringUtils.isEmpty(appId) && kerberosName.equals(userName)) {
+ return;
+ }
+ if (tokens != null) {
+ for (LlapTokenIdentifier llapId : tokens) {
+ String tokenUser = llapId.getRealUser().toString(), tokenAppId = llapId.getAppId();
+ if (checkTokenPermissions(userName, appId, tokenUser, tokenAppId)) return;
+ }
+ }
+ throw new SecurityException("Unauthorized to access "
+ + userName + ", " + appId + " (" + hint + ")");
+ }
+
+ public static void checkPermissions(
+ Pair<String, String> prm, String userName, String appId, Object hint) {
+ if (userName == null) {
+ assert StringUtils.isEmpty(appId);
+ return;
+ }
+ if (!checkTokenPermissions(userName, appId, prm.getLeft(), prm.getRight())) {
+ throw new SecurityException("Unauthorized to access "
+ + userName + ", " + appId + " (" + hint + ")");
+ }
+ }
+
+ private static boolean checkTokenPermissions(
+ String userName, String appId, String tokenUser, String tokenAppId) {
+ return userName.equals(tokenUser)
+ && (StringUtils.isEmpty(appId) || appId.equals(tokenAppId));
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hive/blob/868e5e14/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryInfo.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryInfo.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryInfo.java
index 64c2b58..8daef9e 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryInfo.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryInfo.java
@@ -55,10 +55,11 @@ public class QueryInfo {
private final ConcurrentMap<String, SourceStateProto> sourceStateMap;
private final FinishableStateTracker finishableStateTracker = new FinishableStateTracker();
+ private final String tokenUserName, appId;
- public QueryInfo(QueryIdentifier queryIdentifier, String appIdString, String dagName, int dagIdentifier,
- String user, ConcurrentMap<String, SourceStateProto> sourceStateMap,
- String[] localDirsBase, FileSystem localFs) {
+ public QueryInfo(QueryIdentifier queryIdentifier, String appIdString, String dagName,
+ int dagIdentifier, String user, ConcurrentMap<String, SourceStateProto> sourceStateMap,
+ String[] localDirsBase, FileSystem localFs, String tokenUserName, String tokenAppId) {
this.queryIdentifier = queryIdentifier;
this.appIdString = appIdString;
this.dagName = dagName;
@@ -67,6 +68,8 @@ public class QueryInfo {
this.user = user;
this.localDirsBase = localDirsBase;
this.localFs = localFs;
+ this.tokenUserName = tokenUserName;
+ this.appId = tokenAppId;
}
public QueryIdentifier getQueryIdentifier() {
@@ -270,4 +273,12 @@ public class QueryInfo {
this.lastFinishableState = lastFinishableState;
}
}
+
+ public String getTokenUserName() {
+ return tokenUserName;
+ }
+
+ public String getTokenAppId() {
+ return appId;
+ }
}
http://git-wip-us.apache.org/repos/asf/hive/blob/868e5e14/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java
index 14657e6..cb3be2b 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java
@@ -18,6 +18,8 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.token.Token;
import org.apache.tez.common.CallableWithNdc;
@@ -60,6 +62,7 @@ public class QueryTracker extends AbstractService {
private final String[] localDirsBase;
private final FileSystem localFs;
+ private final String clusterId;
private final long defaultDeleteDelaySeconds;
// TODO At the moment there's no way of knowing whether a query is running or not.
@@ -89,9 +92,10 @@ public class QueryTracker extends AbstractService {
private final ConcurrentHashMap<QueryIdentifier, String> queryIdentifierToHiveQueryId =
new ConcurrentHashMap<>();
- public QueryTracker(Configuration conf, String[] localDirsBase) {
+ public QueryTracker(Configuration conf, String[] localDirsBase, String clusterId) {
super("QueryTracker");
this.localDirsBase = localDirsBase;
+ this.clusterId = clusterId;
try {
localFs = FileSystem.getLocal(conf);
} catch (IOException e) {
@@ -119,35 +123,50 @@ public class QueryTracker extends AbstractService {
* @param user
* @throws IOException
*/
- QueryFragmentInfo registerFragment(QueryIdentifier queryIdentifier, String appIdString, String dagName,
- int dagIdentifier, String vertexName, int fragmentNumber, int attemptNumber, String user,
- FragmentSpecProto fragmentSpec, Token<JobTokenIdentifier> appToken) throws IOException {
+ QueryFragmentInfo registerFragment(QueryIdentifier queryIdentifier, String appIdString,
+ String dagName, int dagIdentifier, String vertexName, int fragmentNumber, int attemptNumber,
+ String user, FragmentSpecProto fragmentSpec, Token<JobTokenIdentifier> appToken)
+ throws IOException {
ReadWriteLock dagLock = getDagLock(queryIdentifier);
dagLock.readLock().lock();
try {
- if (!completedDagMap.contains(queryIdentifier)) {
- QueryInfo queryInfo = queryInfoMap.get(queryIdentifier);
- if (queryInfo == null) {
- queryInfo = new QueryInfo(queryIdentifier, appIdString, dagName, dagIdentifier, user,
- getSourceCompletionMap(queryIdentifier), localDirsBase, localFs);
- queryInfoMap.putIfAbsent(queryIdentifier, queryInfo);
- }
-
- if (LOG.isDebugEnabled()) {
- LOG.debug("Registering request for {} with the ShuffleHandler", queryIdentifier);
- }
- ShuffleHandler.get()
- .registerDag(appIdString, dagIdentifier, appToken,
- user, queryInfo.getLocalDirs());
-
- return queryInfo.registerFragment(vertexName, fragmentNumber, attemptNumber, fragmentSpec);
- } else {
+ if (completedDagMap.contains(queryIdentifier)) {
// Cleanup the dag lock here, since it may have been created after the query completed
dagSpecificLocks.remove(queryIdentifier);
throw new RuntimeException(
"Dag " + dagName + " already complete. Rejecting fragment ["
+ vertexName + ", " + fragmentNumber + ", " + attemptNumber + "]");
}
+ // TODO: for now, we get the secure username out of UGI... after signing, we can take it
+ // out of the request provided that it's signed.
+ Pair<String, String> tokenInfo = LlapTokenChecker.getTokenInfo(clusterId);
+ boolean isExistingQueryInfo = true;
+ QueryInfo queryInfo = queryInfoMap.get(queryIdentifier);
+ if (queryInfo == null) {
+ queryInfo = new QueryInfo(queryIdentifier, appIdString, dagName, dagIdentifier, user,
+ getSourceCompletionMap(queryIdentifier), localDirsBase, localFs,
+ tokenInfo.getLeft(), tokenInfo.getRight());
+ QueryInfo old = queryInfoMap.putIfAbsent(queryIdentifier, queryInfo);
+ if (old != null) {
+ queryInfo = old;
+ } else {
+ isExistingQueryInfo = false;
+ }
+ }
+ if (isExistingQueryInfo) {
+ // We already retrieved the incoming info, check without UGI.
+ LlapTokenChecker.checkPermissions(tokenInfo, queryInfo.getTokenUserName(),
+ queryInfo.getTokenAppId(), queryInfo.getQueryIdentifier());
+ }
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Registering request for {} with the ShuffleHandler", queryIdentifier);
+ }
+ ShuffleHandler.get()
+ .registerDag(appIdString, dagIdentifier, appToken,
+ user, queryInfo.getLocalDirs());
+
+ return queryInfo.registerFragment(vertexName, fragmentNumber, attemptNumber, fragmentSpec);
} finally {
dagLock.readLock().unlock();
}
@@ -174,17 +193,20 @@ public class QueryTracker extends AbstractService {
* @param queryIdentifier
* @param deleteDelay
*/
- List<QueryFragmentInfo> queryComplete(QueryIdentifier queryIdentifier, long deleteDelay) {
+ List<QueryFragmentInfo> queryComplete(QueryIdentifier queryIdentifier, long deleteDelay,
+ boolean isInternal) throws IOException {
if (deleteDelay == -1) {
deleteDelay = defaultDeleteDelaySeconds;
}
ReadWriteLock dagLock = getDagLock(queryIdentifier);
dagLock.writeLock().lock();
try {
+ QueryInfo queryInfo = isInternal
+ ? queryInfoMap.get(queryIdentifier) : checkPermissionsAndGetQuery(queryIdentifier);
rememberCompletedDag(queryIdentifier);
LOG.info("Processing queryComplete for queryIdentifier={} with deleteDelay={} seconds", queryIdentifier,
deleteDelay);
- QueryInfo queryInfo = queryInfoMap.remove(queryIdentifier);
+ queryInfoMap.remove(queryIdentifier);
if (queryInfo == null) {
LOG.warn("Ignoring query complete for unknown dag: {}", queryIdentifier);
return Collections.emptyList();
@@ -229,9 +251,10 @@ public class QueryTracker extends AbstractService {
* @param sourceName
* @param sourceState
*/
- void registerSourceStateChange(QueryIdentifier queryIdentifier, String sourceName, SourceStateProto sourceState) {
+ void registerSourceStateChange(QueryIdentifier queryIdentifier, String sourceName,
+ SourceStateProto sourceState) throws IOException {
getSourceCompletionMap(queryIdentifier).put(sourceName, sourceState);
- QueryInfo queryInfo = queryInfoMap.get(queryIdentifier);
+ QueryInfo queryInfo = checkPermissionsAndGetQuery(queryIdentifier);
if (queryInfo != null) {
queryInfo.sourceStateUpdated(sourceName);
} else {
@@ -322,4 +345,16 @@ public class QueryTracker extends AbstractService {
return null;
}
}
+
+ private QueryInfo checkPermissionsAndGetQuery(QueryIdentifier queryId) throws IOException {
+ QueryInfo queryInfo = queryInfoMap.get(queryId);
+ if (queryInfo == null) return null;
+ LlapTokenChecker.checkPermissions(clusterId, queryInfo.getTokenUserName(),
+ queryInfo.getTokenAppId(), queryInfo.getQueryIdentifier());
+ return queryInfo;
+ }
+
+ public boolean checkPermissionsForQuery(QueryIdentifier queryId) throws IOException {
+ return checkPermissionsAndGetQuery(queryId) != null;
+ }
}
http://git-wip-us.apache.org/repos/asf/hive/blob/868e5e14/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/Scheduler.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/Scheduler.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/Scheduler.java
index 26c8e55..fd6234a 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/Scheduler.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/Scheduler.java
@@ -44,4 +44,6 @@ public interface Scheduler<T> {
void killFragment(String fragmentId);
Set<String> getExecutorsStatus();
+
+ QueryIdentifier findQueryByFragment(String fragmentId);
}
http://git-wip-us.apache.org/repos/asf/hive/blob/868e5e14/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java
index f621af2..1933eb1 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java
@@ -420,6 +420,15 @@ public class TaskExecutorService extends AbstractService implements Scheduler<Ta
}
@Override
+ public QueryIdentifier findQueryByFragment(String fragmentId) {
+ synchronized (lock) {
+ TaskWrapper taskWrapper = knownTasks.get(fragmentId);
+ return taskWrapper == null ? null : taskWrapper.getTaskRunnerCallable()
+ .getFragmentInfo().getQueryInfo().getQueryIdentifier();
+ }
+ }
+
+ @Override
public void killFragment(String fragmentId) {
synchronized (lock) {
TaskWrapper taskWrapper = knownTasks.remove(fragmentId);
http://git-wip-us.apache.org/repos/asf/hive/blob/868e5e14/llap-server/src/java/org/apache/hadoop/hive/llap/security/LlapSecurityHelper.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/security/LlapSecurityHelper.java b/llap-server/src/java/org/apache/hadoop/hive/llap/security/LlapSecurityHelper.java
index 76ba225..f958bc4 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/security/LlapSecurityHelper.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/security/LlapSecurityHelper.java
@@ -26,6 +26,7 @@ import java.util.concurrent.TimeUnit;
import javax.net.SocketFactory;
+import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.llap.impl.LlapManagementProtocolClientImpl;
import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GetTokenRequestProto;
@@ -81,7 +82,7 @@ public class LlapSecurityHelper implements LlapTokenProvider {
}
@Override
- public Token<LlapTokenIdentifier> getDelegationToken() throws IOException {
+ public Token<LlapTokenIdentifier> getDelegationToken(String appId) throws IOException {
if (!UserGroupInformation.isSecurityEnabled()) return null;
if (llapUgi == null) {
llapUgi = UserGroupInformation.getCurrentUser();
@@ -98,7 +99,7 @@ public class LlapSecurityHelper implements LlapTokenProvider {
boolean hasRefreshed = false;
while (true) {
try {
- tokenBytes = getTokenBytes();
+ tokenBytes = getTokenBytes(appId);
break;
} catch (InterruptedException ie) {
throw new RuntimeException(ie);
@@ -128,7 +129,8 @@ public class LlapSecurityHelper implements LlapTokenProvider {
return token;
}
- private ByteString getTokenBytes() throws InterruptedException, IOException {
+ private ByteString getTokenBytes(
+ final String appId) throws InterruptedException, IOException {
return llapUgi.doAs(new PrivilegedExceptionAction<ByteString>() {
@Override
public ByteString run() throws Exception {
@@ -138,8 +140,11 @@ public class LlapSecurityHelper implements LlapTokenProvider {
clientInstance.getManagementPort(), retryPolicy, socketFactory);
}
// Client only connects on the first call, so this has to be done in doAs.
- GetTokenRequestProto req = GetTokenRequestProto.newBuilder().build();
- return client.getDelegationToken(null, req).getToken();
+ GetTokenRequestProto.Builder req = GetTokenRequestProto.newBuilder();
+ if (!StringUtils.isBlank(appId)) {
+ req.setAppId(appId);
+ }
+ return client.getDelegationToken(null, req.build()).getToken();
}
});
}
http://git-wip-us.apache.org/repos/asf/hive/blob/868e5e14/llap-server/src/java/org/apache/hadoop/hive/llap/security/SecretManager.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/security/SecretManager.java b/llap-server/src/java/org/apache/hadoop/hive/llap/security/SecretManager.java
index 8c7a539..c54e726 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/security/SecretManager.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/security/SecretManager.java
@@ -23,6 +23,7 @@ import java.util.regex.Pattern;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.llap.DaemonId;
import org.apache.hadoop.hive.llap.security.LlapTokenIdentifier;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
@@ -33,6 +34,7 @@ import org.slf4j.LoggerFactory;
public class SecretManager extends ZKDelegationTokenSecretManager<LlapTokenIdentifier> {
private static final Logger LOG = LoggerFactory.getLogger(SecretManager.class);
+
public SecretManager(Configuration conf) {
super(conf);
checkForZKDTSMBug(conf);
@@ -82,16 +84,8 @@ public class SecretManager extends ZKDelegationTokenSecretManager<LlapTokenIdent
return id;
}
- private final static Pattern hostsRe = Pattern.compile("[^A-Za-z0-9_-]");
- private static String deriveZkPath(Configuration conf) throws IOException {
- String hosts = HiveConf.getTrimmedVar(conf, ConfVars.LLAP_DAEMON_SERVICE_HOSTS);
- String clusterName = hosts.startsWith("@") ? hosts.substring(1) : hosts;
- String userName = UserGroupInformation.getCurrentUser().getShortUserName();
- return hostsRe.matcher(userName + "_" + clusterName).replaceAll("_") ;
- }
-
public static SecretManager createSecretManager(
- final Configuration conf, String llapPrincipal, String llapKeytab) {
+ final Configuration conf, String llapPrincipal, String llapKeytab, DaemonId daemonId) {
// Create ZK connection under a separate ugi (if specified) - ZK works in mysterious ways.
UserGroupInformation zkUgi = null;
String principal = HiveConf.getVar(conf, ConfVars.LLAP_ZKSM_KERBEROS_PRINCIPAL, llapPrincipal);
@@ -110,12 +104,7 @@ public class SecretManager extends ZKDelegationTokenSecretManager<LlapTokenIdent
zkConf.setLong(DelegationTokenManager.RENEW_INTERVAL, tokenLifetime);
zkConf.set(SecretManager.ZK_DTSM_ZK_KERBEROS_PRINCIPAL, principal);
zkConf.set(SecretManager.ZK_DTSM_ZK_KERBEROS_KEYTAB, keyTab);
- String zkPath;
- try {
- zkPath = deriveZkPath(conf);
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
+ String zkPath = daemonId.getClusterString();
LOG.info("Using {} as ZK secret manager path", zkPath);
zkConf.set(SecretManager.ZK_DTSM_ZNODE_WORKING_PATH, "zkdtsm_" + zkPath);
setZkConfIfNotSet(zkConf, SecretManager.ZK_DTSM_ZK_AUTH_TYPE, "sasl");
http://git-wip-us.apache.org/repos/asf/hive/blob/868e5e14/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/MiniLlapCluster.java
----------------------------------------------------------------------
diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/MiniLlapCluster.java b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/MiniLlapCluster.java
index 610f266..dde5be0 100644
--- a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/MiniLlapCluster.java
+++ b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/MiniLlapCluster.java
@@ -193,7 +193,7 @@ public class MiniLlapCluster extends AbstractService {
LOG.info("Initializing {} llap instances for MiniLlapCluster with name={}", numInstances, clusterNameTrimmed);
for (int i = 0 ;i < numInstances ; i++) {
llapDaemons[i] = new LlapDaemon(conf, numExecutorsPerService, execBytesPerService, llapIoEnabled,
- ioIsDirect, ioBytesPerService, localDirs, rpcPort, mngPort, shufflePort, webPort);
+ ioIsDirect, ioBytesPerService, localDirs, rpcPort, mngPort, shufflePort, webPort, clusterNameTrimmed);
llapDaemons[i].init(new Configuration(conf));
}
LOG.info("Initialized {} llap instances for MiniLlapCluster with name={}", numInstances, clusterNameTrimmed);
http://git-wip-us.apache.org/repos/asf/hive/blob/868e5e14/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java
----------------------------------------------------------------------
diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java
index 24f4442..c6ba14e 100644
--- a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java
+++ b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java
@@ -78,7 +78,7 @@ public class TaskExecutorTestHelpers {
QueryInfo queryInfo =
new QueryInfo(queryIdentifier, "fake_app_id_string", "fake_dag_name", 1, "fakeUser",
new ConcurrentHashMap<String, LlapDaemonProtocolProtos.SourceStateProto>(),
- new String[0], null);
+ new String[0], null, null, null);
return queryInfo;
}
http://git-wip-us.apache.org/repos/asf/hive/blob/868e5e14/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestLlapDaemonProtocolServerImpl.java
----------------------------------------------------------------------
diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestLlapDaemonProtocolServerImpl.java b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestLlapDaemonProtocolServerImpl.java
index a65bf5c..fd37a06 100644
--- a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestLlapDaemonProtocolServerImpl.java
+++ b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestLlapDaemonProtocolServerImpl.java
@@ -46,7 +46,7 @@ public class TestLlapDaemonProtocolServerImpl {
LlapProtocolServerImpl server =
new LlapProtocolServerImpl(numHandlers, containerRunnerMock,
new AtomicReference<InetSocketAddress>(), new AtomicReference<InetSocketAddress>(),
- rpcPort, rpcPort + 1);
+ rpcPort, rpcPort + 1, null);
when(containerRunnerMock.submitWork(any(SubmitWorkRequestProto.class))).thenReturn(
SubmitWorkResponseProto
.newBuilder()
http://git-wip-us.apache.org/repos/asf/hive/blob/868e5e14/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestLlapTokenChecker.java
----------------------------------------------------------------------
diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestLlapTokenChecker.java b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestLlapTokenChecker.java
new file mode 100644
index 0000000..aaaa762
--- /dev/null
+++ b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestLlapTokenChecker.java
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.llap.daemon.impl;
+
+import static org.junit.Assert.*;
+
+import org.apache.hadoop.io.Text;
+
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hadoop.hive.llap.security.LlapTokenIdentifier;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.junit.Test;
+
+public class TestLlapTokenChecker {
+
+ @Test
+ public void testGetToken() {
+ check(LlapTokenChecker.getTokenInfoInternal("u", null), "u", null);
+ check(LlapTokenChecker.getTokenInfoInternal(null, createTokens("u", null)), "u", null);
+ check(LlapTokenChecker.getTokenInfoInternal(null, createTokens("u", "a")), "u", "a");
+ check(LlapTokenChecker.getTokenInfoInternal("u", createTokens("u", "a")), "u", "a");
+ check(LlapTokenChecker.getTokenInfoInternal("u", createTokens("u", "a", "u", null)),
+ "u", "a");
+ // Note - some of these scenarios could be handled, but they are not supported right now.
+ // The reason is that we bind a query to app/user using the signed token information, and
+ // we don't want to bother figuring out which one to use in case of ambiguity w/o a use case.
+ checkGetThrows("u", createTokens("u2", "a")); // Ambiguous user.
+ checkGetThrows("u2", createTokens("u2", "a", "u3", "a")); // Ambiguous user.
+ checkGetThrows(null, createTokens("u2", "a", "u3", "a")); // Ambiguous user.
+ checkGetThrows(null, createTokens("u2", "a", "u2", "a1")); // Ambiguous app.
+ }
+
+ @Test
+ public void testCheckPermissions() {
+ LlapTokenChecker.checkPermissionsInternal("u", null, "u", null, null);
+ LlapTokenChecker.checkPermissionsInternal(null, createTokens("u", null) , "u", null, null);
+ LlapTokenChecker.checkPermissionsInternal("u", createTokens("u", "a") , "u", "a", null);
+ // No access.
+ checkPrmThrows("u2", null, "u", "a");
+ checkPrmThrows("u", null, "u", "a"); // Note - Kerberos user w/o appId doesn't have access.
+ checkPrmThrows(null, createTokens("u2", "a"), "u", "a");
+ checkPrmThrows(null, createTokens("u", "a2"), "u", "a");
+ checkPrmThrows(null, createTokens("u", null), "u", "a");
+ }
+
+ private List<LlapTokenIdentifier> createTokens(String... args) {
+ List<LlapTokenIdentifier> tokens = new ArrayList<>();
+ for (int i = 0; i < args.length; i += 2) {
+ tokens.add(new LlapTokenIdentifier(null, null, new Text(args[i]), "c", args[i + 1]));
+ }
+ return tokens;
+ }
+
+ private void checkGetThrows(String kerberosName, List<LlapTokenIdentifier> tokens) {
+ try {
+ LlapTokenChecker.getTokenInfoInternal(kerberosName, tokens);
+ fail("Didn't throw");
+ } catch (SecurityException ex) {
+ // Expected.
+ }
+ }
+
+ private void checkPrmThrows(
+ String kerberosName, List<LlapTokenIdentifier> tokens, String userName, String appId) {
+ try {
+ LlapTokenChecker.checkPermissionsInternal(kerberosName, tokens, userName, appId, null);
+ fail("Didn't throw");
+ } catch (SecurityException ex) {
+ // Expected.
+ }
+ }
+
+ private void check(Pair<String, String> p, String user, String appId) {
+ assertEquals(user, p.getLeft());
+ assertEquals(appId, p.getRight());
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hive/blob/868e5e14/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
index 3ea5ef9..fd6465a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
@@ -275,7 +275,8 @@ public class TezSessionState {
if (llapMode) {
if (UserGroupInformation.isSecurityEnabled()) {
LlapTokenProvider tp = LlapProxy.getOrInitTokenProvider(conf);
- Token<LlapTokenIdentifier> token = tp.getDelegationToken();
+ // For Tez, we don't use appId to distinguish the tokens; security scope is the user.
+ Token<LlapTokenIdentifier> token = tp.getDelegationToken(null);
if (LOG.isInfoEnabled()) {
LOG.info("Obtained a LLAP token: " + token);
}