You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by ha...@apache.org on 2012/02/25 05:21:13 UTC
svn commit: r1293530 - in /hive/trunk:
common/src/java/org/apache/hadoop/hive/conf/ conf/
shims/src/common-secure/java/org/apache/hadoop/hive/thrift/
shims/src/test/org/apache/hadoop/hive/thrift/
Author: hashutosh
Date: Sat Feb 25 04:21:11 2012
New Revision: 1293530
URL: http://svn.apache.org/viewvc?rev=1293530&view=rev
Log:
HIVE-2712: Make ZooKeeper token store ACL configurable (thw via hashutosh)
Added:
hive/trunk/shims/src/common-secure/java/org/apache/hadoop/hive/thrift/DelegationTokenStore.java
hive/trunk/shims/src/test/org/apache/hadoop/hive/thrift/TestZooKeeperTokenStore.java
Modified:
hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
hive/trunk/conf/hive-default.xml.template
hive/trunk/shims/src/common-secure/java/org/apache/hadoop/hive/thrift/HadoopThriftAuthBridge20S.java
hive/trunk/shims/src/common-secure/java/org/apache/hadoop/hive/thrift/MemoryTokenStore.java
hive/trunk/shims/src/common-secure/java/org/apache/hadoop/hive/thrift/TokenStoreDelegationTokenSecretManager.java
hive/trunk/shims/src/common-secure/java/org/apache/hadoop/hive/thrift/ZooKeeperTokenStore.java
hive/trunk/shims/src/test/org/apache/hadoop/hive/thrift/TestHadoop20SAuthBridge.java
Modified: hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
URL: http://svn.apache.org/viewvc/hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java?rev=1293530&r1=1293529&r2=1293530&view=diff
==============================================================================
--- hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (original)
+++ hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java Sat Feb 25 04:21:11 2012
@@ -263,6 +263,15 @@ public class HiveConf extends Configurat
METASTORE_KERBEROS_PRINCIPAL("hive.metastore.kerberos.principal",
"hive-metastore/_HOST@EXAMPLE.COM"),
METASTORE_USE_THRIFT_SASL("hive.metastore.sasl.enabled", false),
+ METASTORE_CLUSTER_DELEGATION_TOKEN_STORE_CLS(
+ "hive.cluster.delegation.token.store.class",
+ "org.apache.hadoop.hive.thrift.MemoryTokenStore"),
+ METASTORE_CLUSTER_DELEGATION_TOKEN_STORE_ZK_CONNECTSTR(
+ "hive.cluster.delegation.token.store.zookeeper.connectString", ""),
+ METASTORE_CLUSTER_DELEGATION_TOKEN_STORE_ZK_ZNODE(
+ "hive.cluster.delegation.token.store.zookeeper.znode", "/hive/cluster/delegation"),
+ METASTORE_CLUSTER_DELEGATION_TOKEN_STORE_ZK_ACL(
+ "hive.cluster.delegation.token.store.zookeeper.acl", ""),
METASTORE_CACHE_PINOBJTYPES("hive.metastore.cache.pinobjtypes", "Table,StorageDescriptor,SerDeInfo,Partition,Database,Type,FieldSchema,Order"),
METASTORE_CONNECTION_POOLING_TYPE("datanucleus.connectionPoolingType", "DBCP"),
METASTORE_VALIDATE_TABLES("datanucleus.validateTables", false),
Modified: hive/trunk/conf/hive-default.xml.template
URL: http://svn.apache.org/viewvc/hive/trunk/conf/hive-default.xml.template?rev=1293530&r1=1293529&r2=1293530&view=diff
==============================================================================
--- hive/trunk/conf/hive-default.xml.template (original)
+++ hive/trunk/conf/hive-default.xml.template Sat Feb 25 04:21:11 2012
@@ -754,6 +754,30 @@
</property>
<property>
+ <name>hive.cluster.delegation.token.store.class</name>
+ <value>org.apache.hadoop.hive.thrift.MemoryTokenStore</value>
+ <description>The delegation token store implementation. Set to org.apache.hadoop.hive.thrift.ZooKeeperTokenStore for a load-balanced cluster.</description>
+</property>
+
+<property>
+ <name>hive.cluster.delegation.token.store.zookeeper.connectString</name>
+ <value>localhost:2181</value>
+ <description>The ZooKeeper token store connect string.</description>
+</property>
+
+<property>
+ <name>hive.cluster.delegation.token.store.zookeeper.znode</name>
+ <value>/hive/cluster/delegation</value>
+ <description>The root path for token store data.</description>
+</property>
+
+<property>
+ <name>hive.cluster.delegation.token.store.zookeeper.acl</name>
+ <value>sasl:hive/host1@EXAMPLE.COM:cdrwa,sasl:hive/host2@EXAMPLE.COM:cdrwa</value>
+ <description>ACL for token store entries. Comma-separated list of all server principals for the cluster.</description>
+</property>
+
+<property>
<name>hive.metastore.cache.pinobjtypes</name>
<value>Table,StorageDescriptor,SerDeInfo,Partition,Database,Type,FieldSchema,Order</value>
<description>List of comma separated metastore object types that should be pinned in the cache</description>
Added: hive/trunk/shims/src/common-secure/java/org/apache/hadoop/hive/thrift/DelegationTokenStore.java
URL: http://svn.apache.org/viewvc/hive/trunk/shims/src/common-secure/java/org/apache/hadoop/hive/thrift/DelegationTokenStore.java?rev=1293530&view=auto
==============================================================================
--- hive/trunk/shims/src/common-secure/java/org/apache/hadoop/hive/thrift/DelegationTokenStore.java (added)
+++ hive/trunk/shims/src/common-secure/java/org/apache/hadoop/hive/thrift/DelegationTokenStore.java Sat Feb 25 04:21:11 2012
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.thrift;
+
+import java.util.List;
+
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager.DelegationTokenInformation;
+
+/**
+ * Interface for pluggable token store that can be implemented with shared external
+ * storage for load balancing and high availability (for example using ZooKeeper).
+ * Internal, store specific errors are translated into {@link TokenStoreException}.
+ */
+public interface DelegationTokenStore extends Configurable {
+
+ /**
+ * Exception for internal token store errors that typically cannot be handled by the caller.
+ */
+ public static class TokenStoreException extends RuntimeException {
+ private static final long serialVersionUID = -8693819817623074083L;
+
+ public TokenStoreException(Throwable cause) {
+ super(cause);
+ }
+
+ public TokenStoreException(String message, Throwable cause) {
+ super(message, cause);
+ }
+ }
+
+ /**
+ * Add new master key. The token store assigns and returns the sequence number.
+ * Caller needs to use the identifier to update the key (since it is embedded in the key).
+ *
+ * @param s the master key in string form
+ * @return sequence number for new key
+ */
+ int addMasterKey(String s) throws TokenStoreException;
+
+ /**
+ * Update master key (for expiration and setting store assigned sequence within key).
+ * @param keySeq sequence number of the key to update, as returned by {@link #addMasterKey(String)}
+ * @param s the new value of the master key in string form
+ * @throws TokenStoreException on internal, store specific errors
+ */
+ void updateMasterKey(int keySeq, String s) throws TokenStoreException;
+
+ /**
+ * Remove key for given id.
+ * @param keySeq sequence number of the key to remove
+ * @return false if key no longer present, true otherwise.
+ */
+ boolean removeMasterKey(int keySeq);
+
+ /**
+ * Return all master keys.
+ * @return the master keys held by the store, each in string form
+ * @throws TokenStoreException on internal, store specific errors
+ */
+ String[] getMasterKeys() throws TokenStoreException;
+
+ /**
+ * Add token. If identifier is already present, token won't be added.
+ * @param tokenIdentifier identifier under which the token is stored
+ * @param token token information to store for the identifier
+ * @return true if token was added, false for existing identifier
+ */
+ boolean addToken(DelegationTokenIdentifier tokenIdentifier,
+ DelegationTokenInformation token) throws TokenStoreException;
+
+ /**
+ * Get token. Returns null if the token does not exist.
+ * @param tokenIdentifier identifier of the token to look up
+ * @return the token information for the identifier, or null if the token does not exist
+ */
+ DelegationTokenInformation getToken(DelegationTokenIdentifier tokenIdentifier)
+ throws TokenStoreException;
+
+ /**
+ * Remove token. Return value can be used by caller to detect concurrency.
+ * @param tokenIdentifier identifier of the token to remove
+ * @return true if token was removed, false if it was already removed.
+ * @throws TokenStoreException on internal, store specific errors
+ */
+ boolean removeToken(DelegationTokenIdentifier tokenIdentifier) throws TokenStoreException;
+
+ /**
+ * List of all token identifiers in the store. This is used to remove expired tokens
+ * and a potential scalability improvement would be to partition by master key id
+ * @return identifiers of all tokens in the store
+ */
+ List<DelegationTokenIdentifier> getAllDelegationTokenIdentifiers();
+
+}
Modified: hive/trunk/shims/src/common-secure/java/org/apache/hadoop/hive/thrift/HadoopThriftAuthBridge20S.java
URL: http://svn.apache.org/viewvc/hive/trunk/shims/src/common-secure/java/org/apache/hadoop/hive/thrift/HadoopThriftAuthBridge20S.java?rev=1293530&r1=1293529&r2=1293530&view=diff
==============================================================================
--- hive/trunk/shims/src/common-secure/java/org/apache/hadoop/hive/thrift/HadoopThriftAuthBridge20S.java (original)
+++ hive/trunk/shims/src/common-secure/java/org/apache/hadoop/hive/thrift/HadoopThriftAuthBridge20S.java Sat Feb 25 04:21:11 2012
@@ -213,9 +213,11 @@ import org.apache.thrift.transport.TTran
"hive.cluster.delegation.token.store.class";
public static final String DELEGATION_TOKEN_STORE_ZK_CONNECT_STR =
"hive.cluster.delegation.token.store.zookeeper.connectString";
- public static final String DELEGATION_TOKEN_STORE_ZK_ROOT_NODE =
- "hive.cluster.delegation.token.store.zookeeper.rootNode";
- public static final String DELEGATION_TOKEN_STORE_ZK_ROOT_NODE_DEFAULT =
+ public static final String DELEGATION_TOKEN_STORE_ZK_ZNODE =
+ "hive.cluster.delegation.token.store.zookeeper.znode";
+ public static final String DELEGATION_TOKEN_STORE_ZK_ACL =
+ "hive.cluster.delegation.token.store.zookeeper.acl";
+ public static final String DELEGATION_TOKEN_STORE_ZK_ZNODE_DEFAULT =
"/hive/cluster/delegation";
public Server() throws TTransportException {
@@ -291,16 +293,16 @@ import org.apache.thrift.transport.TTran
return new TUGIAssumingProcessor(processor, secretManager);
}
- protected TokenStoreDelegationTokenSecretManager.TokenStore getTokenStore(Configuration conf)
+ protected DelegationTokenStore getTokenStore(Configuration conf)
throws IOException {
String tokenStoreClassName = conf.get(DELEGATION_TOKEN_STORE_CLS, "");
if (StringUtils.isBlank(tokenStoreClassName)) {
return new MemoryTokenStore();
}
try {
- Class<? extends TokenStoreDelegationTokenSecretManager.TokenStore> storeClass = Class
+ Class<? extends DelegationTokenStore> storeClass = Class
.forName(tokenStoreClassName).asSubclass(
- TokenStoreDelegationTokenSecretManager.TokenStore.class);
+ DelegationTokenStore.class);
return ReflectionUtils.newInstance(storeClass, conf);
} catch (ClassNotFoundException e) {
throw new IOException("Error initializing delegation token store: " + tokenStoreClassName,
Modified: hive/trunk/shims/src/common-secure/java/org/apache/hadoop/hive/thrift/MemoryTokenStore.java
URL: http://svn.apache.org/viewvc/hive/trunk/shims/src/common-secure/java/org/apache/hadoop/hive/thrift/MemoryTokenStore.java?rev=1293530&r1=1293529&r2=1293530&view=diff
==============================================================================
--- hive/trunk/shims/src/common-secure/java/org/apache/hadoop/hive/thrift/MemoryTokenStore.java (original)
+++ hive/trunk/shims/src/common-secure/java/org/apache/hadoop/hive/thrift/MemoryTokenStore.java Sat Feb 25 04:21:11 2012
@@ -18,7 +18,10 @@
package org.apache.hadoop.hive.thrift;
+import java.util.ArrayList;
import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
@@ -27,29 +30,31 @@ import org.apache.hadoop.security.token.
/**
* Default in-memory token store implementation.
*/
-public class MemoryTokenStore implements TokenStoreDelegationTokenSecretManager.TokenStore {
+public class MemoryTokenStore implements DelegationTokenStore {
- private final java.util.concurrent.ConcurrentHashMap<Integer, String> masterKeys
- = new java.util.concurrent.ConcurrentHashMap<Integer, String>();
+ private final Map<Integer, String> masterKeys
+ = new ConcurrentHashMap<Integer, String>();
- private final java.util.concurrent.ConcurrentHashMap<DelegationTokenIdentifier, DelegationTokenInformation> tokens
- = new java.util.concurrent.ConcurrentHashMap<DelegationTokenIdentifier, DelegationTokenInformation>();
+ private final ConcurrentHashMap<DelegationTokenIdentifier, DelegationTokenInformation> tokens
+ = new ConcurrentHashMap<DelegationTokenIdentifier, DelegationTokenInformation>();
private final AtomicInteger masterKeySeq = new AtomicInteger();
+ private Configuration conf;
@Override
public void setConf(Configuration conf) {
+ this.conf = conf;
}
@Override
public Configuration getConf() {
- return null;
+ return this.conf;
}
@Override
public int addMasterKey(String s) {
int keySeq = masterKeySeq.getAndIncrement();
- masterKeys.putIfAbsent(keySeq, s);
+ masterKeys.put(keySeq, s);
return keySeq;
}
@@ -88,7 +93,7 @@ public class MemoryTokenStore implements
@Override
public List<DelegationTokenIdentifier> getAllDelegationTokenIdentifiers() {
- List<DelegationTokenIdentifier> result = new java.util.ArrayList<DelegationTokenIdentifier>(
+ List<DelegationTokenIdentifier> result = new ArrayList<DelegationTokenIdentifier>(
tokens.size());
for (DelegationTokenIdentifier id : tokens.keySet()) {
result.add(id);
Modified: hive/trunk/shims/src/common-secure/java/org/apache/hadoop/hive/thrift/TokenStoreDelegationTokenSecretManager.java
URL: http://svn.apache.org/viewvc/hive/trunk/shims/src/common-secure/java/org/apache/hadoop/hive/thrift/TokenStoreDelegationTokenSecretManager.java?rev=1293530&r1=1293529&r2=1293530&view=diff
==============================================================================
--- hive/trunk/shims/src/common-secure/java/org/apache/hadoop/hive/thrift/TokenStoreDelegationTokenSecretManager.java (original)
+++ hive/trunk/shims/src/common-secure/java/org/apache/hadoop/hive/thrift/TokenStoreDelegationTokenSecretManager.java Sat Feb 25 04:21:11 2012
@@ -25,12 +25,12 @@ import java.io.DataOutputStream;
import java.io.IOException;
import java.lang.reflect.Method;
import java.util.Arrays;
+import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.commons.codec.binary.Base64;
-import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager;
@@ -58,88 +58,16 @@ public class TokenStoreDelegationTokenSe
private static final Logger LOGGER =
LoggerFactory.getLogger(TokenStoreDelegationTokenSecretManager.class.getName());
- /**
- * Exception for internal token store errors that typically cannot be handled by the caller.
- */
- public static class TokenStoreError extends RuntimeException {
- private static final long serialVersionUID = -8693819817623074083L;
-
- public TokenStoreError(Throwable cause) {
- super(cause);
- }
-
- public TokenStoreError(String message, Throwable cause) {
- super(message, cause);
- }
- }
-
- /**
- * Interface for pluggable token store that can be implemented as shared store with external
- * storage (for example with ZooKeeper for HA).
- * Internal, store specific errors are translated into {@link TokenStoreError}.
- */
- public static interface TokenStore extends Configurable {
- /**
- * Add new master key. The token store assigns and returns the sequence number.
- * Caller needs to use the identifier to update the key (since it is embedded in the key).
- *
- * @param s
- * @return sequence number for new key
- */
- int addMasterKey(String s) throws TokenStoreError;
-
- void updateMasterKey(int keySeq, String s) throws TokenStoreError;
-
- /**
- * Remove key for given id.
- * @param keySeq
- * @return false if key no longer present, true otherwise.
- */
- boolean removeMasterKey(int keySeq);
-
- String[] getMasterKeys() throws TokenStoreError;
-
- /**
- * Add token. If identifier is already present, token won't be added.
- * @param tokenIdentifier
- * @param token
- * @return true if token was added, false for existing identifier
- */
- boolean addToken(DelegationTokenIdentifier tokenIdentifier,
- DelegationTokenInformation token) throws TokenStoreError;
-
- /**
- * Get token. Returns null if the token does not exist.
- * @param tokenIdentifier
- * @return
- */
- DelegationTokenInformation getToken(DelegationTokenIdentifier tokenIdentifier)
- throws TokenStoreError;
-
- /**
- * Remove token. Ignores token does not exist.
- * @param tokenIdentifier
- */
- boolean removeToken(DelegationTokenIdentifier tokenIdentifier) throws TokenStoreError;
-
- /**
- * List of all token identifiers in the store. This is used to remove expired tokens
- * and a potential scalability improvement would be to partition by master key id
- * @return
- */
- List<DelegationTokenIdentifier> getAllDelegationTokenIdentifiers();
-
- }
-
final private long keyUpdateInterval;
final private long tokenRemoverScanInterval;
private Thread tokenRemoverThread;
- final private TokenStore tokenStore;
+ final private DelegationTokenStore tokenStore;
public TokenStoreDelegationTokenSecretManager(long delegationKeyUpdateInterval,
long delegationTokenMaxLifetime, long delegationTokenRenewInterval,
- long delegationTokenRemoverScanInterval, TokenStore sharedStore) {
+ long delegationTokenRemoverScanInterval,
+ DelegationTokenStore sharedStore) {
super(delegationKeyUpdateInterval, delegationTokenMaxLifetime, delegationTokenRenewInterval,
delegationTokenRemoverScanInterval);
this.keyUpdateInterval = delegationKeyUpdateInterval;
@@ -162,7 +90,7 @@ public class TokenStoreDelegationTokenSe
// read keys from token store
String[] allKeys = tokenStore.getMasterKeys();
Map<Integer, DelegationKey> keys
- = new java.util.HashMap<Integer, DelegationKey>(allKeys.length);
+ = new HashMap<Integer, DelegationKey>(allKeys.length);
for (String keyStr : allKeys) {
DelegationKey key = new DelegationKey();
try {
@@ -180,8 +108,7 @@ public class TokenStoreDelegationTokenSe
}
@Override
- public byte[] retrievePassword(DelegationTokenIdentifier identifier)
- throws org.apache.hadoop.security.token.SecretManager.InvalidToken {
+ public byte[] retrievePassword(DelegationTokenIdentifier identifier) throws InvalidToken {
DelegationTokenInformation info = this.tokenStore.getToken(identifier);
if (info == null) {
throw new InvalidToken("token expired or does not exist: " + identifier);
Modified: hive/trunk/shims/src/common-secure/java/org/apache/hadoop/hive/thrift/ZooKeeperTokenStore.java
URL: http://svn.apache.org/viewvc/hive/trunk/shims/src/common-secure/java/org/apache/hadoop/hive/thrift/ZooKeeperTokenStore.java?rev=1293530&r1=1293529&r2=1293530&view=diff
==============================================================================
--- hive/trunk/shims/src/common-secure/java/org/apache/hadoop/hive/thrift/ZooKeeperTokenStore.java (original)
+++ hive/trunk/shims/src/common-secure/java/org/apache/hadoop/hive/thrift/ZooKeeperTokenStore.java Sat Feb 25 04:21:11 2012
@@ -19,33 +19,36 @@
package org.apache.hadoop.hive.thrift;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hive.thrift.TokenStoreDelegationTokenSecretManager.TokenStoreError;
import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager.DelegationTokenInformation;
import org.apache.hadoop.security.token.delegation.HiveDelegationTokenSupport;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.ZooKeeper.States;
+import org.apache.zookeeper.data.ACL;
+import org.apache.zookeeper.data.Id;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* ZooKeeper token store implementation.
*/
-public class ZooKeeperTokenStore implements TokenStoreDelegationTokenSecretManager.TokenStore {
+public class ZooKeeperTokenStore implements DelegationTokenStore {
private static final Logger LOGGER =
LoggerFactory.getLogger(ZooKeeperTokenStore.class.getName());
- private static final String ZK_SEQ_FORMAT = "%010d";
+ protected static final String ZK_SEQ_FORMAT = "%010d";
private static final String NODE_KEYS = "/keys";
private static final String NODE_TOKENS = "/tokens";
@@ -53,6 +56,8 @@ public class ZooKeeperTokenStore impleme
private volatile ZooKeeper zkSession;
private String zkConnectString;
private final int zkSessionTimeout = 3000;
+ private List<ACL> newNodeAcl = Ids.OPEN_ACL_UNSAFE;
+
private class ZooKeeperWatcher implements Watcher {
public void process(org.apache.zookeeper.WatchedEvent event) {
@@ -88,7 +93,7 @@ public class ZooKeeperTokenStore impleme
zkSession = new ZooKeeper(this.zkConnectString, this.zkSessionTimeout,
new ZooKeeperWatcher());
} catch (IOException ex) {
- throw new TokenStoreError("Token store error.", ex);
+ throw new TokenStoreException("Token store error.", ex);
}
}
}
@@ -96,14 +101,23 @@ public class ZooKeeperTokenStore impleme
return zkSession;
}
- private static String ensurePath(ZooKeeper zk, String path) throws KeeperException,
+ /**
+ * Create a path if it does not already exist ("mkdir -p")
+ * @param zk ZooKeeper session
+ * @param path string with '/' separator
+ * @param acl list of ACL entries
+ * @return the path that was ensured, rebuilt from its components with a leading '/'
+ * @throws KeeperException
+ * @throws InterruptedException
+ */
+ public static String ensurePath(ZooKeeper zk, String path, List<ACL> acl) throws KeeperException,
InterruptedException {
String[] pathComps = StringUtils.splitByWholeSeparator(path, "/");
String currentPath = "";
for (String pathComp : pathComps) {
currentPath += "/" + pathComp;
try {
- String node = zk.create(currentPath, new byte[0], Ids.OPEN_ACL_UNSAFE,
+ String node = zk.create(currentPath, new byte[0], acl,
CreateMode.PERSISTENT);
LOGGER.info("Created path: " + node);
} catch (KeeperException.NodeExistsException e) {
@@ -112,6 +126,67 @@ public class ZooKeeperTokenStore impleme
return currentPath;
}
+ /**
+ * Parse ACL permission string, from ZooKeeperMain private method
+ * @param permString permission letters: r (read), w (write), c (create), d (delete), a (admin)
+ * @return bitwise OR of the matching ZooDefs.Perms flags; 0 if no letter is recognized
+ */
+ public static int getPermFromString(String permString) {
+ int perm = 0;
+ for (int i = 0; i < permString.length(); i++) {
+ switch (permString.charAt(i)) {
+ case 'r':
+ perm |= ZooDefs.Perms.READ;
+ break;
+ case 'w':
+ perm |= ZooDefs.Perms.WRITE;
+ break;
+ case 'c':
+ perm |= ZooDefs.Perms.CREATE;
+ break;
+ case 'd':
+ perm |= ZooDefs.Perms.DELETE;
+ break;
+ case 'a':
+ perm |= ZooDefs.Perms.ADMIN;
+ break;
+ default: // unrecognized letters are logged and add no permission bits
+ LOGGER.error("Unknown perm type: " + permString.charAt(i));
+ }
+ }
+ return perm;
+ }
+
+ /**
+ * Parse comma separated list of ACL entries to secure generated nodes, e.g.
+ * <code>sasl:hive/host1@MY.DOMAIN:cdrwa,sasl:hive/host2@MY.DOMAIN:cdrwa</code>
+ * @param aclString comma separated entries, each of the form scheme:id:perm
+ * @return ACL list; blank entries are skipped, malformed entries are logged and skipped
+ */
+ public static List<ACL> parseACLs(String aclString) {
+ String[] aclComps = StringUtils.splitByWholeSeparator(aclString, ",");
+ List<ACL> acl = new ArrayList<ACL>(aclComps.length);
+ for (String a : aclComps) {
+ if (StringUtils.isBlank(a)) {
+ continue;
+ }
+ a = a.trim();
+ // from ZooKeeperMain private method
+ int firstColon = a.indexOf(':'); // end of the scheme
+ int lastColon = a.lastIndexOf(':'); // start of the perm letters; the id is everything in between
+ if (firstColon == -1 || lastColon == -1 || firstColon == lastColon) {
+ LOGGER.error(a + " does not have the form scheme:id:perm");
+ continue;
+ }
+ ACL newAcl = new ACL();
+ newAcl.setId(new Id(a.substring(0, firstColon), a.substring(
+ firstColon + 1, lastColon)));
+ newAcl.setPerms(getPermFromString(a.substring(lastColon + 1)));
+ acl.add(newAcl);
+ }
+ return acl;
+ }
+
private void init() {
if (this.zkConnectString == null) {
throw new IllegalStateException("Not initialized");
@@ -127,21 +202,26 @@ public class ZooKeeperTokenStore impleme
ZooKeeper zk = getSession();
try {
- ensurePath(zk, rootNode + NODE_KEYS);
- ensurePath(zk, rootNode + NODE_TOKENS);
+ ensurePath(zk, rootNode + NODE_KEYS, newNodeAcl);
+ ensurePath(zk, rootNode + NODE_TOKENS, newNodeAcl);
} catch (Exception e) {
- throw new TokenStoreError("Failed to validate token path.", e);
+ throw new TokenStoreException("Failed to validate token path.", e);
}
}
@Override
public void setConf(Configuration conf) {
- if (conf != null) {
- this.zkConnectString = conf.get(
- HadoopThriftAuthBridge20S.Server.DELEGATION_TOKEN_STORE_ZK_CONNECT_STR, null);
- this.rootNode = conf.get(
- HadoopThriftAuthBridge20S.Server.DELEGATION_TOKEN_STORE_ZK_ROOT_NODE,
- HadoopThriftAuthBridge20S.Server.DELEGATION_TOKEN_STORE_ZK_ROOT_NODE_DEFAULT);
+ if (conf == null) {
+ throw new IllegalArgumentException("conf is null");
+ }
+ this.zkConnectString = conf.get(
+ HadoopThriftAuthBridge20S.Server.DELEGATION_TOKEN_STORE_ZK_CONNECT_STR, null);
+ this.rootNode = conf.get(
+ HadoopThriftAuthBridge20S.Server.DELEGATION_TOKEN_STORE_ZK_ZNODE,
+ HadoopThriftAuthBridge20S.Server.DELEGATION_TOKEN_STORE_ZK_ZNODE_DEFAULT);
+ String csv = conf.get(HadoopThriftAuthBridge20S.Server.DELEGATION_TOKEN_STORE_ZK_ACL, null);
+ if (StringUtils.isNotBlank(csv)) {
+ this.newNodeAcl = parseACLs(csv);
}
init();
}
@@ -176,14 +256,14 @@ public class ZooKeeperTokenStore impleme
public int addMasterKey(String s) {
try {
ZooKeeper zk = getSession();
- String newNode = zk.create(rootNode + NODE_KEYS + "/", s.getBytes(), Ids.OPEN_ACL_UNSAFE,
+ String newNode = zk.create(rootNode + NODE_KEYS + "/", s.getBytes(), newNodeAcl,
CreateMode.PERSISTENT_SEQUENTIAL);
LOGGER.info("Added key {}", newNode);
return getSeq(newNode);
} catch (KeeperException ex) {
- throw new TokenStoreError(ex);
+ throw new TokenStoreException(ex);
} catch (InterruptedException ex) {
- throw new TokenStoreError(ex);
+ throw new TokenStoreException(ex);
}
}
@@ -194,9 +274,9 @@ public class ZooKeeperTokenStore impleme
zk.setData(rootNode + NODE_KEYS + "/" + String.format(ZK_SEQ_FORMAT, keySeq), s.getBytes(),
-1);
} catch (KeeperException ex) {
- throw new TokenStoreError(ex);
+ throw new TokenStoreException(ex);
} catch (InterruptedException ex) {
- throw new TokenStoreError(ex);
+ throw new TokenStoreException(ex);
}
}
@@ -209,9 +289,9 @@ public class ZooKeeperTokenStore impleme
} catch (KeeperException.NoNodeException ex) {
return false;
} catch (KeeperException ex) {
- throw new TokenStoreError(ex);
+ throw new TokenStoreException(ex);
} catch (InterruptedException ex) {
- throw new TokenStoreError(ex);
+ throw new TokenStoreException(ex);
}
}
@@ -226,9 +306,9 @@ public class ZooKeeperTokenStore impleme
}
return result;
} catch (KeeperException ex) {
- throw new TokenStoreError(ex);
+ throw new TokenStoreException(ex);
} catch (InterruptedException ex) {
- throw new TokenStoreError(ex);
+ throw new TokenStoreException(ex);
}
}
@@ -238,7 +318,7 @@ public class ZooKeeperTokenStore impleme
return rootNode + NODE_TOKENS + "/"
+ TokenStoreDelegationTokenSecretManager.encodeWritable(tokenIdentifier);
} catch (IOException ex) {
- throw new TokenStoreError("Failed to encode token identifier", ex);
+ throw new TokenStoreException("Failed to encode token identifier", ex);
}
}
@@ -249,15 +329,15 @@ public class ZooKeeperTokenStore impleme
ZooKeeper zk = getSession();
byte[] tokenBytes = HiveDelegationTokenSupport.encodeDelegationTokenInformation(token);
String newNode = zk.create(getTokenPath(tokenIdentifier),
- tokenBytes, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+ tokenBytes, newNodeAcl, CreateMode.PERSISTENT);
LOGGER.info("Added token: {}", newNode);
return true;
} catch (KeeperException.NodeExistsException ex) {
return false;
} catch (KeeperException ex) {
- throw new TokenStoreError(ex);
+ throw new TokenStoreException(ex);
} catch (InterruptedException ex) {
- throw new TokenStoreError(ex);
+ throw new TokenStoreException(ex);
}
}
@@ -270,9 +350,9 @@ public class ZooKeeperTokenStore impleme
} catch (KeeperException.NoNodeException ex) {
return false;
} catch (KeeperException ex) {
- throw new TokenStoreError(ex);
+ throw new TokenStoreException(ex);
} catch (InterruptedException ex) {
- throw new TokenStoreError(ex);
+ throw new TokenStoreException(ex);
}
}
@@ -284,14 +364,14 @@ public class ZooKeeperTokenStore impleme
try {
return HiveDelegationTokenSupport.decodeDelegationTokenInformation(tokenBytes);
} catch (Exception ex) {
- throw new TokenStoreError("Failed to decode token", ex);
+ throw new TokenStoreException("Failed to decode token", ex);
}
} catch (KeeperException.NoNodeException ex) {
return null;
} catch (KeeperException ex) {
- throw new TokenStoreError(ex);
+ throw new TokenStoreException(ex);
} catch (InterruptedException ex) {
- throw new TokenStoreError(ex);
+ throw new TokenStoreException(ex);
}
}
@@ -302,9 +382,9 @@ public class ZooKeeperTokenStore impleme
try {
nodes = getSession().getChildren(containerNode, false);
} catch (KeeperException ex) {
- throw new TokenStoreError(ex);
+ throw new TokenStoreException(ex);
} catch (InterruptedException ex) {
- throw new TokenStoreError(ex);
+ throw new TokenStoreException(ex);
}
List<DelegationTokenIdentifier> result = new java.util.ArrayList<DelegationTokenIdentifier>(
nodes.size());
Modified: hive/trunk/shims/src/test/org/apache/hadoop/hive/thrift/TestHadoop20SAuthBridge.java
URL: http://svn.apache.org/viewvc/hive/trunk/shims/src/test/org/apache/hadoop/hive/thrift/TestHadoop20SAuthBridge.java?rev=1293530&r1=1293529&r2=1293530&view=diff
==============================================================================
--- hive/trunk/shims/src/test/org/apache/hadoop/hive/thrift/TestHadoop20SAuthBridge.java (original)
+++ hive/trunk/shims/src/test/org/apache/hadoop/hive/thrift/TestHadoop20SAuthBridge.java Sat Feb 25 04:21:11 2012
@@ -39,7 +39,6 @@ import org.apache.hadoop.hive.metastore.
import org.apache.hadoop.hive.metastore.MetaStoreUtils;
import org.apache.hadoop.hive.metastore.api.Database;
import org.apache.hadoop.hive.metastore.api.MetaException;
-import org.apache.hadoop.hive.thrift.TokenStoreDelegationTokenSecretManager.TokenStore;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.SaslRpcServer;
import org.apache.hadoop.security.SaslRpcServer.AuthMethod;
@@ -82,11 +81,10 @@ public class TestHadoop20SAuthBridge ext
return new TUGIAssumingTransportFactory(transFactory, realUgi);
}
- static TokenStore TOKEN_STORE = new MemoryTokenStore();
- //static TokenStore TOKEN_STORE = new ZooKeeperTokenStore("localhost:2181");
+ static DelegationTokenStore TOKEN_STORE = new MemoryTokenStore();
@Override
- protected TokenStore getTokenStore(Configuration conf) throws IOException {
+ protected DelegationTokenStore getTokenStore(Configuration conf) throws IOException {
return TOKEN_STORE;
}
}
Added: hive/trunk/shims/src/test/org/apache/hadoop/hive/thrift/TestZooKeeperTokenStore.java
URL: http://svn.apache.org/viewvc/hive/trunk/shims/src/test/org/apache/hadoop/hive/thrift/TestZooKeeperTokenStore.java?rev=1293530&view=auto
==============================================================================
--- hive/trunk/shims/src/test/org/apache/hadoop/hive/thrift/TestZooKeeperTokenStore.java (added)
+++ hive/trunk/shims/src/test/org/apache/hadoop/hive/thrift/TestZooKeeperTokenStore.java Sat Feb 25 04:21:11 2012
@@ -0,0 +1,170 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.thrift;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+
+import junit.framework.TestCase;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager.DelegationTokenInformation;
+import org.apache.hadoop.security.token.delegation.HiveDelegationTokenSupport;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.data.ACL;
+import org.apache.zookeeper.data.Stat;
+import org.junit.Assert;
+
+public class TestZooKeeperTokenStore extends TestCase {
+
+ private MiniZooKeeperCluster zkCluster = null;
+ private ZooKeeper zkClient = null;
+
+ /**
+ * Starts a MiniZooKeeperCluster using java.io.tmpdir as its data directory
+ * and connects a ZooKeeper client to its client port.
+ *
+ * @throws Exception if a cluster is already running, or if starting the
+ * cluster or creating the client fails
+ */
+ @Override
+ protected void setUp() throws Exception {
+ if (this.zkCluster != null) {
+ throw new IOException("Cluster already running");
+ }
+ File zkDataDir = new File(System.getProperty("java.io.tmpdir"));
+ MiniZooKeeperCluster cluster = new MiniZooKeeperCluster();
+ cluster.startup(zkDataDir);
+ this.zkCluster = cluster;
+ try {
+ this.zkClient = new ZooKeeper("localhost:"
+ + cluster.getClientPort(), 300, null);
+ } catch (Exception e) {
+ // JUnit 3 does not call tearDown() when setUp() throws, so the
+ // cluster has to be stopped here or it would be leaked.
+ cluster.shutdown();
+ this.zkCluster = null;
+ throw e;
+ }
+ }
+
+ /**
+ * Closes the ZooKeeper client and shuts down the mini cluster.
+ *
+ * @throws Exception if closing the client or shutting down the cluster fails
+ */
+ @Override
+ protected void tearDown() throws Exception {
+ try {
+ this.zkClient.close();
+ } finally {
+ // Shut the cluster down even when the client fails to close, so a
+ // failed close does not leave the server running.
+ this.zkCluster.shutdown();
+ this.zkCluster = null;
+ }
+ }
+
+ /**
+ * Builds a configuration that points the token store at the mini cluster.
+ *
+ * @param zkPath value stored under DELEGATION_TOKEN_STORE_ZK_ZNODE
+ * @return a new Configuration with the connect string and znode path set
+ */
+ private Configuration createConf(String zkPath) {
+ final String connectString = "localhost:" + this.zkCluster.getClientPort();
+ final Configuration tokenStoreConf = new Configuration();
+ tokenStoreConf.set(
+ HadoopThriftAuthBridge20S.Server.DELEGATION_TOKEN_STORE_ZK_CONNECT_STR,
+ connectString);
+ tokenStoreConf.set(
+ HadoopThriftAuthBridge20S.Server.DELEGATION_TOKEN_STORE_ZK_ZNODE,
+ zkPath);
+ return tokenStoreConf;
+ }
+
+ /**
+ * Exercises ZooKeeperTokenStore storage: adds two master keys and removes
+ * one, then adds, reads back, lists and removes a single delegation token.
+ */
+ public void testTokenStorage() throws Exception {
+ String ZK_PATH = "/zktokenstore-testTokenStorage";
+ ZooKeeperTokenStore ts = new ZooKeeperTokenStore();
+ ts.setConf(createConf(ZK_PATH));
+
+ int keySeq = ts.addMasterKey("key1Data");
+ // Read the key node through the raw client to check what was written.
+ byte[] keyBytes = zkClient.getData(
+ ZK_PATH
+ + "/keys/"
+ + String.format(ZooKeeperTokenStore.ZK_SEQ_FORMAT,
+ keySeq), false, null);
+ assertNotNull(keyBytes);
+ // Expected value goes first so a failure message labels the values correctly.
+ // NOTE(review): decodes with the platform charset; presumably matches how
+ // the store encodes the key - confirm against ZooKeeperTokenStore.
+ assertEquals("key1Data", new String(keyBytes));
+
+ int keySeq2 = ts.addMasterKey("key2Data");
+ assertEquals("keys sequential", keySeq + 1, keySeq2);
+ assertEquals("expected number keys", 2, ts.getMasterKeys().length);
+
+ ts.removeMasterKey(keySeq);
+ assertEquals("expected number keys", 1, ts.getMasterKeys().length);
+
+ // tokens
+ DelegationTokenIdentifier tokenId = new DelegationTokenIdentifier(
+ new Text("owner"), new Text("renewer"), new Text("realUser"));
+ DelegationTokenInformation tokenInfo = new DelegationTokenInformation(
+ 99, "password".getBytes());
+ ts.addToken(tokenId, tokenInfo);
+ DelegationTokenInformation tokenInfoRead = ts.getToken(tokenId);
+ assertEquals(tokenInfo.getRenewDate(), tokenInfoRead.getRenewDate());
+ // The store must hand back a distinct object with the same encoded content.
+ assertNotSame(tokenInfo, tokenInfoRead);
+ Assert.assertArrayEquals(HiveDelegationTokenSupport
+ .encodeDelegationTokenInformation(tokenInfo),
+ HiveDelegationTokenSupport
+ .encodeDelegationTokenInformation(tokenInfoRead));
+
+ List<DelegationTokenIdentifier> allIds = ts
+ .getAllDelegationTokenIdentifiers();
+ assertEquals(1, allIds.size());
+ Assert.assertEquals(TokenStoreDelegationTokenSecretManager
+ .encodeWritable(tokenId),
+ TokenStoreDelegationTokenSecretManager.encodeWritable(allIds
+ .get(0)));
+
+ assertTrue(ts.removeToken(tokenId));
+ assertEquals(0, ts.getAllDelegationTokenIdentifiers().size());
+ }
+
+ /**
+ * Configures the ACL "ip:127.0.0.1:r" and expects setConf to fail with a
+ * TokenStoreException whose cause is KeeperException.NoAuthException.
+ */
+ public void testAclNoAuth() throws Exception {
+ String ZK_PATH = "/zktokenstore-testAclNoAuth";
+ Configuration conf = createConf(ZK_PATH);
+ conf.set(
+ HadoopThriftAuthBridge20S.Server.DELEGATION_TOKEN_STORE_ZK_ACL,
+ "ip:127.0.0.1:r");
+
+ ZooKeeperTokenStore ts = new ZooKeeperTokenStore();
+ try {
+ ts.setConf(conf);
+ fail("expected ACL exception");
+ } catch (DelegationTokenStore.TokenStoreException e) {
+ // Fail with a clear message instead of an NPE when no cause is attached.
+ assertNotNull("expected a cause", e.getCause());
+ // Expected value goes first so a failure message labels the values correctly.
+ assertEquals(KeeperException.NoAuthException.class,
+ e.getCause().getClass());
+ }
+ }
+
+ /**
+ * Configures an ACL string with one sasl entry and one unparseable entry.
+ * Expects parseACLs to yield a single ACL, and setConf to fail with a
+ * TokenStoreException whose cause is KeeperException.InvalidACLException.
+ */
+ public void testAclInvalid() throws Exception {
+ String ZK_PATH = "/zktokenstore-testAclInvalid";
+ String aclString = "sasl:hive/host@TEST.DOMAIN:cdrwa, fail-parse-ignored";
+ Configuration conf = createConf(ZK_PATH);
+ conf.set(
+ HadoopThriftAuthBridge20S.Server.DELEGATION_TOKEN_STORE_ZK_ACL,
+ aclString);
+
+ List<ACL> aclList = ZooKeeperTokenStore.parseACLs(aclString);
+ assertEquals(1, aclList.size());
+
+ ZooKeeperTokenStore ts = new ZooKeeperTokenStore();
+ try {
+ ts.setConf(conf);
+ fail("expected ACL exception");
+ } catch (DelegationTokenStore.TokenStoreException e) {
+ // Fail with a clear message instead of an NPE when no cause is attached.
+ assertNotNull("expected a cause", e.getCause());
+ // Expected value goes first so a failure message labels the values correctly.
+ assertEquals(KeeperException.InvalidACLException.class,
+ e.getCause().getClass());
+ }
+ }
+
+ /**
+ * Configures two ACL entries, initializes the store, and checks through the
+ * raw ZooKeeper client that the root znode carries two ACLs.
+ */
+ public void testAclPositive() throws Exception {
+ final String znodePath = "/zktokenstore-testAcl";
+ final String aclSpec = "world:anyone:cdrwa,ip:127.0.0.1:cdrwa";
+
+ Configuration aclConf = createConf(znodePath);
+ aclConf.set(
+ HadoopThriftAuthBridge20S.Server.DELEGATION_TOKEN_STORE_ZK_ACL,
+ aclSpec);
+
+ ZooKeeperTokenStore store = new ZooKeeperTokenStore();
+ store.setConf(aclConf);
+
+ Stat nodeStat = new Stat();
+ List<ACL> appliedAcls = zkClient.getACL(znodePath, nodeStat);
+ assertEquals(2, appliedAcls.size());
+ }
+
+}