You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by ha...@apache.org on 2012/02/25 05:21:13 UTC

svn commit: r1293530 - in /hive/trunk: common/src/java/org/apache/hadoop/hive/conf/ conf/ shims/src/common-secure/java/org/apache/hadoop/hive/thrift/ shims/src/test/org/apache/hadoop/hive/thrift/

Author: hashutosh
Date: Sat Feb 25 04:21:11 2012
New Revision: 1293530

URL: http://svn.apache.org/viewvc?rev=1293530&view=rev
Log:
HIVE-2712: Make ZooKeeper token store ACL configurable (thw via hashutosh)

Added:
    hive/trunk/shims/src/common-secure/java/org/apache/hadoop/hive/thrift/DelegationTokenStore.java
    hive/trunk/shims/src/test/org/apache/hadoop/hive/thrift/TestZooKeeperTokenStore.java
Modified:
    hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
    hive/trunk/conf/hive-default.xml.template
    hive/trunk/shims/src/common-secure/java/org/apache/hadoop/hive/thrift/HadoopThriftAuthBridge20S.java
    hive/trunk/shims/src/common-secure/java/org/apache/hadoop/hive/thrift/MemoryTokenStore.java
    hive/trunk/shims/src/common-secure/java/org/apache/hadoop/hive/thrift/TokenStoreDelegationTokenSecretManager.java
    hive/trunk/shims/src/common-secure/java/org/apache/hadoop/hive/thrift/ZooKeeperTokenStore.java
    hive/trunk/shims/src/test/org/apache/hadoop/hive/thrift/TestHadoop20SAuthBridge.java

Modified: hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
URL: http://svn.apache.org/viewvc/hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java?rev=1293530&r1=1293529&r2=1293530&view=diff
==============================================================================
--- hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (original)
+++ hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java Sat Feb 25 04:21:11 2012
@@ -263,6 +263,15 @@ public class HiveConf extends Configurat
     METASTORE_KERBEROS_PRINCIPAL("hive.metastore.kerberos.principal",
         "hive-metastore/_HOST@EXAMPLE.COM"),
     METASTORE_USE_THRIFT_SASL("hive.metastore.sasl.enabled", false),
+    METASTORE_CLUSTER_DELEGATION_TOKEN_STORE_CLS(
+        "hive.cluster.delegation.token.store.class",
+        "org.apache.hadoop.hive.thrift.MemoryTokenStore"),
+    METASTORE_CLUSTER_DELEGATION_TOKEN_STORE_ZK_CONNECTSTR(
+        "hive.cluster.delegation.token.store.zookeeper.connectString", ""),
+    METASTORE_CLUSTER_DELEGATION_TOKEN_STORE_ZK_ZNODE(
+        "hive.cluster.delegation.token.store.zookeeper.znode", "/hive/cluster/delegation"),
+    METASTORE_CLUSTER_DELEGATION_TOKEN_STORE_ZK_ACL(
+        "hive.cluster.delegation.token.store.zookeeper.acl", ""),
     METASTORE_CACHE_PINOBJTYPES("hive.metastore.cache.pinobjtypes", "Table,StorageDescriptor,SerDeInfo,Partition,Database,Type,FieldSchema,Order"),
     METASTORE_CONNECTION_POOLING_TYPE("datanucleus.connectionPoolingType", "DBCP"),
     METASTORE_VALIDATE_TABLES("datanucleus.validateTables", false),

Modified: hive/trunk/conf/hive-default.xml.template
URL: http://svn.apache.org/viewvc/hive/trunk/conf/hive-default.xml.template?rev=1293530&r1=1293529&r2=1293530&view=diff
==============================================================================
--- hive/trunk/conf/hive-default.xml.template (original)
+++ hive/trunk/conf/hive-default.xml.template Sat Feb 25 04:21:11 2012
@@ -754,6 +754,30 @@
 </property>
 
 <property>
+  <name>hive.cluster.delegation.token.store.class</name>
+  <value>org.apache.hadoop.hive.thrift.MemoryTokenStore</value>
+  <description>The delegation token store implementation. Set to org.apache.hadoop.hive.thrift.ZooKeeperTokenStore for load-balanced cluster.</description>
+</property>
+
+<property>
+  <name>hive.cluster.delegation.token.store.zookeeper.connectString</name>
+  <value>localhost:2181</value>
+  <description>The ZooKeeper token store connect string.</description>
+</property>
+
+<property>
+  <name>hive.cluster.delegation.token.store.zookeeper.znode</name>
+  <value>/hive/cluster/delegation</value>
+  <description>The root path for token store data.</description>
+</property>
+
+<property>
+  <name>hive.cluster.delegation.token.store.zookeeper.acl</name>
+  <value>sasl:hive/host1@EXAMPLE.COM:cdrwa,sasl:hive/host2@EXAMPLE.COM:cdrwa</value>
+  <description>ACL for token store entries. Comma separated list of all server principals for the cluster.</description>
+</property>
+
+<property>
   <name>hive.metastore.cache.pinobjtypes</name>
   <value>Table,StorageDescriptor,SerDeInfo,Partition,Database,Type,FieldSchema,Order</value>
   <description>List of comma separated metastore object types that should be pinned in the cache</description>

Added: hive/trunk/shims/src/common-secure/java/org/apache/hadoop/hive/thrift/DelegationTokenStore.java
URL: http://svn.apache.org/viewvc/hive/trunk/shims/src/common-secure/java/org/apache/hadoop/hive/thrift/DelegationTokenStore.java?rev=1293530&view=auto
==============================================================================
--- hive/trunk/shims/src/common-secure/java/org/apache/hadoop/hive/thrift/DelegationTokenStore.java (added)
+++ hive/trunk/shims/src/common-secure/java/org/apache/hadoop/hive/thrift/DelegationTokenStore.java Sat Feb 25 04:21:11 2012
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.thrift;
+
+import java.util.List;
+
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager.DelegationTokenInformation;
+
+/**
+ * Interface for pluggable token store that can be implemented with shared external
+ * storage for load balancing and high availability (for example using ZooKeeper).
+ * Internal, store specific errors are translated into {@link TokenStoreException}.
+ */
+public interface DelegationTokenStore extends Configurable {
+
+  /**
+   * Unchecked exception for internal token store errors that typically cannot be handled by the caller.
+   */
+  public static class TokenStoreException extends RuntimeException {
+    private static final long serialVersionUID = -8693819817623074083L;
+
+    public TokenStoreException(Throwable cause) {
+      super(cause);
+    }
+
+    public TokenStoreException(String message, Throwable cause) {
+      super(message, cause);
+    }
+  }
+
+  /**
+   * Add new master key. The token store assigns and returns the sequence number.
+   * Caller needs to use the identifier to update the key (since it is embedded in the key).
+   *
+   * @param s the new master key, as a string
+   * @return sequence number for new key
+   */
+  int addMasterKey(String s) throws TokenStoreException;
+
+  /**
+   * Update master key (for expiration and setting store assigned sequence within key)
+   * @param keySeq sequence number of the key to update, as returned by {@link #addMasterKey(String)}
+   * @param s the updated master key, as a string
+   * @throws TokenStoreException on internal token store errors
+   */
+  void updateMasterKey(int keySeq, String s) throws TokenStoreException;
+
+  /**
+   * Remove key for given id.
+   * @param keySeq sequence number of the key to remove
+   * @return false if key no longer present, true otherwise.
+   */
+  boolean removeMasterKey(int keySeq);
+
+  /**
+   * Return all master keys.
+   * @return the master keys held by the store, as strings
+   * @throws TokenStoreException on internal token store errors
+   */
+  String[] getMasterKeys() throws TokenStoreException;
+
+  /**
+   * Add token. If identifier is already present, token won't be added.
+   * @param tokenIdentifier identifier of the token to add
+   * @param token token information to store under the identifier
+   * @return true if token was added, false for existing identifier
+   */
+  boolean addToken(DelegationTokenIdentifier tokenIdentifier,
+      DelegationTokenInformation token) throws TokenStoreException;
+
+  /**
+   * Get token. Returns null if the token does not exist.
+   * @param tokenIdentifier identifier of the token to look up
+   * @return the token information, or null if the token does not exist
+   */
+  DelegationTokenInformation getToken(DelegationTokenIdentifier tokenIdentifier)
+      throws TokenStoreException;
+
+  /**
+   * Remove token. Return value can be used by caller to detect concurrency.
+   * @param tokenIdentifier identifier of the token to remove
+   * @return true if token was removed, false if it was already removed.
+   * @throws TokenStoreException on internal token store errors
+   */
+  boolean removeToken(DelegationTokenIdentifier tokenIdentifier) throws TokenStoreException;
+
+  /**
+   * List of all token identifiers in the store. This is used to remove expired tokens
+   * and a potential scalability improvement would be to partition by master key id.
+   * @return all token identifiers currently in the store
+   */
+  List<DelegationTokenIdentifier> getAllDelegationTokenIdentifiers();
+
+}

Modified: hive/trunk/shims/src/common-secure/java/org/apache/hadoop/hive/thrift/HadoopThriftAuthBridge20S.java
URL: http://svn.apache.org/viewvc/hive/trunk/shims/src/common-secure/java/org/apache/hadoop/hive/thrift/HadoopThriftAuthBridge20S.java?rev=1293530&r1=1293529&r2=1293530&view=diff
==============================================================================
--- hive/trunk/shims/src/common-secure/java/org/apache/hadoop/hive/thrift/HadoopThriftAuthBridge20S.java (original)
+++ hive/trunk/shims/src/common-secure/java/org/apache/hadoop/hive/thrift/HadoopThriftAuthBridge20S.java Sat Feb 25 04:21:11 2012
@@ -213,9 +213,11 @@ import org.apache.thrift.transport.TTran
        "hive.cluster.delegation.token.store.class";
      public static final String DELEGATION_TOKEN_STORE_ZK_CONNECT_STR =
          "hive.cluster.delegation.token.store.zookeeper.connectString";
-     public static final String DELEGATION_TOKEN_STORE_ZK_ROOT_NODE =
-         "hive.cluster.delegation.token.store.zookeeper.rootNode";
-     public static final String DELEGATION_TOKEN_STORE_ZK_ROOT_NODE_DEFAULT =
+     public static final String DELEGATION_TOKEN_STORE_ZK_ZNODE =
+         "hive.cluster.delegation.token.store.zookeeper.znode";
+     public static final String DELEGATION_TOKEN_STORE_ZK_ACL =
+             "hive.cluster.delegation.token.store.zookeeper.acl";
+     public static final String DELEGATION_TOKEN_STORE_ZK_ZNODE_DEFAULT =
          "/hive/cluster/delegation";
 
      public Server() throws TTransportException {
@@ -291,16 +293,16 @@ import org.apache.thrift.transport.TTran
       return new TUGIAssumingProcessor(processor, secretManager);
      }
 
-    protected TokenStoreDelegationTokenSecretManager.TokenStore getTokenStore(Configuration conf)
+    protected DelegationTokenStore getTokenStore(Configuration conf)
         throws IOException {
        String tokenStoreClassName = conf.get(DELEGATION_TOKEN_STORE_CLS, "");
        if (StringUtils.isBlank(tokenStoreClassName)) {
          return new MemoryTokenStore();
        }
        try {
-        Class<? extends TokenStoreDelegationTokenSecretManager.TokenStore> storeClass = Class
+        Class<? extends DelegationTokenStore> storeClass = Class
             .forName(tokenStoreClassName).asSubclass(
-                TokenStoreDelegationTokenSecretManager.TokenStore.class);
+                DelegationTokenStore.class);
         return ReflectionUtils.newInstance(storeClass, conf);
        } catch (ClassNotFoundException e) {
         throw new IOException("Error initializing delegation token store: " + tokenStoreClassName,

Modified: hive/trunk/shims/src/common-secure/java/org/apache/hadoop/hive/thrift/MemoryTokenStore.java
URL: http://svn.apache.org/viewvc/hive/trunk/shims/src/common-secure/java/org/apache/hadoop/hive/thrift/MemoryTokenStore.java?rev=1293530&r1=1293529&r2=1293530&view=diff
==============================================================================
--- hive/trunk/shims/src/common-secure/java/org/apache/hadoop/hive/thrift/MemoryTokenStore.java (original)
+++ hive/trunk/shims/src/common-secure/java/org/apache/hadoop/hive/thrift/MemoryTokenStore.java Sat Feb 25 04:21:11 2012
@@ -18,7 +18,10 @@
 
 package org.apache.hadoop.hive.thrift;
 
+import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.hadoop.conf.Configuration;
@@ -27,29 +30,31 @@ import org.apache.hadoop.security.token.
 /**
  * Default in-memory token store implementation.
  */
-public class MemoryTokenStore implements TokenStoreDelegationTokenSecretManager.TokenStore {
+public class MemoryTokenStore implements DelegationTokenStore {
 
-  private final java.util.concurrent.ConcurrentHashMap<Integer, String> masterKeys
-      = new java.util.concurrent.ConcurrentHashMap<Integer, String>();
+  private final Map<Integer, String> masterKeys
+      = new ConcurrentHashMap<Integer, String>();
 
-  private final java.util.concurrent.ConcurrentHashMap<DelegationTokenIdentifier, DelegationTokenInformation> tokens
-      = new java.util.concurrent.ConcurrentHashMap<DelegationTokenIdentifier, DelegationTokenInformation>();
+  private final ConcurrentHashMap<DelegationTokenIdentifier, DelegationTokenInformation> tokens
+      = new ConcurrentHashMap<DelegationTokenIdentifier, DelegationTokenInformation>();
 
   private final AtomicInteger masterKeySeq = new AtomicInteger();
+  private Configuration conf;
 
   @Override
   public void setConf(Configuration conf) {
+    this.conf = conf;
   }
 
   @Override
   public Configuration getConf() {
-    return null;
+    return this.conf;
   }
 
   @Override
   public int addMasterKey(String s) {
     int keySeq = masterKeySeq.getAndIncrement();
-    masterKeys.putIfAbsent(keySeq, s);
+    masterKeys.put(keySeq, s);
     return keySeq;
   }
 
@@ -88,7 +93,7 @@ public class MemoryTokenStore implements
 
   @Override
   public List<DelegationTokenIdentifier> getAllDelegationTokenIdentifiers() {
-    List<DelegationTokenIdentifier> result = new java.util.ArrayList<DelegationTokenIdentifier>(
+    List<DelegationTokenIdentifier> result = new ArrayList<DelegationTokenIdentifier>(
         tokens.size());
     for (DelegationTokenIdentifier id : tokens.keySet()) {
         result.add(id);

Modified: hive/trunk/shims/src/common-secure/java/org/apache/hadoop/hive/thrift/TokenStoreDelegationTokenSecretManager.java
URL: http://svn.apache.org/viewvc/hive/trunk/shims/src/common-secure/java/org/apache/hadoop/hive/thrift/TokenStoreDelegationTokenSecretManager.java?rev=1293530&r1=1293529&r2=1293530&view=diff
==============================================================================
--- hive/trunk/shims/src/common-secure/java/org/apache/hadoop/hive/thrift/TokenStoreDelegationTokenSecretManager.java (original)
+++ hive/trunk/shims/src/common-secure/java/org/apache/hadoop/hive/thrift/TokenStoreDelegationTokenSecretManager.java Sat Feb 25 04:21:11 2012
@@ -25,12 +25,12 @@ import java.io.DataOutputStream;
 import java.io.IOException;
 import java.lang.reflect.Method;
 import java.util.Arrays;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 
 import org.apache.commons.codec.binary.Base64;
-import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager;
@@ -58,88 +58,16 @@ public class TokenStoreDelegationTokenSe
   private static final Logger LOGGER =
       LoggerFactory.getLogger(TokenStoreDelegationTokenSecretManager.class.getName());
 
-  /**
-   * Exception for internal token store errors that typically cannot be handled by the caller.
-   */
-  public static class TokenStoreError extends RuntimeException {
-    private static final long serialVersionUID = -8693819817623074083L;
-
-    public TokenStoreError(Throwable cause) {
-      super(cause);
-    }
-
-    public TokenStoreError(String message, Throwable cause) {
-      super(message, cause);
-    }
-  }
-
-  /**
-   * Interface for pluggable token store that can be implemented as shared store with external
-   * storage (for example with ZooKeeper for HA).
-   * Internal, store specific errors are translated into {@link TokenStoreError}.
-   */
-  public static interface TokenStore extends Configurable {
-    /**
-     * Add new master key. The token store assigns and returns the sequence number.
-     * Caller needs to use the identifier to update the key (since it is embedded in the key).
-     *
-     * @param s
-     * @return sequence number for new key
-     */
-    int addMasterKey(String s) throws TokenStoreError;
-
-    void updateMasterKey(int keySeq, String s) throws TokenStoreError;
-
-    /**
-     * Remove key for given id.
-     * @param keySeq
-     * @return false if key no longer present, true otherwise.
-     */
-    boolean removeMasterKey(int keySeq);
-
-    String[] getMasterKeys() throws TokenStoreError;
-
-    /**
-     * Add token. If identifier is already present, token won't be added.
-     * @param tokenIdentifier
-     * @param token
-     * @return true if token was added, false for existing identifier
-     */
-    boolean addToken(DelegationTokenIdentifier tokenIdentifier,
-        DelegationTokenInformation token) throws TokenStoreError;
-
-    /**
-     * Get token. Returns null if the token does not exist.
-     * @param tokenIdentifier
-     * @return
-     */
-    DelegationTokenInformation getToken(DelegationTokenIdentifier tokenIdentifier)
-        throws TokenStoreError;
-
-    /**
-     * Remove token. Ignores token does not exist.
-     * @param tokenIdentifier
-     */
-    boolean removeToken(DelegationTokenIdentifier tokenIdentifier) throws TokenStoreError;
-
-    /**
-     * List of all token identifiers in the store. This is used to remove expired tokens
-     * and a potential scalability improvement would be to partition by master key id
-     * @return
-     */
-    List<DelegationTokenIdentifier> getAllDelegationTokenIdentifiers();
-
-  }
-
   final private long keyUpdateInterval;
   final private long tokenRemoverScanInterval;
   private Thread tokenRemoverThread;
 
-  final private TokenStore tokenStore;
+  final private DelegationTokenStore tokenStore;
 
   public TokenStoreDelegationTokenSecretManager(long delegationKeyUpdateInterval,
       long delegationTokenMaxLifetime, long delegationTokenRenewInterval,
-      long delegationTokenRemoverScanInterval, TokenStore sharedStore) {
+      long delegationTokenRemoverScanInterval,
+      DelegationTokenStore sharedStore) {
     super(delegationKeyUpdateInterval, delegationTokenMaxLifetime, delegationTokenRenewInterval,
         delegationTokenRemoverScanInterval);
     this.keyUpdateInterval = delegationKeyUpdateInterval;
@@ -162,7 +90,7 @@ public class TokenStoreDelegationTokenSe
     // read keys from token store
     String[] allKeys = tokenStore.getMasterKeys();
     Map<Integer, DelegationKey> keys
-        = new java.util.HashMap<Integer, DelegationKey>(allKeys.length);
+        = new HashMap<Integer, DelegationKey>(allKeys.length);
     for (String keyStr : allKeys) {
       DelegationKey key = new DelegationKey();
       try {
@@ -180,8 +108,7 @@ public class TokenStoreDelegationTokenSe
   }
 
   @Override
-  public byte[] retrievePassword(DelegationTokenIdentifier identifier)
-      throws org.apache.hadoop.security.token.SecretManager.InvalidToken {
+  public byte[] retrievePassword(DelegationTokenIdentifier identifier) throws InvalidToken {
       DelegationTokenInformation info = this.tokenStore.getToken(identifier);
       if (info == null) {
           throw new InvalidToken("token expired or does not exist: " + identifier);

Modified: hive/trunk/shims/src/common-secure/java/org/apache/hadoop/hive/thrift/ZooKeeperTokenStore.java
URL: http://svn.apache.org/viewvc/hive/trunk/shims/src/common-secure/java/org/apache/hadoop/hive/thrift/ZooKeeperTokenStore.java?rev=1293530&r1=1293529&r2=1293530&view=diff
==============================================================================
--- hive/trunk/shims/src/common-secure/java/org/apache/hadoop/hive/thrift/ZooKeeperTokenStore.java (original)
+++ hive/trunk/shims/src/common-secure/java/org/apache/hadoop/hive/thrift/ZooKeeperTokenStore.java Sat Feb 25 04:21:11 2012
@@ -19,33 +19,36 @@
 package org.apache.hadoop.hive.thrift;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
 import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hive.thrift.TokenStoreDelegationTokenSecretManager.TokenStoreError;
 import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager.DelegationTokenInformation;
 import org.apache.hadoop.security.token.delegation.HiveDelegationTokenSupport;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooDefs;
 import org.apache.zookeeper.ZooDefs.Ids;
 import org.apache.zookeeper.ZooKeeper;
 import org.apache.zookeeper.ZooKeeper.States;
+import org.apache.zookeeper.data.ACL;
+import org.apache.zookeeper.data.Id;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
  * ZooKeeper token store implementation.
  */
-public class ZooKeeperTokenStore implements TokenStoreDelegationTokenSecretManager.TokenStore {
+public class ZooKeeperTokenStore implements DelegationTokenStore {
 
   private static final Logger LOGGER =
       LoggerFactory.getLogger(ZooKeeperTokenStore.class.getName());
 
-  private static final String ZK_SEQ_FORMAT = "%010d";
+  protected static final String ZK_SEQ_FORMAT = "%010d";
   private static final String NODE_KEYS = "/keys";
   private static final String NODE_TOKENS = "/tokens";
 
@@ -53,6 +56,8 @@ public class ZooKeeperTokenStore impleme
   private volatile ZooKeeper zkSession;
   private String zkConnectString;
   private final int zkSessionTimeout = 3000;
+  private List<ACL> newNodeAcl = Ids.OPEN_ACL_UNSAFE;
+
 
   private class ZooKeeperWatcher implements Watcher {
     public void process(org.apache.zookeeper.WatchedEvent event) {
@@ -88,7 +93,7 @@ public class ZooKeeperTokenStore impleme
             zkSession = new ZooKeeper(this.zkConnectString, this.zkSessionTimeout,
                 new ZooKeeperWatcher());
             } catch (IOException ex) {
-              throw new TokenStoreError("Token store error.", ex);
+              throw new TokenStoreException("Token store error.", ex);
             }
           }
         }
@@ -96,14 +101,23 @@ public class ZooKeeperTokenStore impleme
     return zkSession;
   }
 
-  private static String ensurePath(ZooKeeper zk, String path) throws KeeperException,
+  /**
+   * Create a path if it does not already exist ("mkdir -p")
+   * @param zk ZooKeeper session
+   * @param path string with '/' separator
+   * @param acl list of ACL entries
+   * @return
+   * @throws KeeperException
+   * @throws InterruptedException
+   */
+  public static String ensurePath(ZooKeeper zk, String path, List<ACL> acl) throws KeeperException,
       InterruptedException {
     String[] pathComps = StringUtils.splitByWholeSeparator(path, "/");
     String currentPath = "";
     for (String pathComp : pathComps) {
       currentPath += "/" + pathComp;
       try {
-        String node = zk.create(currentPath, new byte[0], Ids.OPEN_ACL_UNSAFE,
+        String node = zk.create(currentPath, new byte[0], acl,
             CreateMode.PERSISTENT);
         LOGGER.info("Created path: " + node);
       } catch (KeeperException.NodeExistsException e) {
@@ -112,6 +126,67 @@ public class ZooKeeperTokenStore impleme
     return currentPath;
   }
 
+  /**
+   * Parse ACL permission string (logic taken from a ZooKeeperMain private method).
+   * @param permString permission characters, any combination of r, w, c, d, a
+   * @return bitwise OR of the matching ZooDefs.Perms flags; unknown characters are logged and ignored
+   */
+  public static int getPermFromString(String permString) {
+      int perm = 0;
+      for (int i = 0; i < permString.length(); i++) {
+          switch (permString.charAt(i)) {
+          case 'r':
+              perm |= ZooDefs.Perms.READ;
+              break;
+          case 'w':
+              perm |= ZooDefs.Perms.WRITE;
+              break;
+          case 'c':
+              perm |= ZooDefs.Perms.CREATE;
+              break;
+          case 'd':
+              perm |= ZooDefs.Perms.DELETE;
+              break;
+          case 'a':
+              perm |= ZooDefs.Perms.ADMIN;
+              break;
+          default:
+              LOGGER.error("Unknown perm type: " + permString.charAt(i));
+          }
+      }
+      return perm;
+  }
+
+  /**
+   * Parse comma separated list of ACL entries to secure generated nodes, e.g.
+   * <code>sasl:hive/host1@MY.DOMAIN:cdrwa,sasl:hive/host2@MY.DOMAIN:cdrwa</code>
+   * @param aclString comma separated entries, each of the form scheme:id:perm
+   * @return ACL list; blank entries are skipped, malformed entries are logged and skipped
+   */
+  public static List<ACL> parseACLs(String aclString) {
+    String[] aclComps = StringUtils.splitByWholeSeparator(aclString, ",");
+    List<ACL> acl = new ArrayList<ACL>(aclComps.length);
+    for (String a : aclComps) {
+      if (StringUtils.isBlank(a)) {
+         continue;
+      }
+      a = a.trim();
+      // from ZooKeeperMain private method: split at the first and last ':' so the id may contain colons
+      int firstColon = a.indexOf(':');
+      int lastColon = a.lastIndexOf(':');
+      if (firstColon == -1 || lastColon == -1 || firstColon == lastColon) {
+         LOGGER.error(a + " does not have the form scheme:id:perm");
+         continue;
+      }
+      ACL newAcl = new ACL();
+      newAcl.setId(new Id(a.substring(0, firstColon), a.substring(
+          firstColon + 1, lastColon)));
+      newAcl.setPerms(getPermFromString(a.substring(lastColon + 1)));
+      acl.add(newAcl);
+    }
+    return acl;
+  }
+
   private void init() {
     if (this.zkConnectString == null) {
       throw new IllegalStateException("Not initialized");
@@ -127,21 +202,26 @@ public class ZooKeeperTokenStore impleme
 
     ZooKeeper zk = getSession();
     try {
-        ensurePath(zk, rootNode + NODE_KEYS);
-        ensurePath(zk, rootNode + NODE_TOKENS);
+        ensurePath(zk, rootNode + NODE_KEYS, newNodeAcl);
+        ensurePath(zk, rootNode + NODE_TOKENS, newNodeAcl);
       } catch (Exception e) {
-        throw new TokenStoreError("Failed to validate token path.", e);
+        throw new TokenStoreException("Failed to validate token path.", e);
       }
   }
 
   @Override
   public void setConf(Configuration conf) {
-    if (conf != null) {
-      this.zkConnectString = conf.get(
-          HadoopThriftAuthBridge20S.Server.DELEGATION_TOKEN_STORE_ZK_CONNECT_STR, null);
-      this.rootNode = conf.get(
-          HadoopThriftAuthBridge20S.Server.DELEGATION_TOKEN_STORE_ZK_ROOT_NODE,
-          HadoopThriftAuthBridge20S.Server.DELEGATION_TOKEN_STORE_ZK_ROOT_NODE_DEFAULT);
+    if (conf == null) {
+       throw new IllegalArgumentException("conf is null");
+    }
+    this.zkConnectString = conf.get(
+      HadoopThriftAuthBridge20S.Server.DELEGATION_TOKEN_STORE_ZK_CONNECT_STR, null);
+    this.rootNode = conf.get(
+      HadoopThriftAuthBridge20S.Server.DELEGATION_TOKEN_STORE_ZK_ZNODE,
+      HadoopThriftAuthBridge20S.Server.DELEGATION_TOKEN_STORE_ZK_ZNODE_DEFAULT);
+    String csv = conf.get(HadoopThriftAuthBridge20S.Server.DELEGATION_TOKEN_STORE_ZK_ACL, null);
+    if (StringUtils.isNotBlank(csv)) {
+       this.newNodeAcl = parseACLs(csv);
     }
     init();
   }
@@ -176,14 +256,14 @@ public class ZooKeeperTokenStore impleme
   public int addMasterKey(String s) {
     try {
       ZooKeeper zk = getSession();
-      String newNode = zk.create(rootNode + NODE_KEYS + "/", s.getBytes(), Ids.OPEN_ACL_UNSAFE,
+      String newNode = zk.create(rootNode + NODE_KEYS + "/", s.getBytes(), newNodeAcl,
           CreateMode.PERSISTENT_SEQUENTIAL);
       LOGGER.info("Added key {}", newNode);
       return getSeq(newNode);
     } catch (KeeperException ex) {
-      throw new TokenStoreError(ex);
+      throw new TokenStoreException(ex);
     } catch (InterruptedException ex) {
-      throw new TokenStoreError(ex);
+      throw new TokenStoreException(ex);
     }
   }
 
@@ -194,9 +274,9 @@ public class ZooKeeperTokenStore impleme
       zk.setData(rootNode + NODE_KEYS + "/" + String.format(ZK_SEQ_FORMAT, keySeq), s.getBytes(),
           -1);
     } catch (KeeperException ex) {
-      throw new TokenStoreError(ex);
+      throw new TokenStoreException(ex);
     } catch (InterruptedException ex) {
-      throw new TokenStoreError(ex);
+      throw new TokenStoreException(ex);
     }
   }
 
@@ -209,9 +289,9 @@ public class ZooKeeperTokenStore impleme
     } catch (KeeperException.NoNodeException ex) {
       return false;
     } catch (KeeperException ex) {
-      throw new TokenStoreError(ex);
+      throw new TokenStoreException(ex);
     } catch (InterruptedException ex) {
-      throw new TokenStoreError(ex);
+      throw new TokenStoreException(ex);
     }
   }
 
@@ -226,9 +306,9 @@ public class ZooKeeperTokenStore impleme
       }
       return result;
     } catch (KeeperException ex) {
-      throw new TokenStoreError(ex);
+      throw new TokenStoreException(ex);
     } catch (InterruptedException ex) {
-      throw new TokenStoreError(ex);
+      throw new TokenStoreException(ex);
     }
   }
 
@@ -238,7 +318,7 @@ public class ZooKeeperTokenStore impleme
       return rootNode + NODE_TOKENS + "/"
           + TokenStoreDelegationTokenSecretManager.encodeWritable(tokenIdentifier);
     } catch (IOException ex) {
-      throw new TokenStoreError("Failed to encode token identifier", ex);
+      throw new TokenStoreException("Failed to encode token identifier", ex);
     }
   }
 
@@ -249,15 +329,15 @@ public class ZooKeeperTokenStore impleme
       ZooKeeper zk = getSession();
       byte[] tokenBytes = HiveDelegationTokenSupport.encodeDelegationTokenInformation(token);
       String newNode = zk.create(getTokenPath(tokenIdentifier),
-          tokenBytes, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+          tokenBytes, newNodeAcl, CreateMode.PERSISTENT);
       LOGGER.info("Added token: {}", newNode);
       return true;
     } catch (KeeperException.NodeExistsException ex) {
       return false;
     } catch (KeeperException ex) {
-      throw new TokenStoreError(ex);
+      throw new TokenStoreException(ex);
     } catch (InterruptedException ex) {
-      throw new TokenStoreError(ex);
+      throw new TokenStoreException(ex);
     }
   }
 
@@ -270,9 +350,9 @@ public class ZooKeeperTokenStore impleme
     } catch (KeeperException.NoNodeException ex) {
       return false;
     } catch (KeeperException ex) {
-      throw new TokenStoreError(ex);
+      throw new TokenStoreException(ex);
     } catch (InterruptedException ex) {
-      throw new TokenStoreError(ex);
+      throw new TokenStoreException(ex);
     }
   }
 
@@ -284,14 +364,14 @@ public class ZooKeeperTokenStore impleme
       try {
         return HiveDelegationTokenSupport.decodeDelegationTokenInformation(tokenBytes);
       } catch (Exception ex) {
-        throw new TokenStoreError("Failed to decode token", ex);
+        throw new TokenStoreException("Failed to decode token", ex);
       }
     } catch (KeeperException.NoNodeException ex) {
       return null;
     } catch (KeeperException ex) {
-      throw new TokenStoreError(ex);
+      throw new TokenStoreException(ex);
     } catch (InterruptedException ex) {
-      throw new TokenStoreError(ex);
+      throw new TokenStoreException(ex);
     }
   }
 
@@ -302,9 +382,9 @@ public class ZooKeeperTokenStore impleme
     try  {
       nodes = getSession().getChildren(containerNode, false);
     } catch (KeeperException ex) {
-      throw new TokenStoreError(ex);
+      throw new TokenStoreException(ex);
     } catch (InterruptedException ex) {
-      throw new TokenStoreError(ex);
+      throw new TokenStoreException(ex);
     }
     List<DelegationTokenIdentifier> result = new java.util.ArrayList<DelegationTokenIdentifier>(
         nodes.size());

Modified: hive/trunk/shims/src/test/org/apache/hadoop/hive/thrift/TestHadoop20SAuthBridge.java
URL: http://svn.apache.org/viewvc/hive/trunk/shims/src/test/org/apache/hadoop/hive/thrift/TestHadoop20SAuthBridge.java?rev=1293530&r1=1293529&r2=1293530&view=diff
==============================================================================
--- hive/trunk/shims/src/test/org/apache/hadoop/hive/thrift/TestHadoop20SAuthBridge.java (original)
+++ hive/trunk/shims/src/test/org/apache/hadoop/hive/thrift/TestHadoop20SAuthBridge.java Sat Feb 25 04:21:11 2012
@@ -39,7 +39,6 @@ import org.apache.hadoop.hive.metastore.
 import org.apache.hadoop.hive.metastore.MetaStoreUtils;
 import org.apache.hadoop.hive.metastore.api.Database;
 import org.apache.hadoop.hive.metastore.api.MetaException;
-import org.apache.hadoop.hive.thrift.TokenStoreDelegationTokenSecretManager.TokenStore;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.security.SaslRpcServer;
 import org.apache.hadoop.security.SaslRpcServer.AuthMethod;
@@ -82,11 +81,10 @@ public class TestHadoop20SAuthBridge ext
 
         return new TUGIAssumingTransportFactory(transFactory, realUgi);
       }
-      static TokenStore TOKEN_STORE = new MemoryTokenStore();
-      //static TokenStore TOKEN_STORE = new ZooKeeperTokenStore("localhost:2181");
+      static DelegationTokenStore TOKEN_STORE = new MemoryTokenStore();
 
       @Override
-      protected TokenStore getTokenStore(Configuration conf) throws IOException {
+      protected DelegationTokenStore getTokenStore(Configuration conf) throws IOException {
         return TOKEN_STORE;
       }
     }

Added: hive/trunk/shims/src/test/org/apache/hadoop/hive/thrift/TestZooKeeperTokenStore.java
URL: http://svn.apache.org/viewvc/hive/trunk/shims/src/test/org/apache/hadoop/hive/thrift/TestZooKeeperTokenStore.java?rev=1293530&view=auto
==============================================================================
--- hive/trunk/shims/src/test/org/apache/hadoop/hive/thrift/TestZooKeeperTokenStore.java (added)
+++ hive/trunk/shims/src/test/org/apache/hadoop/hive/thrift/TestZooKeeperTokenStore.java Sat Feb 25 04:21:11 2012
@@ -0,0 +1,220 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.thrift;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+
+import junit.framework.TestCase;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager.DelegationTokenInformation;
+import org.apache.hadoop.security.token.delegation.HiveDelegationTokenSupport;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.data.ACL;
+import org.apache.zookeeper.data.Stat;
+import org.junit.Assert;
+
+/**
+ * Tests {@link ZooKeeperTokenStore} against an in-process
+ * {@link MiniZooKeeperCluster}: master key and token storage, and the
+ * handling of the configurable znode ACL.
+ */
+public class TestZooKeeperTokenStore extends TestCase {
+
+  private MiniZooKeeperCluster zkCluster = null;
+  private ZooKeeper zkClient = null;
+
+  /**
+   * Starts a mini ZooKeeper cluster and connects a verification client to it.
+   */
+  @Override
+  protected void setUp() throws Exception {
+    File zkDataDir = new File(System.getProperty("java.io.tmpdir"));
+    if (this.zkCluster != null) {
+      throw new IOException("Cluster already running");
+    }
+    this.zkCluster = new MiniZooKeeperCluster();
+    this.zkCluster.startup(zkDataDir);
+    try {
+      this.zkClient = new ZooKeeper("localhost:"
+          + this.zkCluster.getClientPort(), 300, null);
+    } catch (IOException e) {
+      // JUnit 3 does not call tearDown() when setUp() throws, so the cluster
+      // has to be stopped here to avoid leaking it.
+      this.zkCluster.shutdown();
+      this.zkCluster = null;
+      throw e;
+    }
+  }
+
+  /**
+   * Closes the client and stops the cluster; the cluster is shut down even
+   * when closing the client fails.
+   */
+  @Override
+  protected void tearDown() throws Exception {
+    try {
+      if (this.zkClient != null) {
+        this.zkClient.close();
+      }
+    } finally {
+      this.zkClient = null;
+      if (this.zkCluster != null) {
+        this.zkCluster.shutdown();
+        this.zkCluster = null;
+      }
+    }
+  }
+
+  /**
+   * Builds a configuration that points the token store at the mini cluster.
+   *
+   * @param zkPath root znode the token store should use
+   * @return configuration with connect string and znode set
+   */
+  private Configuration createConf(String zkPath) {
+    Configuration conf = new Configuration();
+    conf.set(
+        HadoopThriftAuthBridge20S.Server.DELEGATION_TOKEN_STORE_ZK_CONNECT_STR,
+        "localhost:" + this.zkCluster.getClientPort());
+    conf.set(
+        HadoopThriftAuthBridge20S.Server.DELEGATION_TOKEN_STORE_ZK_ZNODE,
+        zkPath);
+    return conf;
+  }
+
+  /**
+   * Exercises adding, listing and removing master keys and tokens.
+   */
+  public void testTokenStorage() throws Exception {
+    final String zkPath = "/zktokenstore-testTokenStorage";
+    ZooKeeperTokenStore ts = new ZooKeeperTokenStore();
+    ts.setConf(createConf(zkPath));
+
+    int keySeq = ts.addMasterKey("key1Data");
+    byte[] keyBytes = zkClient.getData(
+        zkPath
+            + "/keys/"
+            + String.format(ZooKeeperTokenStore.ZK_SEQ_FORMAT,
+                keySeq), false, null);
+    assertNotNull(keyBytes);
+    assertEquals("key1Data", new String(keyBytes));
+
+    int keySeq2 = ts.addMasterKey("key2Data");
+    assertEquals("keys sequential", keySeq + 1, keySeq2);
+    assertEquals("expected number keys", 2, ts.getMasterKeys().length);
+
+    ts.removeMasterKey(keySeq);
+    assertEquals("expected number keys", 1, ts.getMasterKeys().length);
+
+    // tokens
+    DelegationTokenIdentifier tokenId = new DelegationTokenIdentifier(
+        new Text("owner"), new Text("renewer"), new Text("realUser"));
+    DelegationTokenInformation tokenInfo = new DelegationTokenInformation(
+        99, "password".getBytes());
+    assertTrue("token added", ts.addToken(tokenId, tokenInfo));
+    DelegationTokenInformation tokenInfoRead = ts.getToken(tokenId);
+    assertNotNull("token read back", tokenInfoRead);
+    assertEquals(tokenInfo.getRenewDate(), tokenInfoRead.getRenewDate());
+    assertNotSame(tokenInfo, tokenInfoRead);
+    Assert.assertArrayEquals(HiveDelegationTokenSupport
+        .encodeDelegationTokenInformation(tokenInfo),
+        HiveDelegationTokenSupport
+            .encodeDelegationTokenInformation(tokenInfoRead));
+
+    List<DelegationTokenIdentifier> allIds = ts
+        .getAllDelegationTokenIdentifiers();
+    assertEquals(1, allIds.size());
+    Assert.assertEquals(TokenStoreDelegationTokenSecretManager
+        .encodeWritable(tokenId),
+        TokenStoreDelegationTokenSecretManager.encodeWritable(allIds
+            .get(0)));
+
+    assertTrue(ts.removeToken(tokenId));
+    assertEquals(0, ts.getAllDelegationTokenIdentifiers().size());
+  }
+
+  /**
+   * A read-only ACL must make store initialization fail with NoAuth.
+   */
+  public void testAclNoAuth() throws Exception {
+    final String zkPath = "/zktokenstore-testAclNoAuth";
+    Configuration conf = createConf(zkPath);
+    conf.set(
+        HadoopThriftAuthBridge20S.Server.DELEGATION_TOKEN_STORE_ZK_ACL,
+        "ip:127.0.0.1:r");
+
+    ZooKeeperTokenStore ts = new ZooKeeperTokenStore();
+    try {
+      ts.setConf(conf);
+      fail("expected ACL exception");
+    } catch (DelegationTokenStore.TokenStoreException e) {
+      assertNotNull("exception cause", e.getCause());
+      assertEquals(KeeperException.NoAuthException.class,
+          e.getCause().getClass());
+    }
+  }
+
+  /**
+   * An unparsable ACL entry is dropped by the parser; the remaining sasl
+   * entry is expected to be rejected by ZooKeeper as an invalid ACL.
+   */
+  public void testAclInvalid() throws Exception {
+    final String zkPath = "/zktokenstore-testAclInvalid";
+    String aclString = "sasl:hive/host@TEST.DOMAIN:cdrwa, fail-parse-ignored";
+    Configuration conf = createConf(zkPath);
+    conf.set(
+        HadoopThriftAuthBridge20S.Server.DELEGATION_TOKEN_STORE_ZK_ACL,
+        aclString);
+
+    List<ACL> aclList = ZooKeeperTokenStore.parseACLs(aclString);
+    assertEquals(1, aclList.size());
+
+    ZooKeeperTokenStore ts = new ZooKeeperTokenStore();
+    try {
+      ts.setConf(conf);
+      fail("expected ACL exception");
+    } catch (DelegationTokenStore.TokenStoreException e) {
+      assertNotNull("exception cause", e.getCause());
+      assertEquals(KeeperException.InvalidACLException.class,
+          e.getCause().getClass());
+    }
+  }
+
+  /**
+   * A valid two-entry ACL is applied to the root znode created by the store.
+   */
+  public void testAclPositive() throws Exception {
+    final String zkPath = "/zktokenstore-testAcl";
+    Configuration conf = createConf(zkPath);
+    conf.set(
+        HadoopThriftAuthBridge20S.Server.DELEGATION_TOKEN_STORE_ZK_ACL,
+        "world:anyone:cdrwa,ip:127.0.0.1:cdrwa");
+    ZooKeeperTokenStore ts = new ZooKeeperTokenStore();
+    ts.setConf(conf);
+    List<ACL> acl = zkClient.getACL(zkPath, new Stat());
+    assertEquals(2, acl.size());
+  }
+
+}