You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by se...@apache.org on 2016/05/31 20:24:08 UTC
[2/2] hive git commit: HIVE-13444 : LLAP: add HMAC signatures to LLAP;
verify them on LLAP side (Sergey Shelukhin, reviewed by Siddharth Seth)
HIVE-13444 : LLAP: add HMAC signatures to LLAP; verify them on LLAP side (Sergey Shelukhin, reviewed by Siddharth Seth)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/9fe3dab7
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/9fe3dab7
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/9fe3dab7
Branch: refs/heads/branch-2.1
Commit: 9fe3dab7fe82d78435d9cd01f44f7a8e748f3420
Parents: 44a8f0a
Author: Sergey Shelukhin <se...@apache.org>
Authored: Tue May 31 13:09:45 2016 -0700
Committer: Sergey Shelukhin <se...@apache.org>
Committed: Tue May 31 13:24:05 2016 -0700
----------------------------------------------------------------------
.../org/apache/hadoop/hive/conf/HiveConf.java | 11 +-
.../hive/llap/security/LlapTokenClient.java | 148 ++++++++++++++
.../llap/security/LlapTokenClientFactory.java | 160 ---------------
.../llap/security/LlapTokenLocalClient.java | 12 +-
.../hadoop/hive/llap/security/LlapSigner.java | 41 ++++
.../hive/llap/security/LlapTokenIdentifier.java | 14 +-
.../hive/llap/security/SecretManager.java | 96 ++++++---
.../llap/security/SigningSecretManager.java | 26 +++
.../llap/daemon/impl/ContainerRunnerImpl.java | 57 +++++-
.../hive/llap/daemon/impl/LlapDaemon.java | 2 +-
.../daemon/impl/LlapProtocolServerImpl.java | 59 ++++--
.../hive/llap/daemon/impl/LlapTokenChecker.java | 30 ++-
.../hive/llap/daemon/impl/QueryTracker.java | 14 +-
.../llap/daemon/impl/TaskExecutorService.java | 6 +-
.../llap/daemon/impl/TaskRunnerCallable.java | 19 +-
.../hive/llap/security/LlapSignerImpl.java | 60 ++++++
.../daemon/impl/TaskExecutorTestHelpers.java | 3 +-
.../llap/daemon/impl/TestLlapTokenChecker.java | 8 +-
.../TestFirstInFirstOutComparator.java | 31 ---
.../hive/llap/security/TestLlapSignerImpl.java | 200 +++++++++++++++++++
.../hive/ql/exec/tez/TezSessionState.java | 26 +--
21 files changed, 720 insertions(+), 303 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/9fe3dab7/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 6a404bd..cad5d65 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -2716,6 +2716,12 @@ public class HiveConf extends Configuration {
LLAP_MANAGEMENT_ACL("hive.llap.management.acl", "*", "The ACL for LLAP daemon management."),
LLAP_MANAGEMENT_ACL_DENY("hive.llap.management.acl.blocked", "",
"The deny ACL for LLAP daemon management."),
+ LLAP_REMOTE_TOKEN_REQUIRES_SIGNING("hive.llap.remote.token.requires.signing", "true",
+ new StringSet("false", "except_llap_owner", "true"),
+ "Whether the token returned from LLAP management API should require fragment signing.\n" +
+ "True by default; can be disabled to allow CLI to get tokens from LLAP in a secure\n" +
+ "cluster by setting it to true or 'except_llap_owner' (the latter returns such tokens\n" +
+ "to everyone except the user LLAP cluster is authenticating under)."),
// Hadoop DelegationTokenManager default is 1 week.
LLAP_DELEGATION_TOKEN_LIFETIME("hive.llap.daemon.delegation.token.lifetime", "14d",
@@ -2725,11 +2731,6 @@ public class HiveConf extends Configuration {
"RPC port for LLAP daemon management service."),
LLAP_WEB_AUTO_AUTH("hive.llap.auto.auth", false,
"Whether or not to set Hadoop configs to enable auth in LLAP web app."),
- LLAP_CREATE_TOKEN_LOCALLY("hive.llap.create.token.locally", "hs2",
- new StringSet("true", "hs2", "false"),
- "Whether to create LLAP tokens locally, saving directly to ZooKeeper SecretManager.\n" +
- "Requires one to have access to ZK paths; in other words, this should only be used in\n" +
- "HiveServer2. By default, the value is 'hs2', which means exactly that."),
LLAP_DAEMON_RPC_NUM_HANDLERS("hive.llap.daemon.rpc.num.handlers", 5,
"Number of RPC handlers for LLAP daemon.", "llap.daemon.rpc.num.handlers"),
http://git-wip-us.apache.org/repos/asf/hive/blob/9fe3dab7/llap-client/src/java/org/apache/hadoop/hive/llap/security/LlapTokenClient.java
----------------------------------------------------------------------
diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/security/LlapTokenClient.java b/llap-client/src/java/org/apache/hadoop/hive/llap/security/LlapTokenClient.java
new file mode 100644
index 0000000..921e050
--- /dev/null
+++ b/llap-client/src/java/org/apache/hadoop/hive/llap/security/LlapTokenClient.java
@@ -0,0 +1,148 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.llap.security;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import javax.net.SocketFactory;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.llap.impl.LlapManagementProtocolClientImpl;
+import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GetTokenRequestProto;
+import org.apache.hadoop.hive.llap.registry.impl.LlapRegistryService;
+import org.apache.hadoop.hive.llap.registry.ServiceInstance;
+import org.apache.hadoop.hive.llap.registry.ServiceInstanceSet;
+import org.apache.hadoop.hive.llap.security.LlapTokenIdentifier;
+import org.apache.hadoop.io.DataInputByteBuffer;
+import org.apache.hadoop.io.retry.RetryPolicies;
+import org.apache.hadoop.io.retry.RetryPolicy;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.protobuf.ByteString;
+import com.google.protobuf.ServiceException;
+
+public class LlapTokenClient {
+ private static final Logger LOG = LoggerFactory.getLogger(LlapTokenClient.class);
+
+ private final LlapRegistryService registry;
+ private final SocketFactory socketFactory;
+ private final RetryPolicy retryPolicy;
+ private final Configuration conf;
+ private ServiceInstanceSet activeInstances;
+ private Collection<ServiceInstance> lastKnownInstances;
+ private LlapManagementProtocolClientImpl client;
+ private ServiceInstance clientInstance;
+
+ public LlapTokenClient(Configuration conf) {
+ this.conf = conf;
+ registry = new LlapRegistryService(false);
+ registry.init(conf);
+ socketFactory = NetUtils.getDefaultSocketFactory(conf);
+ retryPolicy = RetryPolicies.retryUpToMaximumTimeWithFixedSleep(
+ 16000, 2000l, TimeUnit.MILLISECONDS);
+ }
+
+ public Token<LlapTokenIdentifier> getDelegationToken(String appId) throws IOException {
+ if (!UserGroupInformation.isSecurityEnabled()) return null;
+ Iterator<ServiceInstance> llaps = null;
+ if (clientInstance == null) {
+ assert client == null;
+ llaps = getLlapServices(false).iterator();
+ clientInstance = llaps.next();
+ }
+
+ ByteString tokenBytes = null;
+ boolean hasRefreshed = false;
+ while (true) {
+ try {
+ tokenBytes = getTokenBytes(appId);
+ break;
+ } catch (IOException | ServiceException ex) {
+ LOG.error("Cannot get a token, trying a different instance", ex);
+ client = null;
+ clientInstance = null;
+ }
+ if (llaps == null || !llaps.hasNext()) {
+ if (hasRefreshed) { // Only refresh once.
+ throw new RuntimeException("Cannot find any LLAPs to get the token from");
+ }
+ llaps = getLlapServices(true).iterator();
+ hasRefreshed = true;
+ }
+ clientInstance = llaps.next();
+ }
+
+ Token<LlapTokenIdentifier> token = extractToken(tokenBytes);
+ if (LOG.isInfoEnabled()) {
+ LOG.info("Obtained a LLAP delegation token from " + clientInstance + ": " + token);
+ }
+ return token;
+ }
+
+ private Token<LlapTokenIdentifier> extractToken(ByteString tokenBytes) throws IOException {
+ Token<LlapTokenIdentifier> token = new Token<>();
+ DataInputByteBuffer in = new DataInputByteBuffer();
+ in.reset(tokenBytes.asReadOnlyByteBuffer());
+ token.readFields(in);
+ return token;
+ }
+
+ private ByteString getTokenBytes(final String appId) throws IOException, ServiceException {
+ assert clientInstance != null;
+ if (client == null) {
+ client = new LlapManagementProtocolClientImpl(conf, clientInstance.getHost(),
+ clientInstance.getManagementPort(), retryPolicy, socketFactory);
+ }
+ GetTokenRequestProto.Builder req = GetTokenRequestProto.newBuilder();
+ if (!StringUtils.isBlank(appId)) {
+ req.setAppId(appId);
+ }
+ return client.getDelegationToken(null, req.build()).getToken();
+ }
+
+ /** Synchronized - LLAP registry and instance set are not thread safe. */
+ private synchronized List<ServiceInstance> getLlapServices(
+ boolean doForceRefresh) throws IOException {
+ if (!doForceRefresh && lastKnownInstances != null) {
+ return new ArrayList<>(lastKnownInstances);
+ }
+ if (activeInstances == null) {
+ registry.start();
+ activeInstances = registry.getInstances();
+ }
+ Map<String, ServiceInstance> daemons = activeInstances.getAll();
+ if (daemons == null || daemons.isEmpty()) {
+ throw new RuntimeException("No LLAPs found");
+ }
+ lastKnownInstances = daemons.values();
+ return new ArrayList<ServiceInstance>(lastKnownInstances);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/9fe3dab7/llap-client/src/java/org/apache/hadoop/hive/llap/security/LlapTokenClientFactory.java
----------------------------------------------------------------------
diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/security/LlapTokenClientFactory.java b/llap-client/src/java/org/apache/hadoop/hive/llap/security/LlapTokenClientFactory.java
deleted file mode 100644
index ebc91b1..0000000
--- a/llap-client/src/java/org/apache/hadoop/hive/llap/security/LlapTokenClientFactory.java
+++ /dev/null
@@ -1,160 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hive.llap.security;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
-
-import javax.net.SocketFactory;
-
-import org.apache.commons.lang3.StringUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hive.llap.impl.LlapManagementProtocolClientImpl;
-import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GetTokenRequestProto;
-import org.apache.hadoop.hive.llap.registry.impl.LlapRegistryService;
-import org.apache.hadoop.hive.llap.registry.ServiceInstance;
-import org.apache.hadoop.hive.llap.registry.ServiceInstanceSet;
-import org.apache.hadoop.hive.llap.security.LlapTokenIdentifier;
-import org.apache.hadoop.io.DataInputByteBuffer;
-import org.apache.hadoop.io.retry.RetryPolicies;
-import org.apache.hadoop.io.retry.RetryPolicy;
-import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.token.Token;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.protobuf.ByteString;
-import com.google.protobuf.ServiceException;
-
-public class LlapTokenClientFactory {
- private static final Logger LOG = LoggerFactory.getLogger(LlapTokenClientFactory.class);
-
- private final LlapRegistryService registry;
- private final SocketFactory socketFactory;
- private final RetryPolicy retryPolicy;
- private final Configuration conf;
- private ServiceInstanceSet activeInstances;
- private Collection<ServiceInstance> lastKnownInstances;
-
- public LlapTokenClientFactory(Configuration conf) {
- this.conf = conf;
- registry = new LlapRegistryService(false);
- registry.init(conf);
- socketFactory = NetUtils.getDefaultSocketFactory(conf);
- retryPolicy = RetryPolicies.retryUpToMaximumTimeWithFixedSleep(
- 16000, 2000l, TimeUnit.MILLISECONDS);
- }
-
- public interface Client {
- Token<LlapTokenIdentifier> getDelegationToken(String appId) throws IOException;
- }
-
- public Client createClient() {
- return new ClientImpl(); // Client is separate from factory mostly for thread-safety reasons.
- }
-
- private class ClientImpl implements Client {
- private LlapManagementProtocolClientImpl client;
- private ServiceInstance clientInstance;
-
- @Override
- public Token<LlapTokenIdentifier> getDelegationToken(String appId) throws IOException {
- if (!UserGroupInformation.isSecurityEnabled()) return null;
- Iterator<ServiceInstance> llaps = null;
- if (clientInstance == null) {
- assert client == null;
- llaps = getLlapServices(false).iterator();
- clientInstance = llaps.next();
- }
-
- ByteString tokenBytes = null;
- boolean hasRefreshed = false;
- while (true) {
- try {
- tokenBytes = getTokenBytes(appId);
- break;
- } catch (IOException | ServiceException ex) {
- LOG.error("Cannot get a token, trying a different instance", ex);
- client = null;
- clientInstance = null;
- }
- if (llaps == null || !llaps.hasNext()) {
- if (hasRefreshed) { // Only refresh once.
- throw new RuntimeException("Cannot find any LLAPs to get the token from");
- }
- llaps = getLlapServices(true).iterator();
- hasRefreshed = true;
- }
- clientInstance = llaps.next();
- }
-
- Token<LlapTokenIdentifier> token = extractToken(tokenBytes);
- if (LOG.isInfoEnabled()) {
- LOG.info("Obtained a LLAP delegation token from " + clientInstance + ": " + token);
- }
- return token;
- }
-
- private Token<LlapTokenIdentifier> extractToken(ByteString tokenBytes) throws IOException {
- Token<LlapTokenIdentifier> token = new Token<>();
- DataInputByteBuffer in = new DataInputByteBuffer();
- in.reset(tokenBytes.asReadOnlyByteBuffer());
- token.readFields(in);
- return token;
- }
-
- private ByteString getTokenBytes(final String appId) throws IOException, ServiceException {
- assert clientInstance != null;
- if (client == null) {
- client = new LlapManagementProtocolClientImpl(conf, clientInstance.getHost(),
- clientInstance.getManagementPort(), retryPolicy, socketFactory);
- }
- GetTokenRequestProto.Builder req = GetTokenRequestProto.newBuilder();
- if (!StringUtils.isBlank(appId)) {
- req.setAppId(appId);
- }
- return client.getDelegationToken(null, req.build()).getToken();
- }
- }
-
- /** Synchronized - LLAP registry and instance set are not thread safe. */
- private synchronized List<ServiceInstance> getLlapServices(
- boolean doForceRefresh) throws IOException {
- if (!doForceRefresh && lastKnownInstances != null) {
- return new ArrayList<>(lastKnownInstances);
- }
- if (activeInstances == null) {
- registry.start();
- activeInstances = registry.getInstances();
- }
- Map<String, ServiceInstance> daemons = activeInstances.getAll();
- if (daemons == null || daemons.isEmpty()) {
- throw new RuntimeException("No LLAPs found");
- }
- lastKnownInstances = daemons.values();
- return new ArrayList<ServiceInstance>(lastKnownInstances);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/hive/blob/9fe3dab7/llap-client/src/java/org/apache/hadoop/hive/llap/security/LlapTokenLocalClient.java
----------------------------------------------------------------------
diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/security/LlapTokenLocalClient.java b/llap-client/src/java/org/apache/hadoop/hive/llap/security/LlapTokenLocalClient.java
index f10351b..af889b6 100644
--- a/llap-client/src/java/org/apache/hadoop/hive/llap/security/LlapTokenLocalClient.java
+++ b/llap-client/src/java/org/apache/hadoop/hive/llap/security/LlapTokenLocalClient.java
@@ -22,7 +22,6 @@ import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.security.token.Token;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -32,20 +31,21 @@ public class LlapTokenLocalClient {
private final SecretManager secretManager;
public LlapTokenLocalClient(Configuration conf, String clusterId) {
+ // TODO: create this centrally in HS2 case
secretManager = SecretManager.createSecretManager(conf, clusterId);
}
- public Token<LlapTokenIdentifier> createToken(String appId, String user) throws IOException {
+ public Token<LlapTokenIdentifier> createToken(
+ String appId, String user, boolean isSignatureRequired) throws IOException {
try {
- Token<LlapTokenIdentifier> token = secretManager.createLlapToken(appId, user);
+ Token<LlapTokenIdentifier> token = secretManager.createLlapToken(
+ appId, user, isSignatureRequired);
if (LOG.isInfoEnabled()) {
LOG.info("Created a LLAP delegation token locally: " + token);
}
return token;
} catch (Exception ex) {
- throw new IOException("Failed to create LLAP token locally. You might need to set "
- + ConfVars.LLAP_CREATE_TOKEN_LOCALLY.varname
- + " to false, or make sure you can access secure ZK paths.", ex);
+ throw (ex instanceof IOException) ? (IOException)ex : new IOException(ex);
}
}
http://git-wip-us.apache.org/repos/asf/hive/blob/9fe3dab7/llap-common/src/java/org/apache/hadoop/hive/llap/security/LlapSigner.java
----------------------------------------------------------------------
diff --git a/llap-common/src/java/org/apache/hadoop/hive/llap/security/LlapSigner.java b/llap-common/src/java/org/apache/hadoop/hive/llap/security/LlapSigner.java
new file mode 100644
index 0000000..478a40a
--- /dev/null
+++ b/llap-common/src/java/org/apache/hadoop/hive/llap/security/LlapSigner.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.llap.security;
+
+import java.io.IOException;
+
+public interface LlapSigner {
+ /** An object signable by a signer. */
+ public interface Signable {
+ /** Called by the signer to record key information as part of the message to be signed. */
+ void setSignInfo(int masterKeyId, String user);
+ /** Called by the signer to get the serialized representation of the message to be signed. */
+ byte[] serialize() throws IOException;
+ }
+
+ /** Message with the signature. */
+ public static final class SignedMessage {
+ public byte[] message, signature;
+ }
+
+ /** Serializes and signs the message. */
+ SignedMessage serializeAndSign(Signable message) throws IOException;
+
+ void checkSignature(byte[] message, byte[] signature, int keyId);
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hive/blob/9fe3dab7/llap-common/src/java/org/apache/hadoop/hive/llap/security/LlapTokenIdentifier.java
----------------------------------------------------------------------
diff --git a/llap-common/src/java/org/apache/hadoop/hive/llap/security/LlapTokenIdentifier.java b/llap-common/src/java/org/apache/hadoop/hive/llap/security/LlapTokenIdentifier.java
index 7c47f0b..08c141f 100644
--- a/llap-common/src/java/org/apache/hadoop/hive/llap/security/LlapTokenIdentifier.java
+++ b/llap-common/src/java/org/apache/hadoop/hive/llap/security/LlapTokenIdentifier.java
@@ -30,23 +30,24 @@ import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdenti
import com.google.common.base.Preconditions;
-/** For now, a LLAP token gives access to any LLAP server. */
public class LlapTokenIdentifier extends AbstractDelegationTokenIdentifier {
private static final String KIND = "LLAP_TOKEN";
public static final Text KIND_NAME = new Text(KIND);
private String clusterId;
private String appId;
+ private boolean isSigningRequired;
public LlapTokenIdentifier() {
super();
}
public LlapTokenIdentifier(Text owner, Text renewer, Text realUser,
- String clusterId, String appId) {
+ String clusterId, String appId, boolean isSigningRequired) {
super(owner, renewer, realUser);
Preconditions.checkNotNull(clusterId);
this.clusterId = clusterId;
this.appId = appId == null ? "" : appId;
+ this.isSigningRequired = isSigningRequired;
}
@Override
@@ -54,6 +55,7 @@ public class LlapTokenIdentifier extends AbstractDelegationTokenIdentifier {
super.write(out);
out.writeUTF(clusterId);
out.writeUTF(appId);
+ out.writeBoolean(isSigningRequired);
}
@Override
@@ -62,6 +64,7 @@ public class LlapTokenIdentifier extends AbstractDelegationTokenIdentifier {
clusterId = in.readUTF();
Preconditions.checkNotNull(clusterId);
appId = in.readUTF();
+ isSigningRequired = in.readBoolean();
appId = appId == null ? "" : appId;
}
@@ -78,10 +81,15 @@ public class LlapTokenIdentifier extends AbstractDelegationTokenIdentifier {
return clusterId;
}
+ public boolean isSigningRequired() {
+ return isSigningRequired;
+ }
+
@Override
public int hashCode() {
final int prime = 31;
int result = prime * super.hashCode() + (StringUtils.isBlank(appId) ? 0 : appId.hashCode());
+ result = prime * result + (isSigningRequired ? 1231 : 1237);
return prime * result + ((clusterId == null) ? 0 : clusterId.hashCode());
}
@@ -90,7 +98,7 @@ public class LlapTokenIdentifier extends AbstractDelegationTokenIdentifier {
if (this == obj) return true;
if (!(obj instanceof LlapTokenIdentifier) || !super.equals(obj)) return false;
LlapTokenIdentifier other = (LlapTokenIdentifier) obj;
- return (StringUtils.isBlank(appId)
+ return isSigningRequired == other.isSigningRequired && (StringUtils.isBlank(appId)
? StringUtils.isBlank(other.appId) : appId.equals(other.appId))
&& (clusterId == null ? other.clusterId == null : clusterId.equals(other.clusterId));
}
http://git-wip-us.apache.org/repos/asf/hive/blob/9fe3dab7/llap-common/src/java/org/apache/hadoop/hive/llap/security/SecretManager.java
----------------------------------------------------------------------
diff --git a/llap-common/src/java/org/apache/hadoop/hive/llap/security/SecretManager.java b/llap-common/src/java/org/apache/hadoop/hive/llap/security/SecretManager.java
index 465b204..5aa4b84 100644
--- a/llap-common/src/java/org/apache/hadoop/hive/llap/security/SecretManager.java
+++ b/llap-common/src/java/org/apache/hadoop/hive/llap/security/SecretManager.java
@@ -1,7 +1,11 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
@@ -27,12 +31,14 @@ import org.apache.hadoop.hive.llap.security.LlapTokenIdentifier;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.delegation.DelegationKey;
import org.apache.hadoop.security.token.delegation.ZKDelegationTokenSecretManager;
import org.apache.hadoop.security.token.delegation.web.DelegationTokenManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class SecretManager extends ZKDelegationTokenSecretManager<LlapTokenIdentifier> {
+public class SecretManager extends ZKDelegationTokenSecretManager<LlapTokenIdentifier>
+ implements SigningSecretManager {
private static final Logger LOG = LoggerFactory.getLogger(SecretManager.class);
private final String clusterId;
@@ -86,43 +92,76 @@ public class SecretManager extends ZKDelegationTokenSecretManager<LlapTokenIdent
return id;
}
- public static SecretManager createSecretManager(final Configuration conf, String clusterId) {
- String llapPrincipal = HiveConf.getVar(conf, ConfVars.LLAP_KERBEROS_PRINCIPAL),
- llapKeytab = HiveConf.getVar(conf, ConfVars.LLAP_KERBEROS_KEYTAB_FILE);
- return SecretManager.createSecretManager(conf, llapPrincipal, llapKeytab, clusterId);
+ @Override
+ public synchronized DelegationKey getCurrentKey() {
+ return allKeys.get(getCurrentKeyId());
}
+ @Override
+ public byte[] signWithKey(byte[] message, DelegationKey key) {
+ return createPassword(message, key.getKey());
+ }
- public static SecretManager createSecretManager(final Configuration conf,
- String llapPrincipal, String llapKeytab, final String clusterId) {
- // Create ZK connection under a separate ugi (if specified) - ZK works in mysterious ways.
- UserGroupInformation zkUgi = null;
- String principal = HiveConf.getVar(conf, ConfVars.LLAP_ZKSM_KERBEROS_PRINCIPAL, llapPrincipal);
- String keyTab = HiveConf.getVar(conf, ConfVars.LLAP_ZKSM_KERBEROS_KEYTAB_FILE, llapKeytab);
- try {
- zkUgi = LlapUtil.loginWithKerberos(principal, keyTab);
- } catch (IOException e) {
- throw new RuntimeException(e);
+ @Override
+ public byte[] signWithKey(byte[] message, int keyId) throws SecurityException {
+ DelegationKey key = getDelegationKey(keyId);
+ if (key == null) {
+ throw new SecurityException("The key ID " + keyId + " was not found");
}
- // Override the default delegation token lifetime for LLAP.
- // Also set all the necessary ZK settings to defaults and LLAP configs, if not set.
- final Configuration zkConf = new Configuration(conf);
+ return createPassword(message, key.getKey());
+ }
+
+ static final class LlapZkConf {
+ public Configuration zkConf;
+ public UserGroupInformation zkUgi;
+ public LlapZkConf(Configuration zkConf, UserGroupInformation zkUgi) {
+ this.zkConf = zkConf;
+ this.zkUgi = zkUgi;
+ }
+ }
+
+ private static LlapZkConf createLlapZkConf(
+ Configuration conf, String llapPrincipal, String llapKeytab, String clusterId) {
+ String principal = HiveConf.getVar(conf, ConfVars.LLAP_ZKSM_KERBEROS_PRINCIPAL, llapPrincipal);
+ String keyTab = HiveConf.getVar(conf, ConfVars.LLAP_ZKSM_KERBEROS_KEYTAB_FILE, llapKeytab);
+ // Override the default delegation token lifetime for LLAP.
+ // Also set all the necessary ZK settings to defaults and LLAP configs, if not set.
+ final Configuration zkConf = new Configuration(conf);
long tokenLifetime = HiveConf.getTimeVar(
conf, ConfVars.LLAP_DELEGATION_TOKEN_LIFETIME, TimeUnit.SECONDS);
zkConf.setLong(DelegationTokenManager.MAX_LIFETIME, tokenLifetime);
zkConf.setLong(DelegationTokenManager.RENEW_INTERVAL, tokenLifetime);
zkConf.set(SecretManager.ZK_DTSM_ZK_KERBEROS_PRINCIPAL, principal);
zkConf.set(SecretManager.ZK_DTSM_ZK_KERBEROS_KEYTAB, keyTab);
- String zkPath = clusterId;
+ String zkPath = "zkdtsm_" + clusterId;
LOG.info("Using {} as ZK secret manager path", zkPath);
- zkConf.set(SecretManager.ZK_DTSM_ZNODE_WORKING_PATH, "zkdtsm_" + zkPath);
+ zkConf.set(SecretManager.ZK_DTSM_ZNODE_WORKING_PATH, zkPath);
setZkConfIfNotSet(zkConf, SecretManager.ZK_DTSM_ZK_AUTH_TYPE, "sasl");
setZkConfIfNotSet(zkConf, SecretManager.ZK_DTSM_ZK_CONNECTION_STRING,
HiveConf.getVar(zkConf, ConfVars.LLAP_ZKSM_ZK_CONNECTION_STRING));
- return zkUgi.doAs(new PrivilegedAction<SecretManager>() {
+
+ UserGroupInformation zkUgi = null;
+ try {
+ zkUgi = LlapUtil.loginWithKerberos(principal, keyTab);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ return new LlapZkConf(zkConf, zkUgi);
+ }
+
+ public static SecretManager createSecretManager(final Configuration conf, String clusterId) {
+ String llapPrincipal = HiveConf.getVar(conf, ConfVars.LLAP_KERBEROS_PRINCIPAL),
+ llapKeytab = HiveConf.getVar(conf, ConfVars.LLAP_KERBEROS_KEYTAB_FILE);
+ return SecretManager.createSecretManager(conf, llapPrincipal, llapKeytab, clusterId);
+ }
+
+ public static SecretManager createSecretManager(
+ final Configuration conf, String llapPrincipal, String llapKeytab, final String clusterId) {
+ final LlapZkConf c = createLlapZkConf(conf, llapPrincipal, llapKeytab, clusterId);
+ return c.zkUgi.doAs(new PrivilegedAction<SecretManager>() {
@Override
public SecretManager run() {
- SecretManager zkSecretManager = new SecretManager(zkConf, clusterId);
+ SecretManager zkSecretManager = new SecretManager(c.zkConf, clusterId);
try {
zkSecretManager.startThreads();
} catch (IOException e) {
@@ -138,7 +177,8 @@ public class SecretManager extends ZKDelegationTokenSecretManager<LlapTokenIdent
zkConf.set(name, value);
}
- public Token<LlapTokenIdentifier> createLlapToken(String appId, String user) throws IOException {
+ public Token<LlapTokenIdentifier> createLlapToken(
+ String appId, String user, boolean isSignatureRequired) throws IOException {
Text realUser = null, renewer = null;
if (user == null) {
UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
@@ -151,7 +191,7 @@ public class SecretManager extends ZKDelegationTokenSecretManager<LlapTokenIdent
renewer = new Text(user);
}
LlapTokenIdentifier llapId = new LlapTokenIdentifier(
- new Text(user), renewer, realUser, clusterId, appId);
+ new Text(user), renewer, realUser, clusterId, appId, isSignatureRequired);
// TODO: note that the token is not renewable right now and will last for 2 weeks by default.
Token<LlapTokenIdentifier> token = new Token<LlapTokenIdentifier>(llapId, this);
if (LOG.isInfoEnabled()) {
http://git-wip-us.apache.org/repos/asf/hive/blob/9fe3dab7/llap-common/src/java/org/apache/hadoop/hive/llap/security/SigningSecretManager.java
----------------------------------------------------------------------
diff --git a/llap-common/src/java/org/apache/hadoop/hive/llap/security/SigningSecretManager.java b/llap-common/src/java/org/apache/hadoop/hive/llap/security/SigningSecretManager.java
new file mode 100644
index 0000000..067a98e
--- /dev/null
+++ b/llap-common/src/java/org/apache/hadoop/hive/llap/security/SigningSecretManager.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.llap.security;
+
+import org.apache.hadoop.security.token.delegation.DelegationKey;
+
+public interface SigningSecretManager {
+ DelegationKey getCurrentKey();
+ byte[] signWithKey(byte[] message, DelegationKey key);
+ byte[] signWithKey(byte[] message, int keyId) throws SecurityException;
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hive/blob/9fe3dab7/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
index 2524dc2..d439c07 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
@@ -27,11 +27,13 @@ import java.util.concurrent.atomic.AtomicReference;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.llap.DaemonId;
import org.apache.hadoop.hive.llap.daemon.ContainerRunner;
import org.apache.hadoop.hive.llap.daemon.FragmentCompletionHandler;
import org.apache.hadoop.hive.llap.daemon.HistoryLogger;
import org.apache.hadoop.hive.llap.daemon.KilledTaskHandler;
import org.apache.hadoop.hive.llap.daemon.QueryFailedHandler;
+import org.apache.hadoop.hive.llap.daemon.impl.LlapTokenChecker.LlapTokenInfo;
import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.FragmentRuntimeInfo;
import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GroupInputSpecProto;
import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.IOSpecProto;
@@ -46,11 +48,14 @@ import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWor
import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.TerminateFragmentRequestProto;
import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.TerminateFragmentResponseProto;
import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.VertexIdentifier;
+import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.VertexOrBinary;
import org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorMetrics;
+import org.apache.hadoop.hive.llap.security.LlapSignerImpl;
import org.apache.hadoop.hive.llap.tez.Converters;
import org.apache.hadoop.hive.ql.exec.tez.TezProcessor;
import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.service.CompositeService;
import org.apache.hadoop.yarn.api.ApplicationConstants;
@@ -68,7 +73,8 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.base.Preconditions;
-// TODO Convert this to a CompositeService
+import com.google.protobuf.ByteString;
+
public class ContainerRunnerImpl extends CompositeService implements ContainerRunner, FragmentCompletionHandler, QueryFailedHandler {
// TODO Setup a set of threads to process incoming requests.
@@ -89,12 +95,14 @@ public class ContainerRunnerImpl extends CompositeService implements ContainerRu
private final TaskRunnerCallable.ConfParams confParams;
private final KilledTaskHandler killedTaskHandler = new KilledTaskHandlerImpl();
private final HadoopShim tezHadoopShim;
+ private final LlapSignerImpl signer;
+ private final String clusterId;
public ContainerRunnerImpl(Configuration conf, int numExecutors, int waitQueueSize,
boolean enablePreemption, String[] localDirsBase, AtomicReference<Integer> localShufflePort,
AtomicReference<InetSocketAddress> localAddress,
long totalMemoryAvailableBytes, LlapDaemonExecutorMetrics metrics,
- AMReporter amReporter, ClassLoader classLoader, String clusterId) {
+ AMReporter amReporter, ClassLoader classLoader, DaemonId daemonId) {
super("ContainerRunnerImpl");
this.conf = conf;
Preconditions.checkState(numExecutors > 0,
@@ -102,7 +110,10 @@ public class ContainerRunnerImpl extends CompositeService implements ContainerRu
this.localAddress = localAddress;
this.localShufflePort = localShufflePort;
this.amReporter = amReporter;
+ this.signer = UserGroupInformation.isSecurityEnabled()
+ ? new LlapSignerImpl(conf, daemonId) : null;
+ this.clusterId = daemonId.getClusterString();
this.queryTracker = new QueryTracker(conf, localDirsBase, clusterId);
addIfService(queryTracker);
String waitQueueSchedulerClassName = HiveConf.getVar(
@@ -153,9 +164,22 @@ public class ContainerRunnerImpl extends CompositeService implements ContainerRu
@Override
public SubmitWorkResponseProto submitWork(SubmitWorkRequestProto request) throws IOException {
- // TODO: also support binary. Actually, we should figure out the binary stuff here and
- // stop passing the protobuf around. We should pass around some plain objects/values.
- SignableVertexSpec vertex = request.getWorkSpec().getVertex();
+ VertexOrBinary vob = request.getWorkSpec();
+ SignableVertexSpec vertex = vob.hasVertex() ? vob.getVertex() : null;
+ ByteString vertexBinary = vob.hasVertexBinary() ? vob.getVertexBinary() : null;
+ if (vertex != null && vertexBinary != null) {
+ throw new IOException(
+ "Vertex and vertexBinary in VertexOrBinary cannot be set at the same time");
+ }
+ if (vertexBinary != null) {
+ vertex = SignableVertexSpec.parseFrom(vob.getVertexBinary());
+ }
+
+ LlapTokenInfo tokenInfo = LlapTokenChecker.getTokenInfo(clusterId);
+ if (tokenInfo.isSigningRequired) {
+ checkSignature(vertex, vertexBinary, request, tokenInfo.userName);
+ }
+
if (LOG.isInfoEnabled()) {
LOG.info("Queueing container for execution: " + stringifySubmitRequest(request, vertex));
}
@@ -166,6 +190,7 @@ public class ContainerRunnerImpl extends CompositeService implements ContainerRu
HistoryLogger.logFragmentStart(vId.getApplicationIdString(), request.getContainerIdString(),
localAddress.get().getHostName(), vertex.getDagName(), vId.getDagId(),
vertex.getVertexName(), request.getFragmentNumber(), request.getAttemptNumber());
+
// This is the start of container-annotated logging.
// TODO Reduce the length of this string. Way too verbose at the moment.
NDC.push(fragmentIdString);
@@ -194,7 +219,7 @@ public class ContainerRunnerImpl extends CompositeService implements ContainerRu
QueryFragmentInfo fragmentInfo = queryTracker.registerFragment(
queryIdentifier, vId.getApplicationIdString(), vertex.getDagName(), dagIdentifier,
vertex.getVertexName(), request.getFragmentNumber(), request.getAttemptNumber(),
- vertex.getUser(), vertex, jobToken, fragmentIdString);
+ vertex.getUser(), vertex, jobToken, fragmentIdString, tokenInfo);
String[] localDirs = fragmentInfo.getLocalDirs();
Preconditions.checkNotNull(localDirs);
@@ -208,7 +233,7 @@ public class ContainerRunnerImpl extends CompositeService implements ContainerRu
TaskRunnerCallable callable = new TaskRunnerCallable(request, fragmentInfo, callableConf,
new LlapExecutionContext(localAddress.get().getHostName(), queryTracker), env,
credentials, memoryPerExecutor, amReporter, confParams, metrics, killedTaskHandler,
- this, tezHadoopShim, attemptId);
+ this, tezHadoopShim, attemptId, vertex);
submissionState = executorService.schedule(callable);
if (LOG.isInfoEnabled()) {
@@ -233,6 +258,24 @@ public class ContainerRunnerImpl extends CompositeService implements ContainerRu
return responseBuilder.build();
}
+ private void checkSignature(SignableVertexSpec vertex, ByteString vertexBinary,
+ SubmitWorkRequestProto request, String tokenUserName) throws SecurityException, IOException {
+ if (!request.hasWorkSpecSignature()) {
+ throw new SecurityException("Unsigned fragment not allowed");
+ }
+ if (vertexBinary == null) {
+ ByteString.Output os = ByteString.newOutput();
+ vertex.writeTo(os);
+ vertexBinary = os.toByteString();
+ }
+ signer.checkSignature(vertexBinary.toByteArray(),
+ request.getWorkSpecSignature().toByteArray(), (int)vertex.getSignatureKeyId());
+ if (!vertex.hasUser() || !vertex.getUser().equals(tokenUserName)) {
+ throw new SecurityException("LLAP token is for " + tokenUserName
+ + " but the fragment is for " + (vertex.hasUser() ? vertex.getUser() : null));
+ }
+ }
+
private static class LlapExecutionContext extends ExecutionContextImpl
implements TezProcessor.Hook {
private final QueryTracker queryTracker;
http://git-wip-us.apache.org/repos/asf/hive/blob/9fe3dab7/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
index 5ab7b3c..2faedcd 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
@@ -248,7 +248,7 @@ public class LlapDaemon extends CompositeService implements ContainerRunner, Lla
this.containerRunner = new ContainerRunnerImpl(daemonConf, numExecutors, waitQueueSize,
enablePreemption, localDirs, this.shufflePort, srvAddress, executorMemoryBytes, metrics,
- amReporter, executorClassLoader, daemonId.getClusterString());
+ amReporter, executorClassLoader, daemonId);
addIfService(containerRunner);
// Not adding the registry as a service, since we need to control when it is initialized - conf used to pickup properties.
http://git-wip-us.apache.org/repos/asf/hive/blob/9fe3dab7/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapProtocolServerImpl.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapProtocolServerImpl.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapProtocolServerImpl.java
index b94fc2e..7ccd28f 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapProtocolServerImpl.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapProtocolServerImpl.java
@@ -45,7 +45,6 @@ import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWor
import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkResponseProto;
import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.TerminateFragmentRequestProto;
import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.TerminateFragmentResponseProto;
-import org.apache.hadoop.io.Text;
import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.net.NetUtils;
@@ -64,6 +63,9 @@ public class LlapProtocolServerImpl extends AbstractService
implements LlapProtocolBlockingPB, LlapManagementProtocolPB {
private static final Logger LOG = LoggerFactory.getLogger(LlapProtocolServerImpl.class);
+ private enum TokenRequiresSigning {
+ TRUE, FALSE, EXCEPT_OWNER
+ }
private final int numHandlers;
private final ContainerRunner containerRunner;
@@ -71,8 +73,10 @@ public class LlapProtocolServerImpl extends AbstractService
private RPC.Server server, mngServer;
private final AtomicReference<InetSocketAddress> srvAddress, mngAddress;
private SecretManager zkSecretManager;
- private String restrictedToUser = null;
+ private String clusterUser = null;
+ private boolean isRestrictedToClusterUser = false;
private final DaemonId daemonId;
+ private TokenRequiresSigning isSigningRequiredConfig = TokenRequiresSigning.TRUE;
public LlapProtocolServerImpl(int numHandlers, ContainerRunner containerRunner,
AtomicReference<InetSocketAddress> srvAddress, AtomicReference<InetSocketAddress> mngAddress,
@@ -132,6 +136,7 @@ public class LlapProtocolServerImpl extends AbstractService
@Override
public void serviceStart() {
final Configuration conf = getConfig();
+ isSigningRequiredConfig = getSigningConfig(conf);
final BlockingService daemonImpl =
LlapDaemonProtocolProtos.LlapDaemonProtocol.newReflectiveBlockingService(this);
final BlockingService managementImpl =
@@ -140,13 +145,14 @@ public class LlapProtocolServerImpl extends AbstractService
startProtocolServers(conf, daemonImpl, managementImpl);
return;
}
+ try {
+ this.clusterUser = UserGroupInformation.getCurrentUser().getShortUserName();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
if (isPermissiveManagementAcl(conf)) {
LOG.warn("Management protocol has a '*' ACL.");
- try {
- this.restrictedToUser = UserGroupInformation.getCurrentUser().getShortUserName();
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
+ isRestrictedToClusterUser = true;
}
String llapPrincipal = HiveConf.getVar(conf, ConfVars.LLAP_KERBEROS_PRINCIPAL),
llapKeytab = HiveConf.getVar(conf, ConfVars.LLAP_KERBEROS_KEYTAB_FILE);
@@ -169,6 +175,20 @@ public class LlapProtocolServerImpl extends AbstractService
});
}
+ private static TokenRequiresSigning getSigningConfig(final Configuration conf) {
+ String signSetting = HiveConf.getVar(
+ conf, ConfVars.LLAP_REMOTE_TOKEN_REQUIRES_SIGNING).toLowerCase();
+ switch (signSetting) {
+ case "true": return TokenRequiresSigning.TRUE;
+ case "except_llap_owner": return TokenRequiresSigning.EXCEPT_OWNER;
+ case "false": return TokenRequiresSigning.FALSE;
+ default: {
+ throw new RuntimeException("Invalid value for "
+ + ConfVars.LLAP_REMOTE_TOKEN_REQUIRES_SIGNING.varname + ": " + signSetting);
+ }
+ }
+ }
+
private static boolean isPermissiveManagementAcl(Configuration conf) {
return HiveConf.getBoolVar(conf, ConfVars.LLAP_VALIDATE_ACLS)
&& AccessControlList.WILDCARD_ACL_VALUE.equals(
@@ -266,17 +286,20 @@ public class LlapProtocolServerImpl extends AbstractService
if (zkSecretManager == null) {
throw new ServiceException("Operation not supported on unsecure cluster");
}
- UserGroupInformation ugi = null;
+ UserGroupInformation callingUser = null;
Token<LlapTokenIdentifier> token = null;
try {
- ugi = UserGroupInformation.getCurrentUser();
- token = zkSecretManager.createLlapToken(request.getAppId(), null);
+ callingUser = UserGroupInformation.getCurrentUser();
+ // Determine if the user would need to sign fragments.
+ boolean isSigningRequired = determineIfSigningIsRequired(callingUser);
+ token = zkSecretManager.createLlapToken(
+ request.hasAppId() ? request.getAppId() : null, null, isSigningRequired);
} catch (IOException e) {
throw new ServiceException(e);
}
- if (restrictedToUser != null && !restrictedToUser.equals(ugi.getShortUserName())) {
+ if (isRestrictedToClusterUser && !clusterUser.equals(callingUser.getShortUserName())) {
throw new ServiceException("Management protocol ACL is too permissive. The access has been"
- + " automatically restricted to " + restrictedToUser + "; " + ugi.getShortUserName()
+ + " automatically restricted to " + clusterUser + "; " + callingUser.getShortUserName()
+ " is denied acccess. Please set " + ConfVars.LLAP_VALIDATE_ACLS.varname + " to false,"
+ " or adjust " + ConfVars.LLAP_MANAGEMENT_ACL.varname + " and "
+ ConfVars.LLAP_MANAGEMENT_ACL_DENY.varname + " to a more restrictive ACL.");
@@ -292,4 +315,16 @@ public class LlapProtocolServerImpl extends AbstractService
GetTokenResponseProto response = GetTokenResponseProto.newBuilder().setToken(bs).build();
return response;
}
+
+ private boolean determineIfSigningIsRequired(UserGroupInformation callingUser) {
+ switch (isSigningRequiredConfig) {
+ case FALSE: return false;
+ case TRUE: return true;
+ // Note that this uses short user name without consideration for Kerberos realm.
+ // This seems to be the common approach (e.g. for HDFS permissions), but it may be
+ // better to consider the realm (although not the host, so not the full name).
+ case EXCEPT_OWNER: return !clusterUser.equals(callingUser.getShortUserName());
+ default: throw new AssertionError("Unknown value " + isSigningRequiredConfig);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/hive/blob/9fe3dab7/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapTokenChecker.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapTokenChecker.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapTokenChecker.java
index 04df929..24a7737 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapTokenChecker.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapTokenChecker.java
@@ -23,8 +23,6 @@ import java.util.List;
import java.io.IOException;
import org.apache.commons.lang3.StringUtils;
-import org.apache.commons.lang3.tuple.ImmutablePair;
-import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.hive.llap.security.LlapTokenIdentifier;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.TokenIdentifier;
@@ -34,8 +32,20 @@ import org.slf4j.LoggerFactory;
public final class LlapTokenChecker {
private static final Logger LOG = LoggerFactory.getLogger(LlapTokenChecker.class);
- private static final ImmutablePair<String, String> NO_SECURITY = new ImmutablePair<>(null, null);
- public static Pair<String, String> getTokenInfo(String clusterId) throws IOException {
+ public static final class LlapTokenInfo {
+ public final String userName;
+ public final String appId;
+ public final boolean isSigningRequired;
+
+ public LlapTokenInfo(String userName, String appId, boolean isSigningRequired) {
+ this.userName = userName;
+ this.appId = appId;
+ this.isSigningRequired = isSigningRequired;
+ }
+ }
+
+ private static final LlapTokenInfo NO_SECURITY = new LlapTokenInfo(null, null, false);
+ public static LlapTokenInfo getTokenInfo(String clusterId) throws IOException {
if (!UserGroupInformation.isSecurityEnabled()) return NO_SECURITY;
UserGroupInformation current = UserGroupInformation.getCurrentUser();
String kerberosName = current.hasKerberosCredentials() ? current.getShortUserName() : null;
@@ -65,13 +75,14 @@ public final class LlapTokenChecker {
}
@VisibleForTesting
- static Pair<String, String> getTokenInfoInternal(
+ static LlapTokenInfo getTokenInfoInternal(
String kerberosName, List<LlapTokenIdentifier> tokens) {
assert (tokens != null && !tokens.isEmpty()) || kerberosName != null;
if (tokens == null) {
- return new ImmutablePair<String, String>(kerberosName, null);
+ return new LlapTokenInfo(kerberosName, null, true);
}
String userName = kerberosName, appId = null;
+ boolean isSigningRequired = false;
for (LlapTokenIdentifier llapId : tokens) {
String newUserName = llapId.getRealUser().toString();
if (userName != null && !userName.equals(newUserName)) {
@@ -88,9 +99,10 @@ public final class LlapTokenChecker {
}
appId = newAppId;
}
+ isSigningRequired = isSigningRequired || llapId.isSigningRequired();
}
assert userName != null;
- return new ImmutablePair<String, String>(userName, appId);
+ return new LlapTokenInfo(userName, appId, isSigningRequired);
}
public static void checkPermissions(
@@ -120,12 +132,12 @@ public final class LlapTokenChecker {
}
public static void checkPermissions(
- Pair<String, String> prm, String userName, String appId, Object hint) {
+ LlapTokenInfo prm, String userName, String appId, Object hint) {
if (userName == null) {
assert StringUtils.isEmpty(appId);
return;
}
- if (!checkTokenPermissions(userName, appId, prm.getLeft(), prm.getRight())) {
+ if (!checkTokenPermissions(userName, appId, prm.userName, prm.appId)) {
throw new SecurityException("Unauthorized to access "
+ userName + ", " + appId.hashCode() + " (" + hint + ")");
}
http://git-wip-us.apache.org/repos/asf/hive/blob/9fe3dab7/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java
index c55436b..a965872 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java
@@ -20,7 +20,6 @@ import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
-import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
@@ -31,6 +30,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.llap.daemon.impl.LlapTokenChecker.LlapTokenInfo;
import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SignableVertexSpec;
import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateProto;
import org.apache.hadoop.hive.llap.shufflehandler.ShuffleHandler;
@@ -119,7 +119,7 @@ public class QueryTracker extends AbstractService {
QueryFragmentInfo registerFragment(QueryIdentifier queryIdentifier, String appIdString,
String dagName, int dagIdentifier, String vertexName, int fragmentNumber, int attemptNumber,
String user, SignableVertexSpec vertex, Token<JobTokenIdentifier> appToken,
- String fragmentIdString) throws IOException {
+ String fragmentIdString, LlapTokenInfo tokenInfo) throws IOException {
ReadWriteLock dagLock = getDagLock(queryIdentifier);
dagLock.readLock().lock();
try {
@@ -132,16 +132,18 @@ public class QueryTracker extends AbstractService {
}
// TODO: for now, we get the secure username out of UGI... after signing, we can take it
// out of the request provided that it's signed.
- Pair<String, String> tokenInfo = LlapTokenChecker.getTokenInfo(clusterId);
+ if (tokenInfo == null) {
+ tokenInfo = LlapTokenChecker.getTokenInfo(clusterId);
+ }
boolean isExistingQueryInfo = true;
QueryInfo queryInfo = queryInfoMap.get(queryIdentifier);
if (queryInfo == null) {
- String tokenUser = tokenInfo.getLeft(), tokenAppId = tokenInfo.getRight();
if (UserGroupInformation.isSecurityEnabled()) {
- Preconditions.checkNotNull(tokenUser);
+ Preconditions.checkNotNull(tokenInfo.userName);
}
queryInfo = new QueryInfo(queryIdentifier, appIdString, dagName, dagIdentifier, user,
- getSourceCompletionMap(queryIdentifier), localDirsBase, localFs, tokenUser, tokenAppId);
+ getSourceCompletionMap(queryIdentifier), localDirsBase, localFs,
+ tokenInfo.userName, tokenInfo.appId);
QueryInfo old = queryInfoMap.putIfAbsent(queryIdentifier, queryInfo);
if (old != null) {
queryInfo = old;
http://git-wip-us.apache.org/repos/asf/hive/blob/9fe3dab7/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java
index eac0e8f..1e302e8 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java
@@ -628,9 +628,9 @@ public class TaskExecutorService extends AbstractService implements Scheduler<Ta
String state = reason == null ? "FAILED" : reason.name();
boolean removed = preemptionQueue.remove(taskWrapper);
if (removed && isInfoEnabled) {
- LOG.info(TaskRunnerCallable
- .getTaskIdentifierString(taskWrapper.getTaskRunnerCallable().getRequest())
- + " request " + state + "! Removed from preemption list.");
+ TaskRunnerCallable trc = taskWrapper.getTaskRunnerCallable();
+ LOG.info(TaskRunnerCallable.getTaskIdentifierString(trc.getRequest(),
+ trc.getVertexSpec()) + " request " + state + "! Removed from preemption list.");
}
if (metrics != null) {
metrics.setExecutorNumPreemptableRequests(preemptionQueue.size());
http://git-wip-us.apache.org/repos/asf/hive/blob/9fe3dab7/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
index 74359fa..0d9882b 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
@@ -124,7 +124,8 @@ public class TaskRunnerCallable extends CallableWithNdc<TaskRunner2Result> {
ConfParams confParams, LlapDaemonExecutorMetrics metrics,
KilledTaskHandler killedTaskHandler,
FragmentCompletionHandler fragmentCompleteHandler,
- HadoopShim tezHadoopShim, TezTaskAttemptID attemptId) {
+ HadoopShim tezHadoopShim, TezTaskAttemptID attemptId,
+ SignableVertexSpec vertex) {
this.request = request;
this.fragmentInfo = fragmentInfo;
this.conf = conf;
@@ -135,8 +136,7 @@ public class TaskRunnerCallable extends CallableWithNdc<TaskRunner2Result> {
this.memoryAvailable = memoryAvailable;
this.confParams = confParams;
this.jobToken = TokenCache.getSessionToken(credentials);
- // TODO: support binary spec here or above
- this.vertex = request.getWorkSpec().getVertex();
+ this.vertex = vertex;
this.taskSpec = Converters.getTaskSpecfromProto(
vertex, request.getFragmentNumber(), request.getAttemptNumber(), attemptId);
this.amReporter = amReporter;
@@ -389,7 +389,7 @@ public class TaskRunnerCallable extends CallableWithNdc<TaskRunner2Result> {
}
public TaskRunnerCallback getCallback() {
- return new TaskRunnerCallback(request, this);
+ return new TaskRunnerCallback(request, vertex, this);
}
public SubmitWorkRequestProto getRequest() {
@@ -399,11 +399,13 @@ public class TaskRunnerCallable extends CallableWithNdc<TaskRunner2Result> {
final class TaskRunnerCallback implements FutureCallback<TaskRunner2Result> {
private final SubmitWorkRequestProto request;
+ private final SignableVertexSpec vertex;
private final TaskRunnerCallable taskRunnerCallable;
- TaskRunnerCallback(SubmitWorkRequestProto request,
+ TaskRunnerCallback(SubmitWorkRequestProto request, SignableVertexSpec vertex,
TaskRunnerCallable taskRunnerCallable) {
this.request = request;
+ this.vertex = vertex;
this.taskRunnerCallable = taskRunnerCallable;
}
@@ -463,7 +465,8 @@ public class TaskRunnerCallable extends CallableWithNdc<TaskRunner2Result> {
@Override
public void onFailure(Throwable t) {
- LOG.error("TezTaskRunner execution failed for : " + getTaskIdentifierString(request), t);
+ LOG.error("TezTaskRunner execution failed for : "
+ + getTaskIdentifierString(request, vertex), t);
isCompleted.set(true);
fragmentCompletionHanler.fragmentComplete(fragmentInfo);
// TODO HIVE-10236 Report a fatal error over the umbilical
@@ -494,10 +497,8 @@ public class TaskRunnerCallable extends CallableWithNdc<TaskRunner2Result> {
}
public static String getTaskIdentifierString(
- SubmitWorkRequestProto request) {
+ SubmitWorkRequestProto request, SignableVertexSpec vertex) {
StringBuilder sb = new StringBuilder();
- // TODO: also support the binary version
- SignableVertexSpec vertex = request.getWorkSpec().getVertex();
sb.append("AppId=").append(vertex.getVertexIdentifier().getApplicationIdString())
.append(", containerId=").append(request.getContainerIdString())
.append(", Dag=").append(vertex.getDagName())
http://git-wip-us.apache.org/repos/asf/hive/blob/9fe3dab7/llap-server/src/java/org/apache/hadoop/hive/llap/security/LlapSignerImpl.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/security/LlapSignerImpl.java b/llap-server/src/java/org/apache/hadoop/hive/llap/security/LlapSignerImpl.java
new file mode 100644
index 0000000..4174593
--- /dev/null
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/security/LlapSignerImpl.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.llap.security;
+
+import java.io.IOException;
+import java.util.Arrays;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.llap.DaemonId;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.delegation.DelegationKey;
+
+import com.google.common.annotations.VisibleForTesting;
+
+public class LlapSignerImpl implements LlapSigner {
+ private final SigningSecretManager secretManager;
+
+ public LlapSignerImpl(Configuration conf, DaemonId daemonId) {
+ // TODO: create this centrally in HS2 case
+ secretManager = SecretManager.createSecretManager(conf, daemonId.getClusterString());
+ }
+
+ @VisibleForTesting
+ public LlapSignerImpl(SigningSecretManager sm) {
+ secretManager = sm;
+ }
+
+ @Override
+ public SignedMessage serializeAndSign(Signable message) throws IOException {
+ SignedMessage result = new SignedMessage();
+ DelegationKey key = secretManager.getCurrentKey();
+ message.setSignInfo(key.getKeyId(), UserGroupInformation.getCurrentUser().getUserName());
+ result.message = message.serialize();
+ result.signature = secretManager.signWithKey(result.message, key);
+ return result;
+ }
+
+ @Override
+ public void checkSignature(byte[] message, byte[] signature, int keyId)
+ throws SecurityException {
+ byte[] expectedSignature = secretManager.signWithKey(message, keyId);
+ if (Arrays.equals(signature, expectedSignature)) return;
+ throw new SecurityException("Message signature does not match");
+ }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/9fe3dab7/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java
----------------------------------------------------------------------
diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java
index e0f0676..96d626a 100644
--- a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java
+++ b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java
@@ -150,7 +150,8 @@ public class TaskExecutorTestHelpers {
new ExecutionContextImpl("localhost"), null, new Credentials(), 0, mock(AMReporter.class), null, mock(
LlapDaemonExecutorMetrics.class),
mock(KilledTaskHandler.class), mock(
- FragmentCompletionHandler.class), new DefaultHadoopShim(), null);
+ FragmentCompletionHandler.class), new DefaultHadoopShim(), null,
+ requestProto.getWorkSpec().getVertex());
this.workTime = workTime;
this.canFinish = canFinish;
}
http://git-wip-us.apache.org/repos/asf/hive/blob/9fe3dab7/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestLlapTokenChecker.java
----------------------------------------------------------------------
diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestLlapTokenChecker.java b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestLlapTokenChecker.java
index aaaa762..d4ded23 100644
--- a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestLlapTokenChecker.java
+++ b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestLlapTokenChecker.java
@@ -65,7 +65,7 @@ public class TestLlapTokenChecker {
private List<LlapTokenIdentifier> createTokens(String... args) {
List<LlapTokenIdentifier> tokens = new ArrayList<>();
for (int i = 0; i < args.length; i += 2) {
- tokens.add(new LlapTokenIdentifier(null, null, new Text(args[i]), "c", args[i + 1]));
+ tokens.add(new LlapTokenIdentifier(null, null, new Text(args[i]), "c", args[i + 1], false));
}
return tokens;
}
@@ -89,8 +89,8 @@ public class TestLlapTokenChecker {
}
}
- private void check(Pair<String, String> p, String user, String appId) {
- assertEquals(user, p.getLeft());
- assertEquals(appId, p.getRight());
+ private void check(LlapTokenChecker.LlapTokenInfo p, String user, String appId) {
+ assertEquals(user, p.userName);
+ assertEquals(appId, p.appId);
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hive/blob/9fe3dab7/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/comparator/TestFirstInFirstOutComparator.java
----------------------------------------------------------------------
diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/comparator/TestFirstInFirstOutComparator.java b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/comparator/TestFirstInFirstOutComparator.java
index a250882..ac48a3a 100644
--- a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/comparator/TestFirstInFirstOutComparator.java
+++ b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/comparator/TestFirstInFirstOutComparator.java
@@ -53,37 +53,6 @@ public class TestFirstInFirstOutComparator {
private static Configuration conf;
private static Credentials cred = new Credentials();
- private static class MockRequest extends TaskRunnerCallable {
- private int workTime;
- private boolean canFinish;
-
- public MockRequest(SubmitWorkRequestProto requestProto,
- boolean canFinish, int workTime) {
- super(requestProto, mock(QueryFragmentInfo.class), conf,
- new ExecutionContextImpl("localhost"), null, cred, 0, null, null, null,
- mock(KilledTaskHandler.class), mock(
- FragmentCompletionHandler.class), new DefaultHadoopShim(), null);
- this.workTime = workTime;
- this.canFinish = canFinish;
- }
-
- @Override
- protected TaskRunner2Result callInternal() {
- System.out.println(super.getRequestId() + " is executing..");
- try {
- Thread.sleep(workTime);
- } catch (InterruptedException e) {
- return new TaskRunner2Result(EndReason.KILL_REQUESTED, null, null, false);
- }
- return new TaskRunner2Result(EndReason.SUCCESS, null, null, false);
- }
-
- @Override
- public boolean canFinish() {
- return canFinish;
- }
- }
-
@Before
public void setup() {
conf = new Configuration();
http://git-wip-us.apache.org/repos/asf/hive/blob/9fe3dab7/llap-server/src/test/org/apache/hadoop/hive/llap/security/TestLlapSignerImpl.java
----------------------------------------------------------------------
diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/security/TestLlapSignerImpl.java b/llap-server/src/test/org/apache/hadoop/hive/llap/security/TestLlapSignerImpl.java
new file mode 100644
index 0000000..0420225
--- /dev/null
+++ b/llap-server/src/test/org/apache/hadoop/hive/llap/security/TestLlapSignerImpl.java
@@ -0,0 +1,200 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.llap.security;
+
+import static org.junit.Assert.*;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hive.llap.security.LlapSigner.Signable;
+import org.apache.hadoop.hive.llap.security.LlapSigner.SignedMessage;
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier;
+import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager;
+import org.apache.hadoop.security.token.delegation.DelegationKey;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class TestLlapSignerImpl {
+ private static final Logger LOG = LoggerFactory.getLogger(TestLlapSignerImpl.class);
+
+ @Test(timeout = 10000)
+ public void testSigning() throws Exception {
+ FakeSecretManager fsm = new FakeSecretManager();
+ fsm.startThreads();
+
+ // Make sure the signature works.
+ LlapSignerImpl signer = new LlapSignerImpl(fsm);
+ byte theByte = 1;
+ TestSignable in = new TestSignable(theByte);
+ TestSignable in2 = new TestSignable(++theByte);
+ SignedMessage sm2 = signer.serializeAndSign(in2);
+ SignedMessage sm = signer.serializeAndSign(in);
+ TestSignable out = TestSignable.deserialize(sm.message);
+ TestSignable out2 = TestSignable.deserialize(sm2.message);
+ assertEquals(in, out);
+ assertEquals(in2, out2);
+ signer.checkSignature(sm.message, sm.signature, out.masterKeyId);
+ signer.checkSignature(sm2.message, sm2.signature, out2.masterKeyId);
+
+ // Make sure the broken signature doesn't work.
+ try {
+ signer.checkSignature(sm.message, sm2.signature, out.masterKeyId);
+ fail("Didn't throw");
+ } catch (SecurityException ex) {
+ // Expected.
+ }
+
+ int index = sm.signature.length / 2;
+ sm.signature[index] = (byte)(sm.signature[index] + 1);
+ try {
+ signer.checkSignature(sm.message, sm.signature, out.masterKeyId);
+ fail("Didn't throw");
+ } catch (SecurityException ex) {
+ // Expected.
+ }
+ sm.signature[index] = (byte)(sm.signature[index] - 1);
+
+ // Adding keys is PITA - there's no way to plug into timed rolling; just create a new fsm.
+ DelegationKey dk = fsm.getCurrentKey();
+ fsm.stopThreads();
+ fsm = new FakeSecretManager();
+ fsm.addKey(dk);
+ fsm.startThreads();
+ signer = new LlapSignerImpl(fsm);
+ // Sign in2 with a different key.
+ sm2 = signer.serializeAndSign(in2);
+ out2 = TestSignable.deserialize(sm2.message);
+ assertNotEquals(out.masterKeyId, out2.masterKeyId);
+ assertEquals(in2, out2);
+ signer.checkSignature(sm2.message, sm2.signature, out2.masterKeyId);
+ signer.checkSignature(sm.message, sm.signature, out.masterKeyId);
+ // Make sure the key ID mismatch causes error.
+ try {
+ signer.checkSignature(sm2.message, sm2.signature, out.masterKeyId);
+ fail("Didn't throw");
+ } catch (SecurityException ex) {
+ // Expected.
+ }
+
+ // The same for rolling the key; re-create the fsm with only the key #2.
+ dk = fsm.getCurrentKey();
+ fsm.stopThreads();
+
+ fsm = new FakeSecretManager();
+ fsm.addKey(dk);
+ fsm.startThreads();
+ signer = new LlapSignerImpl(fsm);
+ signer.checkSignature(sm2.message, sm2.signature, out2.masterKeyId);
+ // The key is missing - shouldn't be able to verify.
+ try {
+ signer.checkSignature(sm.message, sm.signature, out.masterKeyId);
+ fail("Didn't throw");
+ } catch (SecurityException ex) {
+ // Expected.
+ }
+ fsm.stopThreads();
+ }
+
+ private static class TestSignable implements Signable {
+ public int masterKeyId;
+ public byte index;
+
+ public TestSignable(byte i) {
+ index = i;
+ }
+
+ public TestSignable(int keyId, byte b) {
+ masterKeyId = keyId;
+ index = b;
+ }
+
+ @Override
+ public void setSignInfo(int masterKeyId, String user) {
+ this.masterKeyId = masterKeyId;
+ }
+
+ @Override
+ public byte[] serialize() throws IOException {
+ DataOutputBuffer dob = new DataOutputBuffer(5);
+ dob.writeInt(masterKeyId);
+ dob.write(index);
+ byte[] b = dob.getData();
+ dob.close();
+ return b;
+ }
+
+ public static TestSignable deserialize(byte[] bytes) throws IOException {
+ DataInputBuffer db = new DataInputBuffer();
+ db.reset(bytes, bytes.length);
+ int keyId = db.readInt();
+ byte b = db.readByte();
+ db.close();
+ return new TestSignable(keyId, b);
+ }
+
+ @Override
+ public int hashCode() {
+ return 31 * index + masterKeyId;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj) return true;
+ if (!(obj instanceof TestSignable)) return false;
+ TestSignable other = (TestSignable) obj;
+ return (index == other.index) && (masterKeyId == other.masterKeyId);
+ }
+ }
+
+ private static class FakeSecretManager
+ extends AbstractDelegationTokenSecretManager<AbstractDelegationTokenIdentifier>
+ implements SigningSecretManager {
+
+ public FakeSecretManager() {
+ // The keys instantly expire and are rolled.
+ super(10000000, 10000000, 10000000, 10000000);
+ }
+
+ @Override
+ public DelegationKey getCurrentKey() {
+ return getDelegationKey(getCurrentKeyId());
+ }
+
+ @Override
+ public byte[] signWithKey(byte[] message, DelegationKey key) {
+ return createPassword(message, key.getKey());
+ }
+
+ @Override
+ public byte[] signWithKey(byte[] message, int keyId) throws SecurityException {
+ DelegationKey key = getDelegationKey(keyId);
+ if (key == null) {
+ throw new SecurityException("The key ID " + keyId + " was not found");
+ }
+ return createPassword(message, key.getKey());
+ }
+
+ @Override
+ public AbstractDelegationTokenIdentifier createIdentifier() {
+ throw new UnsupportedOperationException();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/9fe3dab7/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
index c9b912b..d04cfa3 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
@@ -54,7 +54,7 @@ import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.llap.DaemonId;
import org.apache.hadoop.hive.llap.LlapUtil;
import org.apache.hadoop.hive.llap.impl.LlapProtocolClientImpl;
-import org.apache.hadoop.hive.llap.security.LlapTokenClientFactory;
+import org.apache.hadoop.hive.llap.security.LlapTokenClient;
import org.apache.hadoop.hive.llap.security.LlapTokenIdentifier;
import org.apache.hadoop.hive.llap.security.LlapTokenLocalClient;
import org.apache.hadoop.hive.llap.tez.LlapProtocolClientProxy;
@@ -355,10 +355,12 @@ public class TezSessionState {
String user, final Configuration conf) throws IOException {
// TODO: parts of this should be moved out of TezSession to reuse the clients, but there's
// no good place for that right now (HIVE-13698).
- boolean useLocalTokenClient = isUsingLocalClient(conf);
+ SessionState session = SessionState.get();
+ boolean isInHs2 = session != null && session.isHiveServerQuery();
Token<LlapTokenIdentifier> token = null;
// For Tez, we don't use appId to distinguish the tokens.
- if (useLocalTokenClient) {
+ if (isInHs2) {
+ // We are in HS2, get the token locally.
String clusterName = LlapUtil.generateClusterName(conf);
// This assumes that the LLAP cluster and session are both running under HS2 user.
final String clusterId = DaemonId.createClusterString(user, clusterName);
@@ -368,12 +370,13 @@ public class TezSessionState {
public LlapTokenLocalClient call() throws Exception {
return new LlapTokenLocalClient(conf, clusterId);
}
- }).createToken(null, null);
+ }).createToken(null, null, false); // Signature is not required for Tez.
} catch (ExecutionException e) {
throw new IOException(e);
}
} else {
- token = new LlapTokenClientFactory(conf).createClient().getDelegationToken(null);
+ // We are not in HS2; always create a new client for now.
+ token = new LlapTokenClient(conf).getDelegationToken(null);
}
if (LOG.isInfoEnabled()) {
LOG.info("Obtained a LLAP token: " + token);
@@ -381,19 +384,6 @@ public class TezSessionState {
return token;
}
- private static boolean isUsingLocalClient(Configuration conf) {
- String mode = HiveConf.getVar(conf, ConfVars.LLAP_CREATE_TOKEN_LOCALLY).toLowerCase();
- boolean isHs2Only = "hs2".equals(mode);
- // We are initialized on first use inside TezSessionState::openInternal; assume the session
- // should be available.
- if (!isHs2Only) return "true".equals(mode);
- SessionState session = SessionState.get();
- if (session == null && LOG.isInfoEnabled()) {
- LOG.warn("There's no session to check if we are in HS2");
- }
- return session != null && session.isHiveServerQuery();
- }
-
private TezClient startSessionAndContainers(TezClient session, HiveConf conf,
Map<String, LocalResource> commonLocalResources, TezConfiguration tezConfig,
boolean isOnThread) throws TezException, IOException {