You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by se...@apache.org on 2016/05/18 00:44:36 UTC
hive git commit: HIVE-13449 : LLAP: HS2 should get the token directly,
rather than from LLAP (Sergey Shelukhin,
reviewed by Siddharth Seth and Lefty Leverenz)
Repository: hive
Updated Branches:
refs/heads/master 298644f66 -> 8c4b99a4e
HIVE-13449 : LLAP: HS2 should get the token directly, rather than from LLAP (Sergey Shelukhin, reviewed by Siddharth Seth and Lefty Leverenz)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/8c4b99a4
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/8c4b99a4
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/8c4b99a4
Branch: refs/heads/master
Commit: 8c4b99a4e49ea297f5a4f52a723f0697dc4ea272
Parents: 298644f
Author: Sergey Shelukhin <se...@apache.org>
Authored: Tue May 17 17:41:17 2016 -0700
Committer: Sergey Shelukhin <se...@apache.org>
Committed: Tue May 17 17:41:17 2016 -0700
----------------------------------------------------------------------
.../org/apache/hadoop/hive/conf/HiveConf.java | 5 +
llap-client/pom.xml | 11 +-
.../hadoop/hive/llap/io/api/LlapProxy.java | 37 -----
.../llap/security/LlapTokenClientFactory.java | 160 ++++++++++++++++++
.../llap/security/LlapTokenLocalClient.java | 59 +++++++
.../org/apache/hadoop/hive/llap/DaemonId.java | 9 +-
.../org/apache/hadoop/hive/llap/LlapUtil.java | 26 +++
.../impl/LlapManagementProtocolClientImpl.java | 3 +-
.../hive/llap/security/LlapTokenProvider.java | 27 ---
.../hive/llap/security/SecretManager.java | 162 ++++++++++++++++++
.../hive/llap/daemon/impl/LlapDaemon.java | 8 +-
.../daemon/impl/LlapProtocolServerImpl.java | 26 +--
.../hive/llap/security/LlapSecurityHelper.java | 164 -------------------
.../hive/llap/security/SecretManager.java | 131 ---------------
.../hive/ql/exec/tez/TezSessionState.java | 76 ++++++++-
15 files changed, 502 insertions(+), 402 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/8c4b99a4/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 541af57..cbb3a72 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -2704,6 +2704,11 @@ public class HiveConf extends Configuration {
"RPC port for LLAP daemon management service."),
LLAP_WEB_AUTO_AUTH("hive.llap.auto.auth", true,
"Whether or not to set Hadoop configs to enable auth in LLAP web app."),
+ LLAP_CREATE_TOKEN_LOCALLY("hive.llap.create.token.locally", "hs2",
+ new StringSet("true", "hs2", "false"),
+ "Whether to create LLAP tokens locally, saving directly to ZooKeeper SecretManager.\n" +
+ "Requires one to have access to ZK paths; in other words, this should only be used in\n" +
+ "HiveServer2. By default, the value is 'hs2', which means exactly that."),
LLAP_DAEMON_RPC_NUM_HANDLERS("hive.llap.daemon.rpc.num.handlers", 5,
"Number of RPC handlers for LLAP daemon.", "llap.daemon.rpc.num.handlers"),
http://git-wip-us.apache.org/repos/asf/hive/blob/8c4b99a4/llap-client/pom.xml
----------------------------------------------------------------------
diff --git a/llap-client/pom.xml b/llap-client/pom.xml
index 4a75bbb..cbfdcd9 100644
--- a/llap-client/pom.xml
+++ b/llap-client/pom.xml
@@ -46,6 +46,11 @@
</dependency>
<!-- inter-project -->
<dependency>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-lang3</artifactId>
+ <version>${commons-lang3.version}</version>
+ </dependency>
+ <dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>${hadoop.version}</version>
@@ -81,12 +86,6 @@
<scope>test</scope>
</dependency>
<dependency>
- <groupId>org.apache.commons</groupId>
- <artifactId>commons-lang3</artifactId>
- <version>${commons-lang3.version}</version>
- <scope>test</scope>
- </dependency>
- <dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-all</artifactId>
<version>${mockito-all.version}</version>
http://git-wip-us.apache.org/repos/asf/hive/blob/8c4b99a4/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/LlapProxy.java
----------------------------------------------------------------------
diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/LlapProxy.java b/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/LlapProxy.java
index 424769f..6c2618b 100644
--- a/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/LlapProxy.java
+++ b/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/LlapProxy.java
@@ -21,23 +21,14 @@ import java.io.IOException;
import java.lang.reflect.Constructor;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
-import org.apache.hadoop.hive.llap.security.LlapTokenProvider;
-
@SuppressWarnings("rawtypes")
public class LlapProxy {
private final static String IMPL_CLASS = "org.apache.hadoop.hive.llap.io.api.impl.LlapIoImpl";
- private final static String TOKEN_CLASS =
- "org.apache.hadoop.hive.llap.security.LlapSecurityHelper";
// Llap server depends on Hive execution, so the reverse cannot be true. We create the I/O
// singleton once (on daemon startup); the said singleton serves as the IO interface.
private static LlapIo io = null;
- private static LlapTokenProvider tokenProvider = null;
- private static final Object tpInitLock = new Object();
- private static volatile boolean isTpInitDone = false;
private static boolean isDaemon = false;
@@ -77,34 +68,6 @@ public class LlapProxy {
}
}
- public static LlapTokenProvider getOrInitTokenProvider(Configuration conf) {
- if (isTpInitDone) return tokenProvider;
- synchronized (tpInitLock) {
- if (isTpInitDone) return tokenProvider;
- try {
- tokenProvider = createTokenProviderImpl(conf);
- isTpInitDone = true;
- } catch (IOException e) {
- throw new RuntimeException("Cannot initialize token provider", e);
- }
- return tokenProvider;
- }
- }
-
- private static LlapTokenProvider createTokenProviderImpl(Configuration conf) throws IOException {
- try {
- @SuppressWarnings("unchecked")
- Class<? extends LlapTokenProvider> clazz =
- (Class<? extends LlapTokenProvider>)Class.forName(TOKEN_CLASS);
- Constructor<? extends LlapTokenProvider> ctor =
- clazz.getDeclaredConstructor(Configuration.class);
- ctor.setAccessible(true);
- return ctor.newInstance(conf);
- } catch (Exception e) {
- throw new RuntimeException("Failed to create token provider class", e);
- }
- }
-
public static void close() {
if (io != null) {
io.close();
http://git-wip-us.apache.org/repos/asf/hive/blob/8c4b99a4/llap-client/src/java/org/apache/hadoop/hive/llap/security/LlapTokenClientFactory.java
----------------------------------------------------------------------
diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/security/LlapTokenClientFactory.java b/llap-client/src/java/org/apache/hadoop/hive/llap/security/LlapTokenClientFactory.java
new file mode 100644
index 0000000..ebc91b1
--- /dev/null
+++ b/llap-client/src/java/org/apache/hadoop/hive/llap/security/LlapTokenClientFactory.java
@@ -0,0 +1,160 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.llap.security;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import javax.net.SocketFactory;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.llap.impl.LlapManagementProtocolClientImpl;
+import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GetTokenRequestProto;
+import org.apache.hadoop.hive.llap.registry.impl.LlapRegistryService;
+import org.apache.hadoop.hive.llap.registry.ServiceInstance;
+import org.apache.hadoop.hive.llap.registry.ServiceInstanceSet;
+import org.apache.hadoop.hive.llap.security.LlapTokenIdentifier;
+import org.apache.hadoop.io.DataInputByteBuffer;
+import org.apache.hadoop.io.retry.RetryPolicies;
+import org.apache.hadoop.io.retry.RetryPolicy;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.protobuf.ByteString;
+import com.google.protobuf.ServiceException;
+
+public class LlapTokenClientFactory {
+ private static final Logger LOG = LoggerFactory.getLogger(LlapTokenClientFactory.class);
+
+ private final LlapRegistryService registry;
+ private final SocketFactory socketFactory;
+ private final RetryPolicy retryPolicy;
+ private final Configuration conf;
+ private ServiceInstanceSet activeInstances;
+ private Collection<ServiceInstance> lastKnownInstances;
+
+ public LlapTokenClientFactory(Configuration conf) {
+ this.conf = conf;
+ registry = new LlapRegistryService(false);
+ registry.init(conf);
+ socketFactory = NetUtils.getDefaultSocketFactory(conf);
+ retryPolicy = RetryPolicies.retryUpToMaximumTimeWithFixedSleep(
+ 16000, 2000l, TimeUnit.MILLISECONDS);
+ }
+
+ public interface Client {
+ Token<LlapTokenIdentifier> getDelegationToken(String appId) throws IOException;
+ }
+
+ public Client createClient() {
+ return new ClientImpl(); // Client is separate from factory mostly for thread-safety reasons.
+ }
+
+ private class ClientImpl implements Client {
+ private LlapManagementProtocolClientImpl client;
+ private ServiceInstance clientInstance;
+
+ @Override
+ public Token<LlapTokenIdentifier> getDelegationToken(String appId) throws IOException {
+ if (!UserGroupInformation.isSecurityEnabled()) return null;
+ Iterator<ServiceInstance> llaps = null;
+ if (clientInstance == null) {
+ assert client == null;
+ llaps = getLlapServices(false).iterator();
+ clientInstance = llaps.next();
+ }
+
+ ByteString tokenBytes = null;
+ boolean hasRefreshed = false;
+ while (true) {
+ try {
+ tokenBytes = getTokenBytes(appId);
+ break;
+ } catch (IOException | ServiceException ex) {
+ LOG.error("Cannot get a token, trying a different instance", ex);
+ client = null;
+ clientInstance = null;
+ }
+ if (llaps == null || !llaps.hasNext()) {
+ if (hasRefreshed) { // Only refresh once.
+ throw new RuntimeException("Cannot find any LLAPs to get the token from");
+ }
+ llaps = getLlapServices(true).iterator();
+ hasRefreshed = true;
+ }
+ clientInstance = llaps.next();
+ }
+
+ Token<LlapTokenIdentifier> token = extractToken(tokenBytes);
+ if (LOG.isInfoEnabled()) {
+ LOG.info("Obtained a LLAP delegation token from " + clientInstance + ": " + token);
+ }
+ return token;
+ }
+
+ private Token<LlapTokenIdentifier> extractToken(ByteString tokenBytes) throws IOException {
+ Token<LlapTokenIdentifier> token = new Token<>();
+ DataInputByteBuffer in = new DataInputByteBuffer();
+ in.reset(tokenBytes.asReadOnlyByteBuffer());
+ token.readFields(in);
+ return token;
+ }
+
+ private ByteString getTokenBytes(final String appId) throws IOException, ServiceException {
+ assert clientInstance != null;
+ if (client == null) {
+ client = new LlapManagementProtocolClientImpl(conf, clientInstance.getHost(),
+ clientInstance.getManagementPort(), retryPolicy, socketFactory);
+ }
+ GetTokenRequestProto.Builder req = GetTokenRequestProto.newBuilder();
+ if (!StringUtils.isBlank(appId)) {
+ req.setAppId(appId);
+ }
+ return client.getDelegationToken(null, req.build()).getToken();
+ }
+ }
+
+ /** Synchronized - LLAP registry and instance set are not thread safe. */
+ private synchronized List<ServiceInstance> getLlapServices(
+ boolean doForceRefresh) throws IOException {
+ if (!doForceRefresh && lastKnownInstances != null) {
+ return new ArrayList<>(lastKnownInstances);
+ }
+ if (activeInstances == null) {
+ registry.start();
+ activeInstances = registry.getInstances();
+ }
+ Map<String, ServiceInstance> daemons = activeInstances.getAll();
+ if (daemons == null || daemons.isEmpty()) {
+ throw new RuntimeException("No LLAPs found");
+ }
+ lastKnownInstances = daemons.values();
+ return new ArrayList<ServiceInstance>(lastKnownInstances);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/8c4b99a4/llap-client/src/java/org/apache/hadoop/hive/llap/security/LlapTokenLocalClient.java
----------------------------------------------------------------------
diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/security/LlapTokenLocalClient.java b/llap-client/src/java/org/apache/hadoop/hive/llap/security/LlapTokenLocalClient.java
new file mode 100644
index 0000000..f10351b
--- /dev/null
+++ b/llap-client/src/java/org/apache/hadoop/hive/llap/security/LlapTokenLocalClient.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.llap.security;
+
+import java.io.IOException;
+
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.security.token.Token;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class LlapTokenLocalClient {
+ private static final Logger LOG = LoggerFactory.getLogger(LlapTokenLocalClient.class);
+ private final SecretManager secretManager;
+
+ public LlapTokenLocalClient(Configuration conf, String clusterId) {
+ secretManager = SecretManager.createSecretManager(conf, clusterId);
+ }
+
+ public Token<LlapTokenIdentifier> createToken(String appId, String user) throws IOException {
+ try {
+ Token<LlapTokenIdentifier> token = secretManager.createLlapToken(appId, user);
+ if (LOG.isInfoEnabled()) {
+ LOG.info("Created a LLAP delegation token locally: " + token);
+ }
+ return token;
+ } catch (Exception ex) {
+ throw new IOException("Failed to create LLAP token locally. You might need to set "
+ + ConfVars.LLAP_CREATE_TOKEN_LOCALLY.varname
+ + " to false, or make sure you can access secure ZK paths.", ex);
+ }
+ }
+
+ public void close() {
+ try {
+ secretManager.stopThreads();
+ } catch (Exception ex) {
+ // Ignore.
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/8c4b99a4/llap-common/src/java/org/apache/hadoop/hive/llap/DaemonId.java
----------------------------------------------------------------------
diff --git a/llap-common/src/java/org/apache/hadoop/hive/llap/DaemonId.java b/llap-common/src/java/org/apache/hadoop/hive/llap/DaemonId.java
index 18355e6..ea47330 100644
--- a/llap-common/src/java/org/apache/hadoop/hive/llap/DaemonId.java
+++ b/llap-common/src/java/org/apache/hadoop/hive/llap/DaemonId.java
@@ -32,7 +32,14 @@ public class DaemonId {
}
public String getClusterString() {
- return userName + "_" + clusterName + "_" + appId;
+ return createClusterString(userName, clusterName);
+ }
+
+ public static String createClusterString(String userName, String clusterName) {
+ // Note that this doesn't include appId. We assume that all the subsequent instances
+ // of the same user+cluster are logically the same, i.e. all the ZK paths will be reused,
+ // all the security tokens/etc. should transition between them, etc.
+ return userName + "_" + clusterName;
}
public String getApplicationId() {
http://git-wip-us.apache.org/repos/asf/hive/blob/8c4b99a4/llap-common/src/java/org/apache/hadoop/hive/llap/LlapUtil.java
----------------------------------------------------------------------
diff --git a/llap-common/src/java/org/apache/hadoop/hive/llap/LlapUtil.java b/llap-common/src/java/org/apache/hadoop/hive/llap/LlapUtil.java
index ce03de0..9dcacea 100644
--- a/llap-common/src/java/org/apache/hadoop/hive/llap/LlapUtil.java
+++ b/llap-common/src/java/org/apache/hadoop/hive/llap/LlapUtil.java
@@ -13,14 +13,40 @@
*/
package org.apache.hadoop.hive.llap;
+import java.io.IOException;
+import java.util.regex.Pattern;
+
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.security.SecurityUtil;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class LlapUtil {
+ private static final Logger LOG = LoggerFactory.getLogger(LlapUtil.class);
+
public static String getDaemonLocalDirList(Configuration conf) {
String localDirList = HiveConf.getVar(conf, ConfVars.LLAP_DAEMON_WORK_DIRS);
if (localDirList != null && !localDirList.isEmpty()) return localDirList;
return conf.get("yarn.nodemanager.local-dirs");
}
+
+ public static UserGroupInformation loginWithKerberos(
+ String principal, String keytabFile) throws IOException {
+ if (!UserGroupInformation.isSecurityEnabled()) return null;
+ if (principal.isEmpty() || keytabFile.isEmpty()) {
+ throw new RuntimeException("Kerberos principal and/or keytab are empty");
+ }
+ LOG.info("Logging in as " + principal + " via " + keytabFile);
+ return UserGroupInformation.loginUserFromKeytabAndReturnUGI(
+ SecurityUtil.getServerPrincipal(principal, "0.0.0.0"), keytabFile);
+ }
+
+ private final static Pattern hostsRe = Pattern.compile("[^A-Za-z0-9_-]");
+ public static String generateClusterName(Configuration conf) {
+ String hosts = HiveConf.getTrimmedVar(conf, ConfVars.LLAP_DAEMON_SERVICE_HOSTS);
+ return hostsRe.matcher(hosts.startsWith("@") ? hosts.substring(1) : hosts).replaceAll("_");
+ }
}
http://git-wip-us.apache.org/repos/asf/hive/blob/8c4b99a4/llap-common/src/java/org/apache/hadoop/hive/llap/impl/LlapManagementProtocolClientImpl.java
----------------------------------------------------------------------
diff --git a/llap-common/src/java/org/apache/hadoop/hive/llap/impl/LlapManagementProtocolClientImpl.java b/llap-common/src/java/org/apache/hadoop/hive/llap/impl/LlapManagementProtocolClientImpl.java
index cd11bdb..af760b1 100644
--- a/llap-common/src/java/org/apache/hadoop/hive/llap/impl/LlapManagementProtocolClientImpl.java
+++ b/llap-common/src/java/org/apache/hadoop/hive/llap/impl/LlapManagementProtocolClientImpl.java
@@ -65,8 +65,7 @@ public class LlapManagementProtocolClientImpl implements LlapManagementProtocolP
RPC.setProtocolEngine(conf, LlapManagementProtocolPB.class, ProtobufRpcEngine.class);
ProtocolProxy<LlapManagementProtocolPB> proxy =
RPC.getProtocolProxy(LlapManagementProtocolPB.class, 0, serverAddr,
- UserGroupInformation.getCurrentUser(), conf, NetUtils.getDefaultSocketFactory(conf), 0,
- retryPolicy);
+ UserGroupInformation.getCurrentUser(), conf, socketFactory, 0, retryPolicy);
return proxy.getProxy();
}
http://git-wip-us.apache.org/repos/asf/hive/blob/8c4b99a4/llap-common/src/java/org/apache/hadoop/hive/llap/security/LlapTokenProvider.java
----------------------------------------------------------------------
diff --git a/llap-common/src/java/org/apache/hadoop/hive/llap/security/LlapTokenProvider.java b/llap-common/src/java/org/apache/hadoop/hive/llap/security/LlapTokenProvider.java
deleted file mode 100644
index edf9b18..0000000
--- a/llap-common/src/java/org/apache/hadoop/hive/llap/security/LlapTokenProvider.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hive.llap.security;
-
-import java.io.IOException;
-
-import org.apache.hadoop.security.token.Token;
-
-public interface LlapTokenProvider {
- Token<LlapTokenIdentifier> getDelegationToken(String appId) throws IOException;
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hive/blob/8c4b99a4/llap-common/src/java/org/apache/hadoop/hive/llap/security/SecretManager.java
----------------------------------------------------------------------
diff --git a/llap-common/src/java/org/apache/hadoop/hive/llap/security/SecretManager.java b/llap-common/src/java/org/apache/hadoop/hive/llap/security/SecretManager.java
new file mode 100644
index 0000000..465b204
--- /dev/null
+++ b/llap-common/src/java/org/apache/hadoop/hive/llap/security/SecretManager.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.llap.security;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.security.PrivilegedAction;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.llap.LlapUtil;
+import org.apache.hadoop.hive.llap.security.LlapTokenIdentifier;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.delegation.ZKDelegationTokenSecretManager;
+import org.apache.hadoop.security.token.delegation.web.DelegationTokenManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class SecretManager extends ZKDelegationTokenSecretManager<LlapTokenIdentifier> {
+ private static final Logger LOG = LoggerFactory.getLogger(SecretManager.class);
+ private final String clusterId;
+
+ public SecretManager(Configuration conf, String clusterId) {
+ super(conf);
+ this.clusterId = clusterId;
+ checkForZKDTSMBug(conf);
+ }
+
+ // Workaround for HADOOP-12659 - remove when Hadoop 2.7.X is no longer supported.
+ private void checkForZKDTSMBug(Configuration conf) {
+ // There's a bug in ZKDelegationTokenSecretManager ctor where seconds are not converted to ms.
+ long expectedRenewTimeSec = conf.getLong(DelegationTokenManager.RENEW_INTERVAL, -1);
+ LOG.info("Checking for tokenRenewInterval bug: " + expectedRenewTimeSec);
+ if (expectedRenewTimeSec == -1) return; // The default works, no bug.
+ java.lang.reflect.Field f = null;
+ try {
+ Class<?> c = org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager.class;
+ f = c.getDeclaredField("tokenRenewInterval");
+ f.setAccessible(true);
+ } catch (Throwable t) {
+ // Maybe someone removed the field; probably ok to ignore.
+ LOG.error("Failed to check for tokenRenewInterval bug, hoping for the best", t);
+ return;
+ }
+ try {
+ long realValue = f.getLong(this);
+ long expectedValue = expectedRenewTimeSec * 1000;
+ LOG.info("tokenRenewInterval is: " + realValue + " (expected " + expectedValue + ")");
+ if (realValue == expectedRenewTimeSec) {
+ // Bug - the field has to be in ms, not sec. Override only if set precisely to sec.
+ f.setLong(this, expectedValue);
+ }
+ } catch (Exception ex) {
+ throw new RuntimeException("Failed to address tokenRenewInterval bug", ex);
+ }
+ }
+
+ @Override
+ public LlapTokenIdentifier createIdentifier() {
+ return new LlapTokenIdentifier();
+ }
+
+ @Override
+ public LlapTokenIdentifier decodeTokenIdentifier(
+ Token<LlapTokenIdentifier> token) throws IOException {
+ DataInputStream dis = new DataInputStream(new ByteArrayInputStream(token.getIdentifier()));
+ LlapTokenIdentifier id = new LlapTokenIdentifier();
+ id.readFields(dis);
+ dis.close();
+ return id;
+ }
+
+ public static SecretManager createSecretManager(final Configuration conf, String clusterId) {
+ String llapPrincipal = HiveConf.getVar(conf, ConfVars.LLAP_KERBEROS_PRINCIPAL),
+ llapKeytab = HiveConf.getVar(conf, ConfVars.LLAP_KERBEROS_KEYTAB_FILE);
+ return SecretManager.createSecretManager(conf, llapPrincipal, llapKeytab, clusterId);
+ }
+
+
+ public static SecretManager createSecretManager(final Configuration conf,
+ String llapPrincipal, String llapKeytab, final String clusterId) {
+ // Create ZK connection under a separate ugi (if specified) - ZK works in mysterious ways.
+ UserGroupInformation zkUgi = null;
+ String principal = HiveConf.getVar(conf, ConfVars.LLAP_ZKSM_KERBEROS_PRINCIPAL, llapPrincipal);
+ String keyTab = HiveConf.getVar(conf, ConfVars.LLAP_ZKSM_KERBEROS_KEYTAB_FILE, llapKeytab);
+ try {
+ zkUgi = LlapUtil.loginWithKerberos(principal, keyTab);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ // Override the default delegation token lifetime for LLAP.
+ // Also set all the necessary ZK settings to defaults and LLAP configs, if not set.
+ final Configuration zkConf = new Configuration(conf);
+ long tokenLifetime = HiveConf.getTimeVar(
+ conf, ConfVars.LLAP_DELEGATION_TOKEN_LIFETIME, TimeUnit.SECONDS);
+ zkConf.setLong(DelegationTokenManager.MAX_LIFETIME, tokenLifetime);
+ zkConf.setLong(DelegationTokenManager.RENEW_INTERVAL, tokenLifetime);
+ zkConf.set(SecretManager.ZK_DTSM_ZK_KERBEROS_PRINCIPAL, principal);
+ zkConf.set(SecretManager.ZK_DTSM_ZK_KERBEROS_KEYTAB, keyTab);
+ String zkPath = clusterId;
+ LOG.info("Using {} as ZK secret manager path", zkPath);
+ zkConf.set(SecretManager.ZK_DTSM_ZNODE_WORKING_PATH, "zkdtsm_" + zkPath);
+ setZkConfIfNotSet(zkConf, SecretManager.ZK_DTSM_ZK_AUTH_TYPE, "sasl");
+ setZkConfIfNotSet(zkConf, SecretManager.ZK_DTSM_ZK_CONNECTION_STRING,
+ HiveConf.getVar(zkConf, ConfVars.LLAP_ZKSM_ZK_CONNECTION_STRING));
+ return zkUgi.doAs(new PrivilegedAction<SecretManager>() {
+ @Override
+ public SecretManager run() {
+ SecretManager zkSecretManager = new SecretManager(zkConf, clusterId);
+ try {
+ zkSecretManager.startThreads();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ return zkSecretManager;
+ }
+ });
+ }
+
+ private static void setZkConfIfNotSet(Configuration zkConf, String name, String value) {
+ if (zkConf.get(name) != null) return;
+ zkConf.set(name, value);
+ }
+
+ public Token<LlapTokenIdentifier> createLlapToken(String appId, String user) throws IOException {
+ Text realUser = null, renewer = null;
+ if (user == null) {
+ UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
+ user = ugi.getUserName();
+ if (ugi.getRealUser() != null) {
+ realUser = new Text(ugi.getRealUser().getUserName());
+ }
+ renewer = new Text(ugi.getShortUserName());
+ } else {
+ renewer = new Text(user);
+ }
+ LlapTokenIdentifier llapId = new LlapTokenIdentifier(
+ new Text(user), renewer, realUser, clusterId, appId);
+ // TODO: note that the token is not renewable right now and will last for 2 weeks by default.
+ Token<LlapTokenIdentifier> token = new Token<LlapTokenIdentifier>(llapId, this);
+ if (LOG.isInfoEnabled()) {
+ LOG.info("Created LLAP token {}", token);
+ }
+ return token;
+ }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/8c4b99a4/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
index 5731b2c..de817e3 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
@@ -106,12 +106,6 @@ public class LlapDaemon extends CompositeService implements ContainerRunner, Lla
private final String[] localDirs;
private final DaemonId daemonId;
- private final static Pattern hostsRe = Pattern.compile("[^A-Za-z0-9_-]");
- private static String generateClusterName(Configuration conf) {
- String hosts = HiveConf.getTrimmedVar(conf, ConfVars.LLAP_DAEMON_SERVICE_HOSTS);
- return hostsRe.matcher(hosts.startsWith("@") ? hosts.substring(1) : hosts).replaceAll("_");
- }
-
// TODO Not the best way to share the address
private final AtomicReference<InetSocketAddress> srvAddress = new AtomicReference<>(),
mngAddress = new AtomicReference<>();
@@ -150,7 +144,7 @@ public class LlapDaemon extends CompositeService implements ContainerRunner, Lla
String hostName = MetricsUtils.getHostName();
try {
daemonId = new DaemonId(UserGroupInformation.getCurrentUser().getUserName(),
- generateClusterName(daemonConf), hostName, appName, System.currentTimeMillis());
+ LlapUtil.generateClusterName(daemonConf), hostName, appName, System.currentTimeMillis());
} catch (IOException ex) {
throw new RuntimeException(ex);
}
http://git-wip-us.apache.org/repos/asf/hive/blob/8c4b99a4/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapProtocolServerImpl.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapProtocolServerImpl.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapProtocolServerImpl.java
index db8bfa6..b94fc2e 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapProtocolServerImpl.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapProtocolServerImpl.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.llap.DaemonId;
+import org.apache.hadoop.hive.llap.LlapUtil;
import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos;
import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GetTokenRequestProto;
import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GetTokenResponseProto;
@@ -51,7 +52,6 @@ import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.AccessControlList;
import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.hive.llap.security.LlapSecurityHelper;
import org.apache.hadoop.hive.llap.security.LlapTokenIdentifier;
import org.apache.hadoop.hive.llap.security.SecretManager;
import org.apache.hadoop.service.AbstractService;
@@ -150,12 +150,13 @@ public class LlapProtocolServerImpl extends AbstractService
}
String llapPrincipal = HiveConf.getVar(conf, ConfVars.LLAP_KERBEROS_PRINCIPAL),
llapKeytab = HiveConf.getVar(conf, ConfVars.LLAP_KERBEROS_KEYTAB_FILE);
- zkSecretManager = SecretManager.createSecretManager(conf, llapPrincipal, llapKeytab, daemonId);
+ zkSecretManager = SecretManager.createSecretManager(
+ conf, llapPrincipal, llapKeytab, daemonId.getClusterString());
// Start the protocol server after properly authenticating with daemon keytab.
UserGroupInformation daemonUgi = null;
try {
- daemonUgi = LlapSecurityHelper.loginWithKerberos(llapPrincipal, llapKeytab);
+ daemonUgi = LlapUtil.loginWithKerberos(llapPrincipal, llapKeytab);
} catch (IOException e) {
throw new RuntimeException(e);
}
@@ -265,9 +266,11 @@ public class LlapProtocolServerImpl extends AbstractService
if (zkSecretManager == null) {
throw new ServiceException("Operation not supported on unsecure cluster");
}
- UserGroupInformation ugi;
+ UserGroupInformation ugi = null;
+ Token<LlapTokenIdentifier> token = null;
try {
ugi = UserGroupInformation.getCurrentUser();
+ token = zkSecretManager.createLlapToken(request.getAppId(), null);
} catch (IOException e) {
throw new ServiceException(e);
}
@@ -278,20 +281,7 @@ public class LlapProtocolServerImpl extends AbstractService
+ " or adjust " + ConfVars.LLAP_MANAGEMENT_ACL.varname + " and "
+ ConfVars.LLAP_MANAGEMENT_ACL_DENY.varname + " to a more restrictive ACL.");
}
- String user = ugi.getUserName();
- Text owner = new Text(user);
- Text realUser = null;
- if (ugi.getRealUser() != null) {
- realUser = new Text(ugi.getRealUser().getUserName());
- }
- Text renewer = new Text(ugi.getShortUserName());
- LlapTokenIdentifier llapId = new LlapTokenIdentifier(owner, renewer, realUser,
- daemonId.getClusterString(), request.hasAppId() ? request.getAppId() : null);
- // TODO: note that the token is not renewable right now and will last for 2 weeks by default.
- Token<LlapTokenIdentifier> token = new Token<LlapTokenIdentifier>(llapId, zkSecretManager);
- if (LOG.isInfoEnabled()) {
- LOG.info("Created LLAP token " + token);
- }
+
ByteArrayDataOutput out = ByteStreams.newDataOutput();
try {
token.write(out);
http://git-wip-us.apache.org/repos/asf/hive/blob/8c4b99a4/llap-server/src/java/org/apache/hadoop/hive/llap/security/LlapSecurityHelper.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/security/LlapSecurityHelper.java b/llap-server/src/java/org/apache/hadoop/hive/llap/security/LlapSecurityHelper.java
deleted file mode 100644
index f958bc4..0000000
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/security/LlapSecurityHelper.java
+++ /dev/null
@@ -1,164 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hive.llap.security;
-
-import java.io.IOException;
-import java.security.PrivilegedExceptionAction;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
-
-import javax.net.SocketFactory;
-
-import org.apache.commons.lang3.StringUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hive.llap.impl.LlapManagementProtocolClientImpl;
-import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GetTokenRequestProto;
-import org.apache.hadoop.hive.llap.registry.impl.LlapRegistryService;
-import org.apache.hadoop.hive.llap.registry.ServiceInstance;
-import org.apache.hadoop.hive.llap.registry.ServiceInstanceSet;
-import org.apache.hadoop.io.DataInputByteBuffer;
-import org.apache.hadoop.io.retry.RetryPolicies;
-import org.apache.hadoop.io.retry.RetryPolicy;
-import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.security.SecurityUtil;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.token.Token;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.protobuf.ByteString;
-
-/** Individual instances of this class are not thread safe. */
-public class LlapSecurityHelper implements LlapTokenProvider {
- private static final Logger LOG = LoggerFactory.getLogger(LlapSecurityHelper.class);
-
- private UserGroupInformation llapUgi;
-
- private final LlapRegistryService registry;
- private ServiceInstanceSet activeInstances;
- private final Configuration conf;
- private LlapManagementProtocolClientImpl client;
- private ServiceInstance clientInstance;
-
- private final SocketFactory socketFactory;
- private final RetryPolicy retryPolicy;
-
- public LlapSecurityHelper(Configuration conf) {
- this.conf = conf;
- registry = new LlapRegistryService(false);
- registry.init(conf);
- socketFactory = NetUtils.getDefaultSocketFactory(conf);
- retryPolicy = RetryPolicies.retryUpToMaximumTimeWithFixedSleep(
- 16000, 2000l, TimeUnit.MILLISECONDS);
- }
-
- public static UserGroupInformation loginWithKerberos(
- String principal, String keytabFile) throws IOException {
- if (!UserGroupInformation.isSecurityEnabled()) return null;
- if (principal.isEmpty() || keytabFile.isEmpty()) {
- throw new RuntimeException("Kerberos principal and/or keytab are empty");
- }
- LOG.info("Logging in as " + principal + " via " + keytabFile);
- UserGroupInformation.loginUserFromKeytab(
- SecurityUtil.getServerPrincipal(principal, "0.0.0.0"), keytabFile);
- return UserGroupInformation.getLoginUser();
- }
-
- @Override
- public Token<LlapTokenIdentifier> getDelegationToken(String appId) throws IOException {
- if (!UserGroupInformation.isSecurityEnabled()) return null;
- if (llapUgi == null) {
- llapUgi = UserGroupInformation.getCurrentUser();
- // We could have also added keytab support; right now client must do smth like kinit.
- }
- Iterator<ServiceInstance> llaps = null;
- if (clientInstance == null) {
- assert client == null;
- llaps = getLlapServices(false);
- clientInstance = llaps.next();
- }
-
- ByteString tokenBytes = null;
- boolean hasRefreshed = false;
- while (true) {
- try {
- tokenBytes = getTokenBytes(appId);
- break;
- } catch (InterruptedException ie) {
- throw new RuntimeException(ie);
- } catch (IOException ex) {
- LOG.error("Cannot get a token, trying a different instance", ex);
- client = null;
- clientInstance = null;
- }
- if (llaps == null || !llaps.hasNext()) {
- if (hasRefreshed) { // Only refresh once.
- throw new RuntimeException("Cannot find any LLAPs to get the token from");
- }
- llaps = getLlapServices(true);
- hasRefreshed = true;
- }
- clientInstance = llaps.next();
- }
-
- // Stupid protobuf byte-buffer reinvention.
- Token<LlapTokenIdentifier> token = new Token<>();
- DataInputByteBuffer in = new DataInputByteBuffer();
- in.reset(tokenBytes.asReadOnlyByteBuffer());
- token.readFields(in);
- if (LOG.isInfoEnabled()) {
- LOG.info("Obtained a LLAP delegation token from " + clientInstance + ": " + token);
- }
- return token;
- }
-
- private ByteString getTokenBytes(
- final String appId) throws InterruptedException, IOException {
- return llapUgi.doAs(new PrivilegedExceptionAction<ByteString>() {
- @Override
- public ByteString run() throws Exception {
- assert clientInstance != null;
- if (client == null) {
- client = new LlapManagementProtocolClientImpl(conf, clientInstance.getHost(),
- clientInstance.getManagementPort(), retryPolicy, socketFactory);
- }
- // Client only connects on the first call, so this has to be done in doAs.
- GetTokenRequestProto.Builder req = GetTokenRequestProto.newBuilder();
- if (!StringUtils.isBlank(appId)) {
- req.setAppId(appId);
- }
- return client.getDelegationToken(null, req.build()).getToken();
- }
- });
- }
-
- private Iterator<ServiceInstance> getLlapServices(boolean doForceRefresh) throws IOException {
- if (activeInstances == null) {
- registry.start();
- activeInstances = registry.getInstances();
- }
- Map<String, ServiceInstance> daemons = activeInstances.getAll();
- if (doForceRefresh || daemons == null || daemons.isEmpty()) {
- daemons = activeInstances.getAll();
- if (daemons == null || daemons.isEmpty()) throw new RuntimeException("No LLAPs found");
- }
- return daemons.values().iterator();
- }
-}
http://git-wip-us.apache.org/repos/asf/hive/blob/8c4b99a4/llap-server/src/java/org/apache/hadoop/hive/llap/security/SecretManager.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/security/SecretManager.java b/llap-server/src/java/org/apache/hadoop/hive/llap/security/SecretManager.java
deleted file mode 100644
index c54e726..0000000
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/security/SecretManager.java
+++ /dev/null
@@ -1,131 +0,0 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hive.llap.security;
-
-import java.io.ByteArrayInputStream;
-import java.io.DataInputStream;
-import java.io.IOException;
-import java.security.PrivilegedAction;
-import java.util.concurrent.TimeUnit;
-import java.util.regex.Pattern;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
-import org.apache.hadoop.hive.llap.DaemonId;
-import org.apache.hadoop.hive.llap.security.LlapTokenIdentifier;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.security.token.delegation.ZKDelegationTokenSecretManager;
-import org.apache.hadoop.security.token.delegation.web.DelegationTokenManager;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class SecretManager extends ZKDelegationTokenSecretManager<LlapTokenIdentifier> {
- private static final Logger LOG = LoggerFactory.getLogger(SecretManager.class);
-
- public SecretManager(Configuration conf) {
- super(conf);
- checkForZKDTSMBug(conf);
- }
-
- // Workaround for HADOOP-12659 - remove when Hadoop 2.7.X is no longer supported.
- private void checkForZKDTSMBug(Configuration conf) {
- // There's a bug in ZKDelegationTokenSecretManager ctor where seconds are not converted to ms.
- long expectedRenewTimeSec = conf.getLong(DelegationTokenManager.RENEW_INTERVAL, -1);
- LOG.info("Checking for tokenRenewInterval bug: " + expectedRenewTimeSec);
- if (expectedRenewTimeSec == -1) return; // The default works, no bug.
- java.lang.reflect.Field f = null;
- try {
- Class<?> c = org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager.class;
- f = c.getDeclaredField("tokenRenewInterval");
- f.setAccessible(true);
- } catch (Throwable t) {
- // Maybe someone removed the field; probably ok to ignore.
- LOG.error("Failed to check for tokenRenewInterval bug, hoping for the best", t);
- return;
- }
- try {
- long realValue = f.getLong(this);
- long expectedValue = expectedRenewTimeSec * 1000;
- LOG.info("tokenRenewInterval is: " + realValue + " (expected " + expectedValue + ")");
- if (realValue == expectedRenewTimeSec) {
- // Bug - the field has to be in ms, not sec. Override only if set precisely to sec.
- f.setLong(this, expectedValue);
- }
- } catch (Exception ex) {
- throw new RuntimeException("Failed to address tokenRenewInterval bug", ex);
- }
- }
-
- @Override
- public LlapTokenIdentifier createIdentifier() {
- return new LlapTokenIdentifier();
- }
-
- @Override
- public LlapTokenIdentifier decodeTokenIdentifier(
- Token<LlapTokenIdentifier> token) throws IOException {
- DataInputStream dis = new DataInputStream(new ByteArrayInputStream(token.getIdentifier()));
- LlapTokenIdentifier id = new LlapTokenIdentifier();
- id.readFields(dis);
- dis.close();
- return id;
- }
-
- public static SecretManager createSecretManager(
- final Configuration conf, String llapPrincipal, String llapKeytab, DaemonId daemonId) {
- // Create ZK connection under a separate ugi (if specified) - ZK works in mysterious ways.
- UserGroupInformation zkUgi = null;
- String principal = HiveConf.getVar(conf, ConfVars.LLAP_ZKSM_KERBEROS_PRINCIPAL, llapPrincipal);
- String keyTab = HiveConf.getVar(conf, ConfVars.LLAP_ZKSM_KERBEROS_KEYTAB_FILE, llapKeytab);
- try {
- zkUgi = LlapSecurityHelper.loginWithKerberos(principal, keyTab);
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- // Override the default delegation token lifetime for LLAP.
- // Also set all the necessary ZK settings to defaults and LLAP configs, if not set.
- final Configuration zkConf = new Configuration(conf);
- long tokenLifetime = HiveConf.getTimeVar(
- conf, ConfVars.LLAP_DELEGATION_TOKEN_LIFETIME, TimeUnit.SECONDS);
- zkConf.setLong(DelegationTokenManager.MAX_LIFETIME, tokenLifetime);
- zkConf.setLong(DelegationTokenManager.RENEW_INTERVAL, tokenLifetime);
- zkConf.set(SecretManager.ZK_DTSM_ZK_KERBEROS_PRINCIPAL, principal);
- zkConf.set(SecretManager.ZK_DTSM_ZK_KERBEROS_KEYTAB, keyTab);
- String zkPath = daemonId.getClusterString();
- LOG.info("Using {} as ZK secret manager path", zkPath);
- zkConf.set(SecretManager.ZK_DTSM_ZNODE_WORKING_PATH, "zkdtsm_" + zkPath);
- setZkConfIfNotSet(zkConf, SecretManager.ZK_DTSM_ZK_AUTH_TYPE, "sasl");
- setZkConfIfNotSet(zkConf, SecretManager.ZK_DTSM_ZK_CONNECTION_STRING,
- HiveConf.getVar(zkConf, ConfVars.LLAP_ZKSM_ZK_CONNECTION_STRING));
- return zkUgi.doAs(new PrivilegedAction<SecretManager>() {
- @Override
- public SecretManager run() {
- SecretManager zkSecretManager = new SecretManager(zkConf);
- try {
- zkSecretManager.startThreads();
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- return zkSecretManager;
- }
- });
- }
-
- private static void setZkConfIfNotSet(Configuration zkConf, String name, String value) {
- if (zkConf.get(name) != null) return;
- zkConf.set(name, value);
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hive/blob/8c4b99a4/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
index fd6465a..c9b912b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
@@ -44,16 +44,19 @@ import javax.security.auth.login.LoginException;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.commons.io.FilenameUtils;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.llap.DaemonId;
+import org.apache.hadoop.hive.llap.LlapUtil;
import org.apache.hadoop.hive.llap.impl.LlapProtocolClientImpl;
-import org.apache.hadoop.hive.llap.io.api.LlapProxy;
+import org.apache.hadoop.hive.llap.security.LlapTokenClientFactory;
import org.apache.hadoop.hive.llap.security.LlapTokenIdentifier;
-import org.apache.hadoop.hive.llap.security.LlapTokenProvider;
+import org.apache.hadoop.hive.llap.security.LlapTokenLocalClient;
import org.apache.hadoop.hive.llap.tez.LlapProtocolClientProxy;
import org.apache.hadoop.hive.llap.tezplugins.LlapContainerLauncher;
import org.apache.hadoop.hive.llap.tezplugins.LlapTaskCommunicator;
@@ -82,6 +85,11 @@ import org.apache.tez.serviceplugins.api.TaskSchedulerDescriptor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.RemovalListener;
+import com.google.common.cache.RemovalNotification;
+
/**
* Holds session state related to Tez
*/
@@ -274,14 +282,8 @@ public class TezSessionState {
Credentials llapCredentials = null;
if (llapMode) {
if (UserGroupInformation.isSecurityEnabled()) {
- LlapTokenProvider tp = LlapProxy.getOrInitTokenProvider(conf);
- // For Tez, we don't use appId to distinguish the tokens; security scope is the user.
- Token<LlapTokenIdentifier> token = tp.getDelegationToken(null);
- if (LOG.isInfoEnabled()) {
- LOG.info("Obtained a LLAP token: " + token);
- }
llapCredentials = new Credentials();
- llapCredentials.addToken(LlapTokenIdentifier.KIND_NAME, token);
+ llapCredentials.addToken(LlapTokenIdentifier.KIND_NAME, getLlapToken(user, tezConfig));
}
UserPayload servicePluginPayload = TezUtils.createUserPayloadFromConf(tezConfig);
// we need plugins to handle llap and uber mode
@@ -336,6 +338,62 @@ public class TezSessionState {
}
}
+ // Only cache ZK connections (ie local clients); these are presumed to be used in HS2.
+ // TODO: temporary before HIVE-13698.
+ private static final Cache<String, LlapTokenLocalClient> localClientCache = CacheBuilder
+ .newBuilder().expireAfterAccess(10, TimeUnit.MINUTES)
+ .removalListener(new RemovalListener<String, LlapTokenLocalClient>() {
+ @Override
+ public void onRemoval(RemovalNotification<String, LlapTokenLocalClient> notification) {
+ if (notification.getValue() != null) {
+ notification.getValue().close();
+ }
+ }
+ }).build();
+
+ private static Token<LlapTokenIdentifier> getLlapToken(
+ String user, final Configuration conf) throws IOException {
+ // TODO: parts of this should be moved out of TezSession to reuse the clients, but there's
+ // no good place for that right now (HIVE-13698).
+ boolean useLocalTokenClient = isUsingLocalClient(conf);
+ Token<LlapTokenIdentifier> token = null;
+ // For Tez, we don't use appId to distinguish the tokens.
+ if (useLocalTokenClient) {
+ String clusterName = LlapUtil.generateClusterName(conf);
+ // This assumes that the LLAP cluster and session are both running under HS2 user.
+ final String clusterId = DaemonId.createClusterString(user, clusterName);
+ try {
+ token = localClientCache.get(clusterId, new Callable<LlapTokenLocalClient>() {
+ @Override
+ public LlapTokenLocalClient call() throws Exception {
+ return new LlapTokenLocalClient(conf, clusterId);
+ }
+ }).createToken(null, null);
+ } catch (ExecutionException e) {
+ throw new IOException(e);
+ }
+ } else {
+ token = new LlapTokenClientFactory(conf).createClient().getDelegationToken(null);
+ }
+ if (LOG.isInfoEnabled()) {
+ LOG.info("Obtained a LLAP token: " + token);
+ }
+ return token;
+ }
+
+ private static boolean isUsingLocalClient(Configuration conf) {
+ String mode = HiveConf.getVar(conf, ConfVars.LLAP_CREATE_TOKEN_LOCALLY).toLowerCase();
+ boolean isHs2Only = "hs2".equals(mode);
+ // We are initialized on first use inside TezSessionState::openInternal; assume the session
+ // should be available.
+ if (!isHs2Only) return "true".equals(mode);
+ SessionState session = SessionState.get();
+ if (session == null && LOG.isInfoEnabled()) {
+ LOG.warn("There's no session to check if we are in HS2");
+ }
+ return session != null && session.isHiveServerQuery();
+ }
+
private TezClient startSessionAndContainers(TezClient session, HiveConf conf,
Map<String, LocalResource> commonLocalResources, TezConfiguration tezConfig,
boolean isOnThread) throws TezException, IOException {