You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by se...@apache.org on 2016/05/18 00:44:36 UTC

hive git commit: HIVE-13449 : LLAP: HS2 should get the token directly, rather than from LLAP (Sergey Shelukhin, reviewed by Siddharth Seth and Lefty Leverenz)

Repository: hive
Updated Branches:
  refs/heads/master 298644f66 -> 8c4b99a4e


HIVE-13449 : LLAP: HS2 should get the token directly, rather than from LLAP (Sergey Shelukhin, reviewed by Siddharth Seth and Lefty Leverenz)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/8c4b99a4
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/8c4b99a4
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/8c4b99a4

Branch: refs/heads/master
Commit: 8c4b99a4e49ea297f5a4f52a723f0697dc4ea272
Parents: 298644f
Author: Sergey Shelukhin <se...@apache.org>
Authored: Tue May 17 17:41:17 2016 -0700
Committer: Sergey Shelukhin <se...@apache.org>
Committed: Tue May 17 17:41:17 2016 -0700

----------------------------------------------------------------------
 .../org/apache/hadoop/hive/conf/HiveConf.java   |   5 +
 llap-client/pom.xml                             |  11 +-
 .../hadoop/hive/llap/io/api/LlapProxy.java      |  37 -----
 .../llap/security/LlapTokenClientFactory.java   | 160 ++++++++++++++++++
 .../llap/security/LlapTokenLocalClient.java     |  59 +++++++
 .../org/apache/hadoop/hive/llap/DaemonId.java   |   9 +-
 .../org/apache/hadoop/hive/llap/LlapUtil.java   |  26 +++
 .../impl/LlapManagementProtocolClientImpl.java  |   3 +-
 .../hive/llap/security/LlapTokenProvider.java   |  27 ---
 .../hive/llap/security/SecretManager.java       | 162 ++++++++++++++++++
 .../hive/llap/daemon/impl/LlapDaemon.java       |   8 +-
 .../daemon/impl/LlapProtocolServerImpl.java     |  26 +--
 .../hive/llap/security/LlapSecurityHelper.java  | 164 -------------------
 .../hive/llap/security/SecretManager.java       | 131 ---------------
 .../hive/ql/exec/tez/TezSessionState.java       |  76 ++++++++-
 15 files changed, 502 insertions(+), 402 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/8c4b99a4/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 541af57..cbb3a72 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -2704,6 +2704,11 @@ public class HiveConf extends Configuration {
         "RPC port for LLAP daemon management service."),
     LLAP_WEB_AUTO_AUTH("hive.llap.auto.auth", true,
         "Whether or not to set Hadoop configs to enable auth in LLAP web app."),
+    LLAP_CREATE_TOKEN_LOCALLY("hive.llap.create.token.locally", "hs2",
+        new StringSet("true", "hs2", "false"),
+        "Whether to create LLAP tokens locally, saving directly to ZooKeeper SecretManager.\n" +
+        "Requires one to have access to ZK paths; in other words, this should only be used in\n" +
+        "HiveServer2. By default, the value is 'hs2', which means exactly that."),
 
     LLAP_DAEMON_RPC_NUM_HANDLERS("hive.llap.daemon.rpc.num.handlers", 5,
       "Number of RPC handlers for LLAP daemon.", "llap.daemon.rpc.num.handlers"),

http://git-wip-us.apache.org/repos/asf/hive/blob/8c4b99a4/llap-client/pom.xml
----------------------------------------------------------------------
diff --git a/llap-client/pom.xml b/llap-client/pom.xml
index 4a75bbb..cbfdcd9 100644
--- a/llap-client/pom.xml
+++ b/llap-client/pom.xml
@@ -46,6 +46,11 @@
     </dependency>
     <!-- inter-project -->
     <dependency>
+      <groupId>org.apache.commons</groupId>
+      <artifactId>commons-lang3</artifactId>
+      <version>${commons-lang3.version}</version>
+    </dependency>
+    <dependency>
       <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-common</artifactId>
       <version>${hadoop.version}</version>
@@ -81,12 +86,6 @@
       <scope>test</scope>
     </dependency>
     <dependency>
-      <groupId>org.apache.commons</groupId>
-      <artifactId>commons-lang3</artifactId>
-      <version>${commons-lang3.version}</version>
-      <scope>test</scope>
-    </dependency>
-    <dependency>
       <groupId>org.mockito</groupId>
       <artifactId>mockito-all</artifactId>
       <version>${mockito-all.version}</version>

http://git-wip-us.apache.org/repos/asf/hive/blob/8c4b99a4/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/LlapProxy.java
----------------------------------------------------------------------
diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/LlapProxy.java b/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/LlapProxy.java
index 424769f..6c2618b 100644
--- a/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/LlapProxy.java
+++ b/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/LlapProxy.java
@@ -21,23 +21,14 @@ import java.io.IOException;
 import java.lang.reflect.Constructor;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
-import org.apache.hadoop.hive.llap.security.LlapTokenProvider;
-
 
 @SuppressWarnings("rawtypes")
 public class LlapProxy {
   private final static String IMPL_CLASS = "org.apache.hadoop.hive.llap.io.api.impl.LlapIoImpl";
-  private final static String TOKEN_CLASS =
-      "org.apache.hadoop.hive.llap.security.LlapSecurityHelper";
 
   // Llap server depends on Hive execution, so the reverse cannot be true. We create the I/O
   // singleton once (on daemon startup); the said singleton serves as the IO interface.
   private static LlapIo io = null;
-  private static LlapTokenProvider tokenProvider = null;
-  private static final Object tpInitLock = new Object();
-  private static volatile boolean isTpInitDone = false;
 
   private static boolean isDaemon = false;
 
@@ -77,34 +68,6 @@ public class LlapProxy {
     }
   }
 
-  public static LlapTokenProvider getOrInitTokenProvider(Configuration conf) {
-    if (isTpInitDone) return tokenProvider;
-    synchronized (tpInitLock) {
-      if (isTpInitDone) return tokenProvider;
-      try {
-        tokenProvider = createTokenProviderImpl(conf);
-        isTpInitDone = true;
-      } catch (IOException e) {
-        throw new RuntimeException("Cannot initialize token provider", e);
-      }
-      return tokenProvider;
-    }
-  }
-
-  private static LlapTokenProvider createTokenProviderImpl(Configuration conf) throws IOException {
-    try {
-      @SuppressWarnings("unchecked")
-      Class<? extends LlapTokenProvider> clazz =
-        (Class<? extends LlapTokenProvider>)Class.forName(TOKEN_CLASS);
-      Constructor<? extends LlapTokenProvider> ctor =
-          clazz.getDeclaredConstructor(Configuration.class);
-      ctor.setAccessible(true);
-      return ctor.newInstance(conf);
-    } catch (Exception e) {
-      throw new RuntimeException("Failed to create token provider class", e);
-    }
-  }
-
   public static void close() {
     if (io != null) {
       io.close();

http://git-wip-us.apache.org/repos/asf/hive/blob/8c4b99a4/llap-client/src/java/org/apache/hadoop/hive/llap/security/LlapTokenClientFactory.java
----------------------------------------------------------------------
diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/security/LlapTokenClientFactory.java b/llap-client/src/java/org/apache/hadoop/hive/llap/security/LlapTokenClientFactory.java
new file mode 100644
index 0000000..ebc91b1
--- /dev/null
+++ b/llap-client/src/java/org/apache/hadoop/hive/llap/security/LlapTokenClientFactory.java
@@ -0,0 +1,160 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.llap.security;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import javax.net.SocketFactory;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.llap.impl.LlapManagementProtocolClientImpl;
+import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GetTokenRequestProto;
+import org.apache.hadoop.hive.llap.registry.impl.LlapRegistryService;
+import org.apache.hadoop.hive.llap.registry.ServiceInstance;
+import org.apache.hadoop.hive.llap.registry.ServiceInstanceSet;
+import org.apache.hadoop.hive.llap.security.LlapTokenIdentifier;
+import org.apache.hadoop.io.DataInputByteBuffer;
+import org.apache.hadoop.io.retry.RetryPolicies;
+import org.apache.hadoop.io.retry.RetryPolicy;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.protobuf.ByteString;
+import com.google.protobuf.ServiceException;
+
+public class LlapTokenClientFactory {
+  private static final Logger LOG = LoggerFactory.getLogger(LlapTokenClientFactory.class);
+
+  private final LlapRegistryService registry;
+  private final SocketFactory socketFactory;
+  private final RetryPolicy retryPolicy;
+  private final Configuration conf;
+  private ServiceInstanceSet activeInstances;
+  private Collection<ServiceInstance> lastKnownInstances;
+
+  public LlapTokenClientFactory(Configuration conf) {
+    this.conf = conf;
+    registry = new LlapRegistryService(false);
+    registry.init(conf);
+    socketFactory = NetUtils.getDefaultSocketFactory(conf);
+    retryPolicy = RetryPolicies.retryUpToMaximumTimeWithFixedSleep(
+        16000, 2000l, TimeUnit.MILLISECONDS);
+  }
+
+  public interface Client {
+    Token<LlapTokenIdentifier> getDelegationToken(String appId) throws IOException;
+  }
+
+  public Client createClient() {
+    return new ClientImpl(); // Client is separate from factory mostly for thread-safety reasons.
+  }
+
+  private class ClientImpl implements Client {
+    private LlapManagementProtocolClientImpl client;
+    private ServiceInstance clientInstance;
+
+    @Override
+    public Token<LlapTokenIdentifier> getDelegationToken(String appId) throws IOException {
+      if (!UserGroupInformation.isSecurityEnabled()) return null;
+      Iterator<ServiceInstance> llaps = null;
+      if (clientInstance == null) {
+        assert client == null;
+        llaps = getLlapServices(false).iterator();
+        clientInstance = llaps.next();
+      }
+
+      ByteString tokenBytes = null;
+      boolean hasRefreshed = false;
+      while (true) {
+        try {
+          tokenBytes = getTokenBytes(appId);
+          break;
+        } catch (IOException | ServiceException ex) {
+          LOG.error("Cannot get a token, trying a different instance", ex);
+          client = null;
+          clientInstance = null;
+        }
+        if (llaps == null || !llaps.hasNext()) {
+          if (hasRefreshed) { // Only refresh once.
+            throw new RuntimeException("Cannot find any LLAPs to get the token from");
+          }
+          llaps = getLlapServices(true).iterator();
+          hasRefreshed = true;
+        }
+        clientInstance = llaps.next();
+      }
+
+      Token<LlapTokenIdentifier> token = extractToken(tokenBytes);
+      if (LOG.isInfoEnabled()) {
+        LOG.info("Obtained a LLAP delegation token from " + clientInstance + ": " + token);
+      }
+      return token;
+    }
+
+    private Token<LlapTokenIdentifier> extractToken(ByteString tokenBytes) throws IOException {
+      Token<LlapTokenIdentifier> token = new Token<>();
+      DataInputByteBuffer in = new DataInputByteBuffer();
+      in.reset(tokenBytes.asReadOnlyByteBuffer());
+      token.readFields(in);
+      return token;
+    }
+
+    private ByteString getTokenBytes(final String appId) throws IOException, ServiceException {
+      assert clientInstance != null;
+      if (client == null) {
+        client = new LlapManagementProtocolClientImpl(conf, clientInstance.getHost(),
+            clientInstance.getManagementPort(), retryPolicy, socketFactory);
+      }
+      GetTokenRequestProto.Builder req = GetTokenRequestProto.newBuilder();
+      if (!StringUtils.isBlank(appId)) {
+        req.setAppId(appId);
+      }
+      return client.getDelegationToken(null, req.build()).getToken();
+    }
+  }
+
+  /** Synchronized - LLAP registry and instance set are not thread safe. */
+  private synchronized List<ServiceInstance> getLlapServices(
+      boolean doForceRefresh) throws IOException {
+    if (!doForceRefresh && lastKnownInstances != null) {
+      return new ArrayList<>(lastKnownInstances);
+    }
+    if (activeInstances == null) {
+      registry.start();
+      activeInstances = registry.getInstances();
+    }
+    Map<String, ServiceInstance> daemons = activeInstances.getAll();
+    if (daemons == null || daemons.isEmpty()) {
+      throw new RuntimeException("No LLAPs found");
+    }
+    lastKnownInstances = daemons.values();
+    return new ArrayList<ServiceInstance>(lastKnownInstances);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/8c4b99a4/llap-client/src/java/org/apache/hadoop/hive/llap/security/LlapTokenLocalClient.java
----------------------------------------------------------------------
diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/security/LlapTokenLocalClient.java b/llap-client/src/java/org/apache/hadoop/hive/llap/security/LlapTokenLocalClient.java
new file mode 100644
index 0000000..f10351b
--- /dev/null
+++ b/llap-client/src/java/org/apache/hadoop/hive/llap/security/LlapTokenLocalClient.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.llap.security;
+
+import java.io.IOException;
+
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.security.token.Token;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class LlapTokenLocalClient {
+  private static final Logger LOG = LoggerFactory.getLogger(LlapTokenLocalClient.class);
+  private final SecretManager secretManager;
+
+  public LlapTokenLocalClient(Configuration conf, String clusterId) {
+    secretManager = SecretManager.createSecretManager(conf, clusterId);
+  }
+
+  public Token<LlapTokenIdentifier> createToken(String appId, String user) throws IOException {
+    try {
+      Token<LlapTokenIdentifier> token = secretManager.createLlapToken(appId, user);
+      if (LOG.isInfoEnabled()) {
+        LOG.info("Created a LLAP delegation token locally: " + token);
+      }
+      return token;
+    } catch (Exception ex) {
+      throw new IOException("Failed to create LLAP token locally. You might need to set "
+          + ConfVars.LLAP_CREATE_TOKEN_LOCALLY.varname
+          + " to false, or make sure you can access secure ZK paths.", ex);
+    }
+  }
+
+  public void close() {
+    try {
+      secretManager.stopThreads();
+    } catch (Exception ex) {
+      // Best-effort cleanup; failures while stopping the secret manager threads are ignored.
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/8c4b99a4/llap-common/src/java/org/apache/hadoop/hive/llap/DaemonId.java
----------------------------------------------------------------------
diff --git a/llap-common/src/java/org/apache/hadoop/hive/llap/DaemonId.java b/llap-common/src/java/org/apache/hadoop/hive/llap/DaemonId.java
index 18355e6..ea47330 100644
--- a/llap-common/src/java/org/apache/hadoop/hive/llap/DaemonId.java
+++ b/llap-common/src/java/org/apache/hadoop/hive/llap/DaemonId.java
@@ -32,7 +32,14 @@ public class DaemonId {
   }
 
   public String getClusterString() {
-    return userName + "_" + clusterName + "_" + appId;
+    return createClusterString(userName, clusterName);
+  }
+
+  public static String createClusterString(String userName, String clusterName) {
+    // Note that this doesn't include appId. We assume that all the subsequent instances
+    // of the same user+cluster are logically the same, i.e. all the ZK paths will be reused,
+    // all the security tokens/etc. should transition between them, etc.
+    return userName + "_" + clusterName;
   }
 
   public String getApplicationId() {

http://git-wip-us.apache.org/repos/asf/hive/blob/8c4b99a4/llap-common/src/java/org/apache/hadoop/hive/llap/LlapUtil.java
----------------------------------------------------------------------
diff --git a/llap-common/src/java/org/apache/hadoop/hive/llap/LlapUtil.java b/llap-common/src/java/org/apache/hadoop/hive/llap/LlapUtil.java
index ce03de0..9dcacea 100644
--- a/llap-common/src/java/org/apache/hadoop/hive/llap/LlapUtil.java
+++ b/llap-common/src/java/org/apache/hadoop/hive/llap/LlapUtil.java
@@ -13,14 +13,40 @@
  */
 package org.apache.hadoop.hive.llap;
 
+import java.io.IOException;
+import java.util.regex.Pattern;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.security.SecurityUtil;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class LlapUtil {
+  private static final Logger LOG = LoggerFactory.getLogger(LlapUtil.class);
+
   public static String getDaemonLocalDirList(Configuration conf) {
     String localDirList = HiveConf.getVar(conf, ConfVars.LLAP_DAEMON_WORK_DIRS);
     if (localDirList != null && !localDirList.isEmpty()) return localDirList;
     return conf.get("yarn.nodemanager.local-dirs");
   }
+
+  public static UserGroupInformation loginWithKerberos(
+      String principal, String keytabFile) throws IOException {
+    if (!UserGroupInformation.isSecurityEnabled()) return null;
+    if (principal.isEmpty() || keytabFile.isEmpty()) {
+      throw new RuntimeException("Kerberos principal and/or keytab are empty");
+    }
+    LOG.info("Logging in as " + principal + " via " + keytabFile);
+    return UserGroupInformation.loginUserFromKeytabAndReturnUGI(
+        SecurityUtil.getServerPrincipal(principal, "0.0.0.0"), keytabFile);
+  }
+
+  private final static Pattern hostsRe = Pattern.compile("[^A-Za-z0-9_-]");
+  public static String generateClusterName(Configuration conf) {
+    String hosts = HiveConf.getTrimmedVar(conf, ConfVars.LLAP_DAEMON_SERVICE_HOSTS);
+    return hostsRe.matcher(hosts.startsWith("@") ? hosts.substring(1) : hosts).replaceAll("_");
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/8c4b99a4/llap-common/src/java/org/apache/hadoop/hive/llap/impl/LlapManagementProtocolClientImpl.java
----------------------------------------------------------------------
diff --git a/llap-common/src/java/org/apache/hadoop/hive/llap/impl/LlapManagementProtocolClientImpl.java b/llap-common/src/java/org/apache/hadoop/hive/llap/impl/LlapManagementProtocolClientImpl.java
index cd11bdb..af760b1 100644
--- a/llap-common/src/java/org/apache/hadoop/hive/llap/impl/LlapManagementProtocolClientImpl.java
+++ b/llap-common/src/java/org/apache/hadoop/hive/llap/impl/LlapManagementProtocolClientImpl.java
@@ -65,8 +65,7 @@ public class LlapManagementProtocolClientImpl implements LlapManagementProtocolP
     RPC.setProtocolEngine(conf, LlapManagementProtocolPB.class, ProtobufRpcEngine.class);
     ProtocolProxy<LlapManagementProtocolPB> proxy =
         RPC.getProtocolProxy(LlapManagementProtocolPB.class, 0, serverAddr,
-            UserGroupInformation.getCurrentUser(), conf, NetUtils.getDefaultSocketFactory(conf), 0,
-            retryPolicy);
+            UserGroupInformation.getCurrentUser(), conf, socketFactory, 0, retryPolicy);
     return proxy.getProxy();
   }
 

http://git-wip-us.apache.org/repos/asf/hive/blob/8c4b99a4/llap-common/src/java/org/apache/hadoop/hive/llap/security/LlapTokenProvider.java
----------------------------------------------------------------------
diff --git a/llap-common/src/java/org/apache/hadoop/hive/llap/security/LlapTokenProvider.java b/llap-common/src/java/org/apache/hadoop/hive/llap/security/LlapTokenProvider.java
deleted file mode 100644
index edf9b18..0000000
--- a/llap-common/src/java/org/apache/hadoop/hive/llap/security/LlapTokenProvider.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hive.llap.security;
-
-import java.io.IOException;
-
-import org.apache.hadoop.security.token.Token;
-
-public interface LlapTokenProvider {
-  Token<LlapTokenIdentifier> getDelegationToken(String appId) throws IOException;
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hive/blob/8c4b99a4/llap-common/src/java/org/apache/hadoop/hive/llap/security/SecretManager.java
----------------------------------------------------------------------
diff --git a/llap-common/src/java/org/apache/hadoop/hive/llap/security/SecretManager.java b/llap-common/src/java/org/apache/hadoop/hive/llap/security/SecretManager.java
new file mode 100644
index 0000000..465b204
--- /dev/null
+++ b/llap-common/src/java/org/apache/hadoop/hive/llap/security/SecretManager.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.llap.security;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.security.PrivilegedAction;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.llap.LlapUtil;
+import org.apache.hadoop.hive.llap.security.LlapTokenIdentifier;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.delegation.ZKDelegationTokenSecretManager;
+import org.apache.hadoop.security.token.delegation.web.DelegationTokenManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class SecretManager extends ZKDelegationTokenSecretManager<LlapTokenIdentifier> {
+  private static final Logger LOG = LoggerFactory.getLogger(SecretManager.class);
+  private final String clusterId;
+
+  public SecretManager(Configuration conf, String clusterId) {
+    super(conf);
+    this.clusterId = clusterId;
+    checkForZKDTSMBug(conf);
+  }
+
+  // Workaround for HADOOP-12659 - remove when Hadoop 2.7.X is no longer supported.
+  private void checkForZKDTSMBug(Configuration conf) {
+    // There's a bug in ZKDelegationTokenSecretManager ctor where seconds are not converted to ms.
+    long expectedRenewTimeSec = conf.getLong(DelegationTokenManager.RENEW_INTERVAL, -1);
+    LOG.info("Checking for tokenRenewInterval bug: " + expectedRenewTimeSec);
+    if (expectedRenewTimeSec == -1) return; // The default works, no bug.
+    java.lang.reflect.Field f = null;
+    try {
+     Class<?> c = org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager.class;
+     f = c.getDeclaredField("tokenRenewInterval");
+     f.setAccessible(true);
+    } catch (Throwable t) {
+      // Maybe someone removed the field; probably ok to ignore.
+      LOG.error("Failed to check for tokenRenewInterval bug, hoping for the best", t);
+      return;
+    }
+    try {
+      long realValue = f.getLong(this);
+      long expectedValue = expectedRenewTimeSec * 1000;
+      LOG.info("tokenRenewInterval is: " + realValue + " (expected " + expectedValue + ")");
+      if (realValue == expectedRenewTimeSec) {
+        // Bug - the field must be in ms, not sec. Override only if it equals the value in sec.
+        f.setLong(this, expectedValue);
+      }
+    } catch (Exception ex) {
+      throw new RuntimeException("Failed to address tokenRenewInterval bug", ex);
+    }
+  }
+
+  @Override
+  public LlapTokenIdentifier createIdentifier() {
+    return new LlapTokenIdentifier();
+  }
+
+  @Override
+  public LlapTokenIdentifier decodeTokenIdentifier(
+      Token<LlapTokenIdentifier> token) throws IOException {
+    DataInputStream dis = new DataInputStream(new ByteArrayInputStream(token.getIdentifier()));
+    LlapTokenIdentifier id = new LlapTokenIdentifier();
+    id.readFields(dis);
+    dis.close();
+    return id;
+  }
+
+  public static SecretManager createSecretManager(final Configuration conf, String clusterId) {
+    String llapPrincipal = HiveConf.getVar(conf, ConfVars.LLAP_KERBEROS_PRINCIPAL),
+        llapKeytab = HiveConf.getVar(conf, ConfVars.LLAP_KERBEROS_KEYTAB_FILE);
+    return SecretManager.createSecretManager(conf, llapPrincipal, llapKeytab, clusterId);
+  }
+
+
+  public static SecretManager createSecretManager(final Configuration conf,
+      String llapPrincipal, String llapKeytab, final String clusterId) {
+    // Create ZK connection under a separate ugi (if specified) - ZK works in mysterious ways.
+    UserGroupInformation zkUgi = null;
+    String principal = HiveConf.getVar(conf, ConfVars.LLAP_ZKSM_KERBEROS_PRINCIPAL, llapPrincipal);
+    String keyTab = HiveConf.getVar(conf, ConfVars.LLAP_ZKSM_KERBEROS_KEYTAB_FILE, llapKeytab);
+    try {
+      zkUgi = LlapUtil.loginWithKerberos(principal, keyTab);
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+    // Override the default delegation token lifetime for LLAP.
+    // Also set all the necessary ZK settings to defaults and LLAP configs, if not set.
+    final Configuration zkConf = new Configuration(conf);
+    long tokenLifetime = HiveConf.getTimeVar(
+        conf, ConfVars.LLAP_DELEGATION_TOKEN_LIFETIME, TimeUnit.SECONDS);
+    zkConf.setLong(DelegationTokenManager.MAX_LIFETIME, tokenLifetime);
+    zkConf.setLong(DelegationTokenManager.RENEW_INTERVAL, tokenLifetime);
+    zkConf.set(SecretManager.ZK_DTSM_ZK_KERBEROS_PRINCIPAL, principal);
+    zkConf.set(SecretManager.ZK_DTSM_ZK_KERBEROS_KEYTAB, keyTab);
+    String zkPath = clusterId;
+    LOG.info("Using {} as ZK secret manager path", zkPath);
+    zkConf.set(SecretManager.ZK_DTSM_ZNODE_WORKING_PATH, "zkdtsm_" + zkPath);
+    setZkConfIfNotSet(zkConf, SecretManager.ZK_DTSM_ZK_AUTH_TYPE, "sasl");
+    setZkConfIfNotSet(zkConf, SecretManager.ZK_DTSM_ZK_CONNECTION_STRING,
+        HiveConf.getVar(zkConf, ConfVars.LLAP_ZKSM_ZK_CONNECTION_STRING));
+    return zkUgi.doAs(new PrivilegedAction<SecretManager>() {
+      @Override
+      public SecretManager run() {
+        SecretManager zkSecretManager = new SecretManager(zkConf, clusterId);
+        try {
+          zkSecretManager.startThreads();
+        } catch (IOException e) {
+          throw new RuntimeException(e);
+        }
+        return zkSecretManager;
+      }
+    });
+  }
+
+  private static void setZkConfIfNotSet(Configuration zkConf, String name, String value) {
+    if (zkConf.get(name) != null) return;
+    zkConf.set(name, value);
+  }
+
+  public Token<LlapTokenIdentifier> createLlapToken(String appId, String user) throws IOException {
+    Text realUser = null, renewer = null;
+    if (user == null) {
+      UserGroupInformation ugi  = UserGroupInformation.getCurrentUser();
+      user = ugi.getUserName();
+      if (ugi.getRealUser() != null) {
+        realUser = new Text(ugi.getRealUser().getUserName());
+      }
+      renewer = new Text(ugi.getShortUserName());
+    } else {
+      renewer = new Text(user);
+    }
+    LlapTokenIdentifier llapId = new LlapTokenIdentifier(
+        new Text(user), renewer, realUser, clusterId, appId);
+    // TODO: note that the token is not renewable right now and will last for 2 weeks by default.
+    Token<LlapTokenIdentifier> token = new Token<LlapTokenIdentifier>(llapId, this);
+    if (LOG.isInfoEnabled()) {
+      LOG.info("Created LLAP token {}", token);
+    }
+    return token;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/8c4b99a4/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
index 5731b2c..de817e3 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
@@ -106,12 +106,6 @@ public class LlapDaemon extends CompositeService implements ContainerRunner, Lla
   private final String[] localDirs;
   private final DaemonId daemonId;
 
-  private final static Pattern hostsRe = Pattern.compile("[^A-Za-z0-9_-]");
-  private static String generateClusterName(Configuration conf) {
-    String hosts = HiveConf.getTrimmedVar(conf, ConfVars.LLAP_DAEMON_SERVICE_HOSTS);
-    return hostsRe.matcher(hosts.startsWith("@") ? hosts.substring(1) : hosts).replaceAll("_");
-  }
-
   // TODO Not the best way to share the address
   private final AtomicReference<InetSocketAddress> srvAddress = new AtomicReference<>(),
       mngAddress = new AtomicReference<>();
@@ -150,7 +144,7 @@ public class LlapDaemon extends CompositeService implements ContainerRunner, Lla
     String hostName = MetricsUtils.getHostName();
     try {
       daemonId = new DaemonId(UserGroupInformation.getCurrentUser().getUserName(),
-          generateClusterName(daemonConf), hostName, appName, System.currentTimeMillis());
+          LlapUtil.generateClusterName(daemonConf), hostName, appName, System.currentTimeMillis());
     } catch (IOException ex) {
       throw new RuntimeException(ex);
     }

http://git-wip-us.apache.org/repos/asf/hive/blob/8c4b99a4/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapProtocolServerImpl.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapProtocolServerImpl.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapProtocolServerImpl.java
index db8bfa6..b94fc2e 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapProtocolServerImpl.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapProtocolServerImpl.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.llap.DaemonId;
+import org.apache.hadoop.hive.llap.LlapUtil;
 import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos;
 import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GetTokenRequestProto;
 import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GetTokenResponseProto;
@@ -51,7 +52,6 @@ import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.authorize.AccessControlList;
 import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.hive.llap.security.LlapSecurityHelper;
 import org.apache.hadoop.hive.llap.security.LlapTokenIdentifier;
 import org.apache.hadoop.hive.llap.security.SecretManager;
 import org.apache.hadoop.service.AbstractService;
@@ -150,12 +150,13 @@ public class LlapProtocolServerImpl extends AbstractService
     }
     String llapPrincipal = HiveConf.getVar(conf, ConfVars.LLAP_KERBEROS_PRINCIPAL),
         llapKeytab = HiveConf.getVar(conf, ConfVars.LLAP_KERBEROS_KEYTAB_FILE);
-    zkSecretManager = SecretManager.createSecretManager(conf, llapPrincipal, llapKeytab, daemonId);
+    zkSecretManager = SecretManager.createSecretManager(
+        conf, llapPrincipal, llapKeytab, daemonId.getClusterString());
 
     // Start the protocol server after properly authenticating with daemon keytab.
     UserGroupInformation daemonUgi = null;
     try {
-      daemonUgi = LlapSecurityHelper.loginWithKerberos(llapPrincipal, llapKeytab);
+      daemonUgi = LlapUtil.loginWithKerberos(llapPrincipal, llapKeytab);
     } catch (IOException e) {
       throw new RuntimeException(e);
     }
@@ -265,9 +266,11 @@ public class LlapProtocolServerImpl extends AbstractService
     if (zkSecretManager == null) {
       throw new ServiceException("Operation not supported on unsecure cluster");
     }
-    UserGroupInformation ugi;
+    UserGroupInformation ugi = null;
+    Token<LlapTokenIdentifier> token = null;
     try {
       ugi = UserGroupInformation.getCurrentUser();
+      token = zkSecretManager.createLlapToken(request.getAppId(), null);
     } catch (IOException e) {
       throw new ServiceException(e);
     }
@@ -278,20 +281,7 @@ public class LlapProtocolServerImpl extends AbstractService
           + " or adjust " + ConfVars.LLAP_MANAGEMENT_ACL.varname + " and "
           + ConfVars.LLAP_MANAGEMENT_ACL_DENY.varname + " to a more restrictive ACL.");
     }
-    String user = ugi.getUserName();
-    Text owner = new Text(user);
-    Text realUser = null;
-    if (ugi.getRealUser() != null) {
-      realUser = new Text(ugi.getRealUser().getUserName());
-    }
-    Text renewer = new Text(ugi.getShortUserName());
-    LlapTokenIdentifier llapId = new LlapTokenIdentifier(owner, renewer, realUser,
-        daemonId.getClusterString(), request.hasAppId() ? request.getAppId() : null);
-    // TODO: note that the token is not renewable right now and will last for 2 weeks by default.
-    Token<LlapTokenIdentifier> token = new Token<LlapTokenIdentifier>(llapId, zkSecretManager);
-    if (LOG.isInfoEnabled()) {
-      LOG.info("Created LLAP token " + token);
-    }
+
     ByteArrayDataOutput out = ByteStreams.newDataOutput();
     try {
       token.write(out);

http://git-wip-us.apache.org/repos/asf/hive/blob/8c4b99a4/llap-server/src/java/org/apache/hadoop/hive/llap/security/LlapSecurityHelper.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/security/LlapSecurityHelper.java b/llap-server/src/java/org/apache/hadoop/hive/llap/security/LlapSecurityHelper.java
deleted file mode 100644
index f958bc4..0000000
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/security/LlapSecurityHelper.java
+++ /dev/null
@@ -1,164 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hive.llap.security;
-
-import java.io.IOException;
-import java.security.PrivilegedExceptionAction;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
-
-import javax.net.SocketFactory;
-
-import org.apache.commons.lang3.StringUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hive.llap.impl.LlapManagementProtocolClientImpl;
-import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GetTokenRequestProto;
-import org.apache.hadoop.hive.llap.registry.impl.LlapRegistryService;
-import org.apache.hadoop.hive.llap.registry.ServiceInstance;
-import org.apache.hadoop.hive.llap.registry.ServiceInstanceSet;
-import org.apache.hadoop.io.DataInputByteBuffer;
-import org.apache.hadoop.io.retry.RetryPolicies;
-import org.apache.hadoop.io.retry.RetryPolicy;
-import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.security.SecurityUtil;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.token.Token;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.protobuf.ByteString;
-
-/** Individual instances of this class are not thread safe. */
-public class LlapSecurityHelper implements LlapTokenProvider {
-  private static final Logger LOG = LoggerFactory.getLogger(LlapSecurityHelper.class);
-
-  private UserGroupInformation llapUgi;
-
-  private final LlapRegistryService registry;
-  private ServiceInstanceSet activeInstances;
-  private final Configuration conf;
-  private LlapManagementProtocolClientImpl client;
-  private ServiceInstance clientInstance;
-
-  private final SocketFactory socketFactory;
-  private final RetryPolicy retryPolicy;
-
-  public LlapSecurityHelper(Configuration conf) {
-    this.conf = conf;
-    registry = new LlapRegistryService(false);
-    registry.init(conf);
-    socketFactory = NetUtils.getDefaultSocketFactory(conf);
-    retryPolicy = RetryPolicies.retryUpToMaximumTimeWithFixedSleep(
-        16000, 2000l, TimeUnit.MILLISECONDS);
-  }
-
-  public static UserGroupInformation loginWithKerberos(
-      String principal, String keytabFile) throws IOException {
-    if (!UserGroupInformation.isSecurityEnabled()) return null;
-    if (principal.isEmpty() || keytabFile.isEmpty()) {
-      throw new RuntimeException("Kerberos principal and/or keytab are empty");
-    }
-    LOG.info("Logging in as " + principal + " via " + keytabFile);
-    UserGroupInformation.loginUserFromKeytab(
-        SecurityUtil.getServerPrincipal(principal, "0.0.0.0"), keytabFile);
-    return UserGroupInformation.getLoginUser();
-  }
-
-  @Override
-  public Token<LlapTokenIdentifier> getDelegationToken(String appId) throws IOException {
-    if (!UserGroupInformation.isSecurityEnabled()) return null;
-    if (llapUgi == null) {
-      llapUgi = UserGroupInformation.getCurrentUser();
-      // We could have also added keytab support; right now client must do smth like kinit.
-    }
-    Iterator<ServiceInstance> llaps = null;
-    if (clientInstance == null) {
-      assert client == null;
-      llaps = getLlapServices(false);
-      clientInstance = llaps.next();
-    }
-
-    ByteString tokenBytes = null;
-    boolean hasRefreshed = false;
-    while (true) {
-      try {
-        tokenBytes = getTokenBytes(appId);
-        break;
-      } catch (InterruptedException ie) {
-        throw new RuntimeException(ie);
-      } catch (IOException ex) {
-        LOG.error("Cannot get a token, trying a different instance", ex);
-        client = null;
-        clientInstance = null;
-      }
-      if (llaps == null || !llaps.hasNext()) {
-        if (hasRefreshed) { // Only refresh once.
-          throw new RuntimeException("Cannot find any LLAPs to get the token from");
-        }
-        llaps = getLlapServices(true);
-        hasRefreshed = true;
-      }
-      clientInstance = llaps.next();
-    }
-
-    // Stupid protobuf byte-buffer reinvention.
-    Token<LlapTokenIdentifier> token = new Token<>();
-    DataInputByteBuffer in = new DataInputByteBuffer();
-    in.reset(tokenBytes.asReadOnlyByteBuffer());
-    token.readFields(in);
-    if (LOG.isInfoEnabled()) {
-      LOG.info("Obtained a LLAP delegation token from " + clientInstance + ": " + token);
-    }
-    return token;
-  }
-
-  private ByteString getTokenBytes(
-      final String appId) throws InterruptedException, IOException {
-    return llapUgi.doAs(new PrivilegedExceptionAction<ByteString>() {
-      @Override
-      public ByteString run() throws Exception {
-        assert clientInstance != null;
-        if (client == null) {
-          client = new LlapManagementProtocolClientImpl(conf, clientInstance.getHost(),
-              clientInstance.getManagementPort(), retryPolicy, socketFactory);
-        }
-        // Client only connects on the first call, so this has to be done in doAs.
-        GetTokenRequestProto.Builder req = GetTokenRequestProto.newBuilder();
-        if (!StringUtils.isBlank(appId)) {
-          req.setAppId(appId);
-        }
-        return client.getDelegationToken(null, req.build()).getToken();
-      }
-    });
-  }
-
-  private Iterator<ServiceInstance> getLlapServices(boolean doForceRefresh) throws IOException {
-    if (activeInstances == null) {
-      registry.start();
-      activeInstances = registry.getInstances();
-    }
-    Map<String, ServiceInstance> daemons = activeInstances.getAll();
-    if (doForceRefresh || daemons == null || daemons.isEmpty()) {
-      daemons = activeInstances.getAll();
-      if (daemons == null || daemons.isEmpty()) throw new RuntimeException("No LLAPs found");
-    }
-    return daemons.values().iterator();
-  }
-}

http://git-wip-us.apache.org/repos/asf/hive/blob/8c4b99a4/llap-server/src/java/org/apache/hadoop/hive/llap/security/SecretManager.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/security/SecretManager.java b/llap-server/src/java/org/apache/hadoop/hive/llap/security/SecretManager.java
deleted file mode 100644
index c54e726..0000000
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/security/SecretManager.java
+++ /dev/null
@@ -1,131 +0,0 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hive.llap.security;
-
-import java.io.ByteArrayInputStream;
-import java.io.DataInputStream;
-import java.io.IOException;
-import java.security.PrivilegedAction;
-import java.util.concurrent.TimeUnit;
-import java.util.regex.Pattern;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
-import org.apache.hadoop.hive.llap.DaemonId;
-import org.apache.hadoop.hive.llap.security.LlapTokenIdentifier;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.security.token.delegation.ZKDelegationTokenSecretManager;
-import org.apache.hadoop.security.token.delegation.web.DelegationTokenManager;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class SecretManager extends ZKDelegationTokenSecretManager<LlapTokenIdentifier> {
-  private static final Logger LOG = LoggerFactory.getLogger(SecretManager.class);
-
-  public SecretManager(Configuration conf) {
-    super(conf);
-    checkForZKDTSMBug(conf);
-  }
-
-  // Workaround for HADOOP-12659 - remove when Hadoop 2.7.X is no longer supported.
-  private void checkForZKDTSMBug(Configuration conf) {
-    // There's a bug in ZKDelegationTokenSecretManager ctor where seconds are not converted to ms.
-    long expectedRenewTimeSec = conf.getLong(DelegationTokenManager.RENEW_INTERVAL, -1);
-    LOG.info("Checking for tokenRenewInterval bug: " + expectedRenewTimeSec);
-    if (expectedRenewTimeSec == -1) return; // The default works, no bug.
-    java.lang.reflect.Field f = null;
-    try {
-     Class<?> c = org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager.class;
-     f = c.getDeclaredField("tokenRenewInterval");
-     f.setAccessible(true);
-    } catch (Throwable t) {
-      // Maybe someone removed the field; probably ok to ignore.
-      LOG.error("Failed to check for tokenRenewInterval bug, hoping for the best", t);
-      return;
-    }
-    try {
-      long realValue = f.getLong(this);
-      long expectedValue = expectedRenewTimeSec * 1000;
-      LOG.info("tokenRenewInterval is: " + realValue + " (expected " + expectedValue + ")");
-      if (realValue == expectedRenewTimeSec) {
-        // Bug - the field has to be in ms, not sec. Override only if set precisely to sec.
-        f.setLong(this, expectedValue);
-      }
-    } catch (Exception ex) {
-      throw new RuntimeException("Failed to address tokenRenewInterval bug", ex);
-    }
-  }
-
-  @Override
-  public LlapTokenIdentifier createIdentifier() {
-    return new LlapTokenIdentifier();
-  }
-
-  @Override
-  public LlapTokenIdentifier decodeTokenIdentifier(
-      Token<LlapTokenIdentifier> token) throws IOException {
-    DataInputStream dis = new DataInputStream(new ByteArrayInputStream(token.getIdentifier()));
-    LlapTokenIdentifier id = new LlapTokenIdentifier();
-    id.readFields(dis);
-    dis.close();
-    return id;
-  }
-
-  public static SecretManager createSecretManager(
-      final Configuration conf, String llapPrincipal, String llapKeytab, DaemonId daemonId) {
-    // Create ZK connection under a separate ugi (if specified) - ZK works in mysterious ways.
-    UserGroupInformation zkUgi = null;
-    String principal = HiveConf.getVar(conf, ConfVars.LLAP_ZKSM_KERBEROS_PRINCIPAL, llapPrincipal);
-    String keyTab = HiveConf.getVar(conf, ConfVars.LLAP_ZKSM_KERBEROS_KEYTAB_FILE, llapKeytab);
-    try {
-      zkUgi = LlapSecurityHelper.loginWithKerberos(principal, keyTab);
-    } catch (IOException e) {
-      throw new RuntimeException(e);
-    }
-    // Override the default delegation token lifetime for LLAP.
-    // Also set all the necessary ZK settings to defaults and LLAP configs, if not set.
-    final Configuration zkConf = new Configuration(conf);
-    long tokenLifetime = HiveConf.getTimeVar(
-        conf, ConfVars.LLAP_DELEGATION_TOKEN_LIFETIME, TimeUnit.SECONDS);
-    zkConf.setLong(DelegationTokenManager.MAX_LIFETIME, tokenLifetime);
-    zkConf.setLong(DelegationTokenManager.RENEW_INTERVAL, tokenLifetime);
-    zkConf.set(SecretManager.ZK_DTSM_ZK_KERBEROS_PRINCIPAL, principal);
-    zkConf.set(SecretManager.ZK_DTSM_ZK_KERBEROS_KEYTAB, keyTab);
-    String zkPath = daemonId.getClusterString();
-    LOG.info("Using {} as ZK secret manager path", zkPath);
-    zkConf.set(SecretManager.ZK_DTSM_ZNODE_WORKING_PATH, "zkdtsm_" + zkPath);
-    setZkConfIfNotSet(zkConf, SecretManager.ZK_DTSM_ZK_AUTH_TYPE, "sasl");
-    setZkConfIfNotSet(zkConf, SecretManager.ZK_DTSM_ZK_CONNECTION_STRING,
-        HiveConf.getVar(zkConf, ConfVars.LLAP_ZKSM_ZK_CONNECTION_STRING));
-    return zkUgi.doAs(new PrivilegedAction<SecretManager>() {
-      @Override
-      public SecretManager run() {
-        SecretManager zkSecretManager = new SecretManager(zkConf);
-        try {
-          zkSecretManager.startThreads();
-        } catch (IOException e) {
-          throw new RuntimeException(e);
-        }
-        return zkSecretManager;
-      }
-    });
-  }
-
-  private static void setZkConfIfNotSet(Configuration zkConf, String name, String value) {
-    if (zkConf.get(name) != null) return;
-    zkConf.set(name, value);
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hive/blob/8c4b99a4/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
index fd6465a..c9b912b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
@@ -44,16 +44,19 @@ import javax.security.auth.login.LoginException;
 
 import org.apache.commons.codec.digest.DigestUtils;
 import org.apache.commons.io.FilenameUtils;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.llap.DaemonId;
+import org.apache.hadoop.hive.llap.LlapUtil;
 import org.apache.hadoop.hive.llap.impl.LlapProtocolClientImpl;
-import org.apache.hadoop.hive.llap.io.api.LlapProxy;
+import org.apache.hadoop.hive.llap.security.LlapTokenClientFactory;
 import org.apache.hadoop.hive.llap.security.LlapTokenIdentifier;
-import org.apache.hadoop.hive.llap.security.LlapTokenProvider;
+import org.apache.hadoop.hive.llap.security.LlapTokenLocalClient;
 import org.apache.hadoop.hive.llap.tez.LlapProtocolClientProxy;
 import org.apache.hadoop.hive.llap.tezplugins.LlapContainerLauncher;
 import org.apache.hadoop.hive.llap.tezplugins.LlapTaskCommunicator;
@@ -82,6 +85,11 @@ import org.apache.tez.serviceplugins.api.TaskSchedulerDescriptor;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.RemovalListener;
+import com.google.common.cache.RemovalNotification;
+
 /**
  * Holds session state related to Tez
  */
@@ -274,14 +282,8 @@ public class TezSessionState {
     Credentials llapCredentials = null;
     if (llapMode) {
       if (UserGroupInformation.isSecurityEnabled()) {
-        LlapTokenProvider tp = LlapProxy.getOrInitTokenProvider(conf);
-        // For Tez, we don't use appId to distinguish the tokens; security scope is the user.
-        Token<LlapTokenIdentifier> token = tp.getDelegationToken(null);
-        if (LOG.isInfoEnabled()) {
-          LOG.info("Obtained a LLAP token: " + token);
-        }
         llapCredentials = new Credentials();
-        llapCredentials.addToken(LlapTokenIdentifier.KIND_NAME, token);
+        llapCredentials.addToken(LlapTokenIdentifier.KIND_NAME, getLlapToken(user, tezConfig));
       }
       UserPayload servicePluginPayload = TezUtils.createUserPayloadFromConf(tezConfig);
       // we need plugins to handle llap and uber mode
@@ -336,6 +338,62 @@ public class TezSessionState {
     }
   }
 
+  // Caches local (ZK-connected) token clients by cluster id; presumed to be used in HS2.
+  // TODO: temporary before HIVE-13698.
+  private static final Cache<String, LlapTokenLocalClient> localClientCache =
+      CacheBuilder.newBuilder().expireAfterAccess(10, TimeUnit.MINUTES)
+          .removalListener(new RemovalListener<String, LlapTokenLocalClient>() {
+            @Override
+            public void onRemoval(RemovalNotification<String, LlapTokenLocalClient> evicted) {
+              // Close the client once its entry leaves the cache.
+              LlapTokenLocalClient client = evicted.getValue();
+              if (client != null) client.close();
+            }
+          }).build();
+
+  private static Token<LlapTokenIdentifier> getLlapToken(
+      String user, final Configuration conf) throws IOException {
+    // TODO: parts of this should be moved out of TezSession to reuse the clients, but there's
+    //       no good place for that right now (HIVE-13698).
+    // For Tez, we don't use appId to distinguish the tokens.
+    Token<LlapTokenIdentifier> token;
+    if (!isUsingLocalClient(conf)) {
+      token = new LlapTokenClientFactory(conf).createClient().getDelegationToken(null);
+    } else {
+      // This assumes that the LLAP cluster and session are both running under HS2 user.
+      final String clusterId =
+          DaemonId.createClusterString(user, LlapUtil.generateClusterName(conf));
+      Callable<LlapTokenLocalClient> clientLoader = new Callable<LlapTokenLocalClient>() {
+        @Override
+        public LlapTokenLocalClient call() throws Exception {
+          return new LlapTokenLocalClient(conf, clusterId);
+        }
+      };
+      try {
+        token = localClientCache.get(clusterId, clientLoader).createToken(null, null);
+      } catch (ExecutionException e) {
+        throw new IOException(e);
+      }
+    }
+    if (LOG.isInfoEnabled()) {
+      LOG.info("Obtained a LLAP token: " + token);
+    }
+    return token;
+  }
+
+  /** Returns whether the LLAP token should be created by a local client in this process. */
+  private static boolean isUsingLocalClient(Configuration conf) {
+    String mode = HiveConf.getVar(conf, ConfVars.LLAP_CREATE_TOKEN_LOCALLY).toLowerCase();
+    // Any value except "hs2" is a plain switch: only "true" turns local creation on.
+    if (!"hs2".equals(mode)) return "true".equals(mode);
+    // "hs2": local only for HiveServer2 queries. We are initialized on first use inside
+    // TezSessionState::openInternal; assume the session should be available.
+    SessionState session = SessionState.get();
+    // Warn regardless of the INFO setting; the old INFO-level guard could hide this warning.
+    if (session == null) LOG.warn("There's no session to check if we are in HS2");
+    return session != null && session.isHiveServerQuery();
+  }
+
   private TezClient startSessionAndContainers(TezClient session, HiveConf conf,
       Map<String, LocalResource> commonLocalResources, TezConfiguration tezConfig,
       boolean isOnThread) throws TezException, IOException {