You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by ha...@apache.org on 2011/12/07 03:31:00 UTC
svn commit: r1211275 - in /hive/trunk/shims: ./
src/0.20S/java/org/apache/hadoop/hive/thrift/
src/0.20S/java/org/apache/hadoop/security/
src/0.20S/java/org/apache/hadoop/security/token/
src/0.20S/java/org/apache/hadoop/security/token/delegation/ src/te...
Author: hashutosh
Date: Wed Dec 7 02:31:00 2011
New Revision: 1211275
URL: http://svn.apache.org/viewvc?rev=1211275&view=rev
Log:
HIVE-2467 : HA Support for Metastore Server (Thomas Weise via Ashutosh Chauhan)
Added:
hive/trunk/shims/src/0.20S/java/org/apache/hadoop/hive/thrift/MemoryTokenStore.java
hive/trunk/shims/src/0.20S/java/org/apache/hadoop/hive/thrift/TokenStoreDelegationTokenSecretManager.java
hive/trunk/shims/src/0.20S/java/org/apache/hadoop/hive/thrift/ZooKeeperTokenStore.java
hive/trunk/shims/src/0.20S/java/org/apache/hadoop/security/
hive/trunk/shims/src/0.20S/java/org/apache/hadoop/security/token/
hive/trunk/shims/src/0.20S/java/org/apache/hadoop/security/token/delegation/
hive/trunk/shims/src/0.20S/java/org/apache/hadoop/security/token/delegation/HiveDelegationTokenSupport.java
Modified:
hive/trunk/shims/ivy.xml
hive/trunk/shims/src/0.20S/java/org/apache/hadoop/hive/thrift/HadoopThriftAuthBridge20S.java
hive/trunk/shims/src/test/org/apache/hadoop/hive/thrift/TestHadoop20SAuthBridge.java
Modified: hive/trunk/shims/ivy.xml
URL: http://svn.apache.org/viewvc/hive/trunk/shims/ivy.xml?rev=1211275&r1=1211274&r2=1211275&view=diff
==============================================================================
--- hive/trunk/shims/ivy.xml (original)
+++ hive/trunk/shims/ivy.xml Wed Dec 7 02:31:00 2011
@@ -33,6 +33,10 @@
<dependency org="org.apache.hadoop" name="hadoop-core" rev="${hadoop.security.version}">
<artifact name="hadoop" type="source" ext="tar.gz"/>
</dependency>
+ <dependency org="org.apache.zookeeper" name="zookeeper"
+ rev="${zookeeper.version}" transitive="false">
+ <include type="jar"/>
+ </dependency>
<dependency org="org.apache.thrift" name="libthrift" rev="${libthrift.version}"
transitive="false"/>
<dependency org="commons-logging" name="commons-logging" rev="${commons-logging.version}"
Modified: hive/trunk/shims/src/0.20S/java/org/apache/hadoop/hive/thrift/HadoopThriftAuthBridge20S.java
URL: http://svn.apache.org/viewvc/hive/trunk/shims/src/0.20S/java/org/apache/hadoop/hive/thrift/HadoopThriftAuthBridge20S.java?rev=1211275&r1=1211274&r2=1211275&view=diff
==============================================================================
--- hive/trunk/shims/src/0.20S/java/org/apache/hadoop/hive/thrift/HadoopThriftAuthBridge20S.java (original)
+++ hive/trunk/shims/src/0.20S/java/org/apache/hadoop/hive/thrift/HadoopThriftAuthBridge20S.java Wed Dec 7 02:31:00 2011
@@ -35,6 +35,7 @@ import javax.security.sasl.SaslException
import javax.security.sasl.SaslServer;
import org.apache.commons.codec.binary.Base64;
+import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@@ -48,6 +49,7 @@ import org.apache.hadoop.security.author
import org.apache.hadoop.security.token.SecretManager.InvalidToken;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.util.ReflectionUtils;
import org.apache.thrift.TException;
import org.apache.thrift.TProcessor;
import org.apache.thrift.protocol.TProtocol;
@@ -330,6 +332,14 @@ import org.apache.thrift.transport.TTran
"hive.cluster.delegation.token.max-lifetime";
public static final long DELEGATION_TOKEN_MAX_LIFETIME_DEFAULT =
7*24*60*60*1000; // 7 days
+ public static final String DELEGATION_TOKEN_STORE_CLS =
+ "hive.cluster.delegation.token.store.class";
+ public static final String DELEGATION_TOKEN_STORE_ZK_CONNECT_STR =
+ "hive.cluster.delegation.token.store.zookeeper.connectString";
+ public static final String DELEGATION_TOKEN_STORE_ZK_ROOT_NODE =
+ "hive.cluster.delegation.token.store.zookeeper.rootNode";
+ public static final String DELEGATION_TOKEN_STORE_ZK_ROOT_NODE_DEFAULT =
+ "/hive/cluster/delegation";
public Server() throws TTransportException {
try {
@@ -404,6 +414,23 @@ import org.apache.thrift.transport.TTran
return new TUGIAssumingProcessor(processor, secretManager);
}
+ protected TokenStoreDelegationTokenSecretManager.TokenStore getTokenStore(Configuration conf)
+ throws IOException {
+ String tokenStoreClassName = conf.get(DELEGATION_TOKEN_STORE_CLS, "");
+ if (StringUtils.isBlank(tokenStoreClassName)) {
+ return new MemoryTokenStore();
+ }
+ try {
+ Class<? extends TokenStoreDelegationTokenSecretManager.TokenStore> storeClass = Class
+ .forName(tokenStoreClassName).asSubclass(
+ TokenStoreDelegationTokenSecretManager.TokenStore.class);
+ return ReflectionUtils.newInstance(storeClass, conf);
+ } catch (ClassNotFoundException e) {
+ throw new IOException("Error initializing delegation token store: " + tokenStoreClassName,
+ e);
+ }
+ }
+
@Override
public void startDelegationTokenSecretManager(Configuration conf)
throws IOException{
@@ -416,11 +443,11 @@ import org.apache.thrift.transport.TTran
long tokenRenewInterval =
conf.getLong(DELEGATION_TOKEN_RENEW_INTERVAL_KEY,
DELEGATION_TOKEN_RENEW_INTERVAL_DEFAULT);
- secretManager =
- new DelegationTokenSecretManager(secretKeyInterval,
- tokenMaxLifetime,
- tokenRenewInterval,
- DELEGATION_TOKEN_GC_INTERVAL);
+
+ secretManager = new TokenStoreDelegationTokenSecretManager(secretKeyInterval,
+ tokenMaxLifetime,
+ tokenRenewInterval,
+ DELEGATION_TOKEN_GC_INTERVAL, getTokenStore(conf));
secretManager.startThreads();
}
Added: hive/trunk/shims/src/0.20S/java/org/apache/hadoop/hive/thrift/MemoryTokenStore.java
URL: http://svn.apache.org/viewvc/hive/trunk/shims/src/0.20S/java/org/apache/hadoop/hive/thrift/MemoryTokenStore.java?rev=1211275&view=auto
==============================================================================
--- hive/trunk/shims/src/0.20S/java/org/apache/hadoop/hive/thrift/MemoryTokenStore.java (added)
+++ hive/trunk/shims/src/0.20S/java/org/apache/hadoop/hive/thrift/MemoryTokenStore.java Wed Dec 7 02:31:00 2011
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.thrift;
+
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager.DelegationTokenInformation;
+
+/**
+ * Default in-memory token store implementation.
+ */
+public class MemoryTokenStore implements TokenStoreDelegationTokenSecretManager.TokenStore {
+
+ private final java.util.concurrent.ConcurrentHashMap<Integer, String> masterKeys
+ = new java.util.concurrent.ConcurrentHashMap<Integer, String>();
+
+ private final java.util.concurrent.ConcurrentHashMap<DelegationTokenIdentifier, DelegationTokenInformation> tokens
+ = new java.util.concurrent.ConcurrentHashMap<DelegationTokenIdentifier, DelegationTokenInformation>();
+
+ private final AtomicInteger masterKeySeq = new AtomicInteger();
+
+ @Override
+ public void setConf(Configuration conf) {
+ }
+
+ @Override
+ public Configuration getConf() {
+ return null;
+ }
+
+ @Override
+ public int addMasterKey(String s) {
+ int keySeq = masterKeySeq.getAndIncrement();
+ masterKeys.putIfAbsent(keySeq, s);
+ return keySeq;
+ }
+
+ @Override
+ public void updateMasterKey(int keySeq, String s) {
+ masterKeys.put(keySeq, s);
+ }
+
+ @Override
+ public boolean removeMasterKey(int keySeq) {
+ return masterKeys.remove(keySeq) != null;
+ }
+
+ @Override
+ public String[] getMasterKeys() {
+ return masterKeys.values().toArray(new String[0]);
+ }
+
+ @Override
+ public boolean addToken(DelegationTokenIdentifier tokenIdentifier,
+ DelegationTokenInformation token) {
+ DelegationTokenInformation tokenInfo = tokens.putIfAbsent(tokenIdentifier, token);
+ return (tokenInfo == null);
+ }
+
+ @Override
+ public boolean removeToken(DelegationTokenIdentifier tokenIdentifier) {
+ DelegationTokenInformation tokenInfo = tokens.remove(tokenIdentifier);
+ return tokenInfo != null;
+ }
+
+ @Override
+ public DelegationTokenInformation getToken(DelegationTokenIdentifier tokenIdentifier) {
+ return tokens.get(tokenIdentifier);
+ }
+
+ @Override
+ public List<DelegationTokenIdentifier> getAllDelegationTokenIdentifiers() {
+ List<DelegationTokenIdentifier> result = new java.util.ArrayList<DelegationTokenIdentifier>(
+ tokens.size());
+ for (DelegationTokenIdentifier id : tokens.keySet()) {
+ result.add(id);
+ }
+ return result;
+ }
+
+}
Added: hive/trunk/shims/src/0.20S/java/org/apache/hadoop/hive/thrift/TokenStoreDelegationTokenSecretManager.java
URL: http://svn.apache.org/viewvc/hive/trunk/shims/src/0.20S/java/org/apache/hadoop/hive/thrift/TokenStoreDelegationTokenSecretManager.java?rev=1211275&view=auto
==============================================================================
--- hive/trunk/shims/src/0.20S/java/org/apache/hadoop/hive/thrift/TokenStoreDelegationTokenSecretManager.java (added)
+++ hive/trunk/shims/src/0.20S/java/org/apache/hadoop/hive/thrift/TokenStoreDelegationTokenSecretManager.java Wed Dec 7 02:31:00 2011
@@ -0,0 +1,407 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.thrift;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.lang.reflect.Method;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.codec.binary.Base64;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager;
+import org.apache.hadoop.security.token.delegation.DelegationKey;
+import org.apache.hadoop.security.token.delegation.HiveDelegationTokenSupport;
+import org.apache.hadoop.util.Daemon;
+import org.apache.hadoop.util.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Extension of {@link DelegationTokenSecretManager} to support alternative to default in-memory
+ * token management for fail-over and clustering through a pluggable token store (ZooKeeper etc.).
+ * Delegation tokens will be retrieved from the store on-demand and (unlike base class behavior) not
+ * cached in memory. This avoids complexities related to token expiration. The security token is
+ * needed only at the time the transport is opened (as opposed to per interface operation). The
+ * assumption therefore is low cost of interprocess token retrieval (for random read efficient store
+ * such as ZooKeeper) compared to overhead of synchronizing per-process in-memory token caches.
+ * The wrapper incorporates the token store abstraction within the limitations of current
+ * Hive/Hadoop dependency (.20S) with minimum code duplication.
+ * Eventually this should be supported by Hadoop security directly.
+ */
+public class TokenStoreDelegationTokenSecretManager extends DelegationTokenSecretManager {
+
+ private static final Logger LOGGER =
+ LoggerFactory.getLogger(TokenStoreDelegationTokenSecretManager.class.getName());
+
+ /**
+ * Exception for internal token store errors that typically cannot be handled by the caller.
+ */
+ public static class TokenStoreError extends RuntimeException {
+ private static final long serialVersionUID = -8693819817623074083L;
+
+ public TokenStoreError(Throwable cause) {
+ super(cause);
+ }
+
+ public TokenStoreError(String message, Throwable cause) {
+ super(message, cause);
+ }
+ }
+
+ /**
+ * Interface for pluggable token store that can be implemented as shared store with external
+ * storage (for example with ZooKeeper for HA).
+ * Internal, store specific errors are translated into {@link TokenStoreError}.
+ */
+ public static interface TokenStore extends Configurable {
+ /**
+ * Add new master key. The token store assigns and returns the sequence number.
+ * Caller needs to use the identifier to update the key (since it is embedded in the key).
+ *
+ * @param s
+ * @return sequence number for new key
+ */
+ int addMasterKey(String s) throws TokenStoreError;
+
+ void updateMasterKey(int keySeq, String s) throws TokenStoreError;
+
+ /**
+ * Remove key for given id.
+ * @param keySeq
+ * @return false if key no longer present, true otherwise.
+ */
+ boolean removeMasterKey(int keySeq);
+
+ String[] getMasterKeys() throws TokenStoreError;
+
+ /**
+ * Add token. If identifier is already present, token won't be added.
+ * @param tokenIdentifier
+ * @param token
+ * @return true if token was added, false for existing identifier
+ */
+ boolean addToken(DelegationTokenIdentifier tokenIdentifier,
+ DelegationTokenInformation token) throws TokenStoreError;
+
+ /**
+ * Get token. Returns null if the token does not exist.
+ * @param tokenIdentifier
+ * @return
+ */
+ DelegationTokenInformation getToken(DelegationTokenIdentifier tokenIdentifier)
+ throws TokenStoreError;
+
+ /**
+     * Remove token. Ignored if the token does not exist.
+ * @param tokenIdentifier
+ */
+ boolean removeToken(DelegationTokenIdentifier tokenIdentifier) throws TokenStoreError;
+
+ /**
+ * List of all token identifiers in the store. This is used to remove expired tokens
+ * and a potential scalability improvement would be to partition by master key id
+ * @return
+ */
+ List<DelegationTokenIdentifier> getAllDelegationTokenIdentifiers();
+
+ }
+
+ final private long keyUpdateInterval;
+ final private long tokenRemoverScanInterval;
+ private Thread tokenRemoverThread;
+
+ final private TokenStore tokenStore;
+
+ public TokenStoreDelegationTokenSecretManager(long delegationKeyUpdateInterval,
+ long delegationTokenMaxLifetime, long delegationTokenRenewInterval,
+ long delegationTokenRemoverScanInterval, TokenStore sharedStore) {
+ super(delegationKeyUpdateInterval, delegationTokenMaxLifetime, delegationTokenRenewInterval,
+ delegationTokenRemoverScanInterval);
+ this.keyUpdateInterval = delegationKeyUpdateInterval;
+ this.tokenRemoverScanInterval = delegationTokenRemoverScanInterval;
+
+ this.tokenStore = sharedStore;
+ }
+
+ protected DelegationTokenIdentifier getTokenIdentifier(Token<DelegationTokenIdentifier> token)
+ throws IOException {
+ // turn bytes back into identifier for cache lookup
+ ByteArrayInputStream buf = new ByteArrayInputStream(token.getIdentifier());
+ DataInputStream in = new DataInputStream(buf);
+ DelegationTokenIdentifier id = createIdentifier();
+ id.readFields(in);
+ return id;
+ }
+
+ protected Map<Integer, DelegationKey> reloadKeys() {
+ // read keys from token store
+ String[] allKeys = tokenStore.getMasterKeys();
+ Map<Integer, DelegationKey> keys
+ = new java.util.HashMap<Integer, DelegationKey>(allKeys.length);
+ for (String keyStr : allKeys) {
+ DelegationKey key = new DelegationKey();
+ try {
+ decodeWritable(key, keyStr);
+ keys.put(key.getKeyId(), key);
+ } catch (IOException ex) {
+ LOGGER.error("Failed to load master key.", ex);
+ }
+ }
+ synchronized (this) {
+ super.allKeys.clear();
+ super.allKeys.putAll(keys);
+ }
+ return keys;
+ }
+
+ @Override
+ public byte[] retrievePassword(DelegationTokenIdentifier identifier)
+ throws org.apache.hadoop.security.token.SecretManager.InvalidToken {
+ DelegationTokenInformation info = this.tokenStore.getToken(identifier);
+ if (info == null) {
+ throw new InvalidToken("token expired or does not exist: " + identifier);
+ }
+ // must reuse super as info.getPassword is not accessible
+ synchronized (this) {
+ try {
+ super.currentTokens.put(identifier, info);
+ return super.retrievePassword(identifier);
+ } finally {
+ super.currentTokens.remove(identifier);
+ }
+ }
+ }
+
+ @Override
+ public DelegationTokenIdentifier cancelToken(Token<DelegationTokenIdentifier> token,
+ String canceller) throws IOException {
+ DelegationTokenIdentifier id = getTokenIdentifier(token);
+    LOGGER.info("Token cancellation requested for identifier: "+id);
+ this.tokenStore.removeToken(id);
+ return id;
+ }
+
+ /**
+ * Create the password and add it to shared store.
+ */
+ @Override
+ protected byte[] createPassword(DelegationTokenIdentifier id) {
+ byte[] password;
+ DelegationTokenInformation info;
+ synchronized (this) {
+ password = super.createPassword(id);
+ // add new token to shared store
+ // need to persist expiration along with password
+ info = super.currentTokens.remove(id);
+ if (info == null) {
+ throw new IllegalStateException("Failed to retrieve token after creation");
+ }
+ }
+ this.tokenStore.addToken(id, info);
+ return password;
+ }
+
+ @Override
+ public long renewToken(Token<DelegationTokenIdentifier> token,
+ String renewer) throws InvalidToken, IOException {
+    // since renewal is KERBEROS authenticated, the token may not be cached
+ final DelegationTokenIdentifier id = getTokenIdentifier(token);
+ DelegationTokenInformation tokenInfo = this.tokenStore.getToken(id);
+ if (tokenInfo == null) {
+ throw new InvalidToken("token does not exist: " + id); // no token found
+ }
+ // ensure associated master key is available
+ if (!super.allKeys.containsKey(id.getMasterKeyId())) {
+ LOGGER.info("Unknown master key (id={}), (re)loading keys from token store.",
+ id.getMasterKeyId());
+ reloadKeys();
+ }
+ // reuse super renewal logic
+ synchronized (this) {
+ super.currentTokens.put(id, tokenInfo);
+ try {
+ return super.renewToken(token, renewer);
+ } finally {
+ super.currentTokens.remove(id);
+ }
+ }
+ }
+
+ public static String encodeWritable(Writable key) throws IOException {
+ ByteArrayOutputStream bos = new ByteArrayOutputStream();
+ DataOutputStream dos = new DataOutputStream(bos);
+ key.write(dos);
+ dos.flush();
+ return Base64.encodeBase64URLSafeString(bos.toByteArray());
+ }
+
+ public static void decodeWritable(Writable w, String idStr) throws IOException {
+ DataInputStream in = new DataInputStream(new ByteArrayInputStream(Base64.decodeBase64(idStr)));
+ w.readFields(in);
+ }
+
+ /**
+ * Synchronize master key updates / sequence generation for multiple nodes.
+   * NOTE: {@link AbstractDelegationTokenSecretManager} keeps currentKey private, so we need
+ * to utilize this "hook" to manipulate the key through the object reference.
+ * This .20S workaround should cease to exist when Hadoop supports token store.
+ */
+ @Override
+ protected void logUpdateMasterKey(DelegationKey key) throws IOException {
+ int keySeq = this.tokenStore.addMasterKey(encodeWritable(key));
+ // update key with assigned identifier
+ DelegationKey keyWithSeq = new DelegationKey(keySeq, key.getExpiryDate(), key.getKey());
+ String keyStr = encodeWritable(keyWithSeq);
+ this.tokenStore.updateMasterKey(keySeq, keyStr);
+ decodeWritable(key, keyStr);
+ LOGGER.info("New master key with key id={}", key.getKeyId());
+ super.logUpdateMasterKey(key);
+ }
+
+ @Override
+ public synchronized void startThreads() throws IOException {
+ try {
+ // updateCurrentKey needs to be called to initialize the master key
+ // (there should be a null check added in the future in rollMasterKey)
+ // updateCurrentKey();
+ Method m = AbstractDelegationTokenSecretManager.class.getDeclaredMethod("updateCurrentKey");
+ m.setAccessible(true);
+ m.invoke(this);
+ } catch (Exception e) {
+ throw new IOException("Failed to initialize master key", e);
+ }
+ running = true;
+ tokenRemoverThread = new Daemon(new ExpiredTokenRemover());
+ tokenRemoverThread.start();
+ }
+
+ @Override
+ public synchronized void stopThreads() {
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug("Stopping expired delegation token remover thread");
+ }
+ running = false;
+ if (tokenRemoverThread != null) {
+ tokenRemoverThread.interrupt();
+ }
+ }
+
+ /**
+ * Remove expired tokens. Replaces logic in {@link AbstractDelegationTokenSecretManager}
+ * that cannot be reused due to private method access. Logic here can more efficiently
+ * deal with external token store by only loading into memory the minimum data needed.
+ */
+ protected void removeExpiredTokens() {
+ long now = System.currentTimeMillis();
+ Iterator<DelegationTokenIdentifier> i = tokenStore.getAllDelegationTokenIdentifiers()
+ .iterator();
+ while (i.hasNext()) {
+ DelegationTokenIdentifier id = i.next();
+ if (now > id.getMaxDate()) {
+ this.tokenStore.removeToken(id); // no need to look at token info
+ } else {
+ // get token info to check renew date
+ DelegationTokenInformation tokenInfo = tokenStore.getToken(id);
+ if (tokenInfo != null) {
+ if (now > tokenInfo.getRenewDate()) {
+ this.tokenStore.removeToken(id);
+ }
+ }
+ }
+ }
+ }
+
+ /**
+ * Extension of rollMasterKey to remove expired keys from store.
+ * @throws IOException
+ */
+ protected void rollMasterKeyExt() throws IOException {
+ Map<Integer, DelegationKey> keys = reloadKeys();
+ int currentKeyId = super.currentId;
+ HiveDelegationTokenSupport.rollMasterKey(TokenStoreDelegationTokenSecretManager.this);
+ List<DelegationKey> keysAfterRoll = Arrays.asList(getAllKeys());
+ for (DelegationKey key : keysAfterRoll) {
+ keys.remove(key.getKeyId());
+ if (key.getKeyId() == currentKeyId) {
+ tokenStore.updateMasterKey(currentKeyId, encodeWritable(key));
+ }
+ }
+ for (DelegationKey expiredKey : keys.values()) {
+ LOGGER.info("Removing expired key id={}", expiredKey.getKeyId());
+ tokenStore.removeMasterKey(expiredKey.getKeyId());
+ }
+ }
+
+
+ /**
+ * Cloned from {@link AbstractDelegationTokenSecretManager} to deal with private access
+   * restriction (there would not be a need to clone the remove thread if the remove logic was
+ * protected/extensible).
+ */
+ protected class ExpiredTokenRemover extends Thread {
+ private long lastMasterKeyUpdate;
+ private long lastTokenCacheCleanup;
+
+ @Override
+ public void run() {
+ LOGGER.info("Starting expired delegation token remover thread, "
+ + "tokenRemoverScanInterval=" + tokenRemoverScanInterval
+ / (60 * 1000) + " min(s)");
+ try {
+ while (running) {
+ long now = System.currentTimeMillis();
+ if (lastMasterKeyUpdate + keyUpdateInterval < now) {
+ try {
+ rollMasterKeyExt();
+ lastMasterKeyUpdate = now;
+ } catch (IOException e) {
+ LOGGER.error("Master key updating failed. "
+ + StringUtils.stringifyException(e));
+ }
+ }
+ if (lastTokenCacheCleanup + tokenRemoverScanInterval < now) {
+ removeExpiredTokens();
+ lastTokenCacheCleanup = now;
+ }
+ try {
+ Thread.sleep(5000); // 5 seconds
+ } catch (InterruptedException ie) {
+ LOGGER
+              .error("InterruptedException received for ExpiredTokenRemover thread "
+ + ie);
+ }
+ }
+ } catch (Throwable t) {
+ LOGGER.error("ExpiredTokenRemover thread received unexpected exception. "
+ + t, t);
+ Runtime.getRuntime().exit(-1);
+ }
+ }
+ }
+
+}
Added: hive/trunk/shims/src/0.20S/java/org/apache/hadoop/hive/thrift/ZooKeeperTokenStore.java
URL: http://svn.apache.org/viewvc/hive/trunk/shims/src/0.20S/java/org/apache/hadoop/hive/thrift/ZooKeeperTokenStore.java?rev=1211275&view=auto
==============================================================================
--- hive/trunk/shims/src/0.20S/java/org/apache/hadoop/hive/thrift/ZooKeeperTokenStore.java (added)
+++ hive/trunk/shims/src/0.20S/java/org/apache/hadoop/hive/thrift/ZooKeeperTokenStore.java Wed Dec 7 02:31:00 2011
@@ -0,0 +1,323 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.thrift;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.thrift.TokenStoreDelegationTokenSecretManager.TokenStoreError;
+import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager.DelegationTokenInformation;
+import org.apache.hadoop.security.token.delegation.HiveDelegationTokenSupport;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.ZooKeeper.States;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * ZooKeeper token store implementation.
+ */
+public class ZooKeeperTokenStore implements TokenStoreDelegationTokenSecretManager.TokenStore {
+
+ private static final Logger LOGGER =
+ LoggerFactory.getLogger(ZooKeeperTokenStore.class.getName());
+
+ private static final String ZK_SEQ_FORMAT = "%010d";
+ private static final String NODE_KEYS = "/keys";
+ private static final String NODE_TOKENS = "/tokens";
+
+ private String rootNode = "";
+ private volatile ZooKeeper zkSession;
+ private String zkConnectString;
+ private final int zkSessionTimeout = 3000;
+
+ private class ZooKeeperWatcher implements Watcher {
+ public void process(org.apache.zookeeper.WatchedEvent event) {
+ LOGGER.info(event.toString());
+ if (event.getState() == Watcher.Event.KeeperState.Expired) {
+ LOGGER.warn("ZooKeeper session expired, discarding connection");
+ try {
+ zkSession.close();
+ } catch (Throwable e) {
+ LOGGER.warn("Failed to close connection on expired session", e);
+ }
+ }
+ }
+ }
+
+ /**
+ * Default constructor for dynamic instantiation w/ Configurable
+ * (ReflectionUtils does not support Configuration constructor injection).
+ */
+ protected ZooKeeperTokenStore() {
+ }
+
+ public ZooKeeperTokenStore(String hostPort) {
+ this.zkConnectString = hostPort;
+ init();
+ }
+
+ private ZooKeeper getSession() {
+ if (zkSession == null || zkSession.getState() == States.CLOSED) {
+ synchronized (this) {
+ if (zkSession == null || zkSession.getState() == States.CLOSED) {
+ try {
+ zkSession = new ZooKeeper(this.zkConnectString, this.zkSessionTimeout,
+ new ZooKeeperWatcher());
+ } catch (IOException ex) {
+ throw new TokenStoreError("Token store error.", ex);
+ }
+ }
+ }
+ }
+ return zkSession;
+ }
+
+ private static String ensurePath(ZooKeeper zk, String path) throws KeeperException,
+ InterruptedException {
+ String[] pathComps = StringUtils.splitByWholeSeparator(path, "/");
+ String currentPath = "";
+ for (String pathComp : pathComps) {
+ currentPath += "/" + pathComp;
+ try {
+ String node = zk.create(currentPath, new byte[0], Ids.OPEN_ACL_UNSAFE,
+ CreateMode.PERSISTENT);
+ LOGGER.info("Created path: " + node);
+ } catch (KeeperException.NodeExistsException e) {
+ }
+ }
+ return currentPath;
+ }
+
+ private void init() {
+ if (this.zkConnectString == null) {
+ throw new IllegalStateException("Not initialized");
+ }
+
+ if (this.zkSession != null) {
+ try {
+ this.zkSession.close();
+ } catch (InterruptedException ex) {
+ LOGGER.warn("Failed to close existing session.", ex);
+ }
+ }
+
+ ZooKeeper zk = getSession();
+ try {
+ ensurePath(zk, rootNode + NODE_KEYS);
+ ensurePath(zk, rootNode + NODE_TOKENS);
+ } catch (Exception e) {
+ throw new TokenStoreError("Failed to validate token path.", e);
+ }
+ }
+
+ @Override
+ public void setConf(Configuration conf) {
+ if (conf != null) {
+ this.zkConnectString = conf.get(
+ HadoopThriftAuthBridge20S.Server.DELEGATION_TOKEN_STORE_ZK_CONNECT_STR, null);
+ this.rootNode = conf.get(
+ HadoopThriftAuthBridge20S.Server.DELEGATION_TOKEN_STORE_ZK_ROOT_NODE,
+ HadoopThriftAuthBridge20S.Server.DELEGATION_TOKEN_STORE_ZK_ROOT_NODE_DEFAULT);
+ }
+ init();
+ }
+
+ @Override
+ public Configuration getConf() {
+ return null; // not required
+ }
+
+ private Map<Integer, byte[]> getAllKeys() throws KeeperException,
+ InterruptedException {
+
+ String masterKeyNode = rootNode + NODE_KEYS;
+ ZooKeeper zk = getSession();
+ List<String> nodes = zk.getChildren(masterKeyNode, false);
+ Map<Integer, byte[]> result = new HashMap<Integer, byte[]>();
+ for (String node : nodes) {
+ byte[] data = zk.getData(masterKeyNode + "/" + node, false, null);
+ if (data != null) {
+ result.put(getSeq(node), data);
+ }
+ }
+ return result;
+ }
+
+ private int getSeq(String path) {
+ String[] pathComps = path.split("/");
+ return Integer.parseInt(pathComps[pathComps.length-1]);
+ }
+
+ @Override
+ public int addMasterKey(String s) {
+ try {
+ ZooKeeper zk = getSession();
+ String newNode = zk.create(rootNode + NODE_KEYS + "/", s.getBytes(), Ids.OPEN_ACL_UNSAFE,
+ CreateMode.PERSISTENT_SEQUENTIAL);
+ LOGGER.info("Added key {}", newNode);
+ return getSeq(newNode);
+ } catch (KeeperException ex) {
+ throw new TokenStoreError(ex);
+ } catch (InterruptedException ex) {
+ throw new TokenStoreError(ex);
+ }
+ }
+
+ @Override
+ public void updateMasterKey(int keySeq, String s) {
+ try {
+ ZooKeeper zk = getSession();
+ zk.setData(rootNode + NODE_KEYS + "/" + String.format(ZK_SEQ_FORMAT, keySeq), s.getBytes(),
+ -1);
+ } catch (KeeperException ex) {
+ throw new TokenStoreError(ex);
+ } catch (InterruptedException ex) {
+ throw new TokenStoreError(ex);
+ }
+ }
+
+ @Override
+ public boolean removeMasterKey(int keySeq) {
+ try {
+ ZooKeeper zk = getSession();
+ zk.delete(rootNode + NODE_KEYS + "/" + String.format(ZK_SEQ_FORMAT, keySeq), -1);
+ return true;
+ } catch (KeeperException.NoNodeException ex) {
+ return false;
+ } catch (KeeperException ex) {
+ throw new TokenStoreError(ex);
+ } catch (InterruptedException ex) {
+ throw new TokenStoreError(ex);
+ }
+ }
+
+ @Override
+ public String[] getMasterKeys() {
+ try {
+ Map<Integer, byte[]> allKeys = getAllKeys();
+ String[] result = new String[allKeys.size()];
+ int resultIdx = 0;
+ for (byte[] keyBytes : allKeys.values()) {
+ result[resultIdx++] = new String(keyBytes);
+ }
+ return result;
+ } catch (KeeperException ex) {
+ throw new TokenStoreError(ex);
+ } catch (InterruptedException ex) {
+ throw new TokenStoreError(ex);
+ }
+ }
+
+
+ private String getTokenPath(DelegationTokenIdentifier tokenIdentifier) {
+ try {
+ return rootNode + NODE_TOKENS + "/"
+ + TokenStoreDelegationTokenSecretManager.encodeWritable(tokenIdentifier);
+ } catch (IOException ex) {
+ throw new TokenStoreError("Failed to encode token identifier", ex);
+ }
+ }
+
+ @Override
+ public boolean addToken(DelegationTokenIdentifier tokenIdentifier,
+ DelegationTokenInformation token) {
+ try {
+ ZooKeeper zk = getSession();
+ byte[] tokenBytes = HiveDelegationTokenSupport.encodeDelegationTokenInformation(token);
+ String newNode = zk.create(getTokenPath(tokenIdentifier),
+ tokenBytes, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+ LOGGER.info("Added token: {}", newNode);
+ return true;
+ } catch (KeeperException.NodeExistsException ex) {
+ return false;
+ } catch (KeeperException ex) {
+ throw new TokenStoreError(ex);
+ } catch (InterruptedException ex) {
+ throw new TokenStoreError(ex);
+ }
+ }
+
+ @Override
+ public boolean removeToken(DelegationTokenIdentifier tokenIdentifier) {
+ try {
+ ZooKeeper zk = getSession();
+ zk.delete(getTokenPath(tokenIdentifier), -1);
+ return true;
+ } catch (KeeperException.NoNodeException ex) {
+ return false;
+ } catch (KeeperException ex) {
+ throw new TokenStoreError(ex);
+ } catch (InterruptedException ex) {
+ throw new TokenStoreError(ex);
+ }
+ }
+
+ @Override
+ public DelegationTokenInformation getToken(DelegationTokenIdentifier tokenIdentifier) {
+ try {
+ ZooKeeper zk = getSession();
+ byte[] tokenBytes = zk.getData(getTokenPath(tokenIdentifier), false, null);
+ try {
+ return HiveDelegationTokenSupport.decodeDelegationTokenInformation(tokenBytes);
+ } catch (Exception ex) {
+ throw new TokenStoreError("Failed to decode token", ex);
+ }
+ } catch (KeeperException.NoNodeException ex) {
+ return null;
+ } catch (KeeperException ex) {
+ throw new TokenStoreError(ex);
+ } catch (InterruptedException ex) {
+ throw new TokenStoreError(ex);
+ }
+ }
+
+ @Override
+ public List<DelegationTokenIdentifier> getAllDelegationTokenIdentifiers() {
+ String containerNode = rootNode + NODE_TOKENS;
+ final List<String> nodes;
+ try {
+ nodes = getSession().getChildren(containerNode, false);
+ } catch (KeeperException ex) {
+ throw new TokenStoreError(ex);
+ } catch (InterruptedException ex) {
+ throw new TokenStoreError(ex);
+ }
+ List<DelegationTokenIdentifier> result = new java.util.ArrayList<DelegationTokenIdentifier>(
+ nodes.size());
+ for (String node : nodes) {
+ DelegationTokenIdentifier id = new DelegationTokenIdentifier();
+ try {
+ TokenStoreDelegationTokenSecretManager.decodeWritable(id, node);
+ result.add(id);
+ } catch (Exception e) {
+ LOGGER.warn("Failed to decode token '{}'", node);
+ }
+ }
+ return result;
+ }
+
+}
Added: hive/trunk/shims/src/0.20S/java/org/apache/hadoop/security/token/delegation/HiveDelegationTokenSupport.java
URL: http://svn.apache.org/viewvc/hive/trunk/shims/src/0.20S/java/org/apache/hadoop/security/token/delegation/HiveDelegationTokenSupport.java?rev=1211275&view=auto
==============================================================================
--- hive/trunk/shims/src/0.20S/java/org/apache/hadoop/security/token/delegation/HiveDelegationTokenSupport.java (added)
+++ hive/trunk/shims/src/0.20S/java/org/apache/hadoop/security/token/delegation/HiveDelegationTokenSupport.java Wed Dec 7 02:31:00 2011
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.security.token.delegation;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager.DelegationTokenInformation;
+
+/**
+ * Workaround for serialization of {@link DelegationTokenInformation} through package access.
+ * Future version of Hadoop should add this to DelegationTokenInformation itself.
+ */
+public final class HiveDelegationTokenSupport {
+
+ // Utility class: no instances.
+ private HiveDelegationTokenSupport() {}
+
+ /**
+ * Serializes a {@link DelegationTokenInformation} (password length as VInt,
+ * password bytes, then renewDate as long) for storage outside Hadoop.
+ * Accesses the package-private password/renewDate fields directly, which is
+ * why this class lives in org.apache.hadoop.security.token.delegation.
+ *
+ * @param token the token information to encode
+ * @return the serialized bytes
+ * @throws RuntimeException wrapping any IOException from the in-memory streams
+ */
+ public static byte[] encodeDelegationTokenInformation(DelegationTokenInformation token) {
+ try {
+ ByteArrayOutputStream bos = new ByteArrayOutputStream();
+ DataOutputStream out = new DataOutputStream(bos);
+ WritableUtils.writeVInt(out, token.password.length);
+ out.write(token.password);
+ out.writeLong(token.renewDate);
+ out.flush();
+ return bos.toByteArray();
+ } catch (IOException ex) {
+ throw new RuntimeException("Failed to encode token.", ex);
+ }
+ }
+
+ /**
+ * Inverse of {@link #encodeDelegationTokenInformation(DelegationTokenInformation)}:
+ * reads password length (VInt), password bytes, and renewDate (long) from the
+ * given byte array.
+ *
+ * @param tokenBytes bytes previously produced by encodeDelegationTokenInformation
+ * @return a reconstructed DelegationTokenInformation
+ * @throws IOException if the bytes are truncated or malformed
+ */
+ public static DelegationTokenInformation decodeDelegationTokenInformation(byte[] tokenBytes)
+ throws IOException {
+ DataInputStream in = new DataInputStream(new ByteArrayInputStream(tokenBytes));
+ DelegationTokenInformation token = new DelegationTokenInformation(0, null);
+ int len = WritableUtils.readVInt(in);
+ token.password = new byte[len];
+ in.readFully(token.password);
+ token.renewDate = in.readLong();
+ return token;
+ }
+
+ /**
+ * Exposes the package-private/protected rollMasterKey of
+ * {@link AbstractDelegationTokenSecretManager} to callers outside this package.
+ *
+ * @param mgr the secret manager whose master key should be rolled
+ * @throws IOException propagated from rollMasterKey
+ */
+ public static void rollMasterKey(
+ AbstractDelegationTokenSecretManager<? extends AbstractDelegationTokenIdentifier> mgr)
+ throws IOException {
+ mgr.rollMasterKey();
+ }
+
+}
Modified: hive/trunk/shims/src/test/org/apache/hadoop/hive/thrift/TestHadoop20SAuthBridge.java
URL: http://svn.apache.org/viewvc/hive/trunk/shims/src/test/org/apache/hadoop/hive/thrift/TestHadoop20SAuthBridge.java?rev=1211275&r1=1211274&r2=1211275&view=diff
==============================================================================
--- hive/trunk/shims/src/test/org/apache/hadoop/hive/thrift/TestHadoop20SAuthBridge.java (original)
+++ hive/trunk/shims/src/test/org/apache/hadoop/hive/thrift/TestHadoop20SAuthBridge.java Wed Dec 7 02:31:00 2011
@@ -39,6 +39,7 @@ import org.apache.hadoop.hive.metastore.
import org.apache.hadoop.hive.metastore.MetaStoreUtils;
import org.apache.hadoop.hive.metastore.api.Database;
import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.thrift.TokenStoreDelegationTokenSecretManager.TokenStore;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.SaslRpcServer;
import org.apache.hadoop.security.SaslRpcServer.AuthMethod;
@@ -46,7 +47,10 @@ import org.apache.hadoop.security.UserGr
import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
import org.apache.hadoop.security.authorize.AuthorizationException;
import org.apache.hadoop.security.authorize.ProxyUsers;
+import org.apache.hadoop.security.token.SecretManager.InvalidToken;
import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager.DelegationTokenInformation;
+import org.apache.hadoop.security.token.delegation.DelegationKey;
import org.apache.hadoop.util.StringUtils;
import org.apache.thrift.transport.TSaslServerTransport;
import org.apache.thrift.transport.TTransportException;
@@ -78,12 +82,19 @@ public class TestHadoop20SAuthBridge ext
return new TUGIAssumingTransportFactory(transFactory, realUgi);
}
+ static TokenStore TOKEN_STORE = new MemoryTokenStore();
+ //static TokenStore TOKEN_STORE = new ZooKeeperTokenStore("localhost:2181");
+
+ @Override
+ protected TokenStore getTokenStore(Configuration conf) throws IOException {
+ return TOKEN_STORE;
+ }
}
}
-
+
private HiveConf conf;
-
+
private void configureSuperUserIPAddresses(Configuration conf,
String superUserShortName) throws IOException {
ArrayList<String> ipList = new ArrayList<String>();
@@ -107,7 +118,7 @@ public class TestHadoop20SAuthBridge ext
conf.setStrings(ProxyUsers.getProxySuperuserIpConfKey(superUserShortName),
builder.toString());
}
-
+
public void setup(final int port) throws Exception {
System.setProperty(HiveConf.ConfVars.METASTORE_USE_THRIFT_SASL.varname,
"true");
@@ -120,28 +131,105 @@ public class TestHadoop20SAuthBridge ext
MetaStoreUtils.startMetaStore(port, new MyHadoopThriftAuthBridge20S());
}
+ /**
+ * Test delegation token store/load from shared store.
+ * Exercises the full lifecycle against the shared TOKEN_STORE: issue, duplicate
+ * add, load of keys by a second manager, renew, cancel, token expiry sweep, and
+ * expired master-key removal on roll.
+ * @throws Exception
+ */
+ public void testDelegationTokenSharedStore() throws Exception {
+ UserGroupInformation clientUgi = UserGroupInformation.getCurrentUser();
+
+ TokenStoreDelegationTokenSecretManager tokenManager =
+ new TokenStoreDelegationTokenSecretManager(0, 60*60*1000, 60*60*1000, 0,
+ MyHadoopThriftAuthBridge20S.Server.TOKEN_STORE);
+ // initializes current key
+ tokenManager.startThreads();
+ tokenManager.stopThreads();
+
+ // issue a token for the current user and round-trip it through its URL form
+ String tokenStrForm = tokenManager.getDelegationToken(clientUgi.getShortUserName());
+ Token<DelegationTokenIdentifier> t= new Token<DelegationTokenIdentifier>();
+ t.decodeFromUrlString(tokenStrForm);
+
+ //check whether the username in the token is what we expect
+ DelegationTokenIdentifier d = new DelegationTokenIdentifier();
+ d.readFields(new DataInputStream(new ByteArrayInputStream(
+ t.getIdentifier())));
+ assertTrue("Usernames don't match",
+ clientUgi.getShortUserName().equals(d.getUser().getShortUserName()));
+
+ // the issued token must be visible in the shared store, and adding the same
+ // identifier again must be rejected as a duplicate
+ DelegationTokenInformation tokenInfo = MyHadoopThriftAuthBridge20S.Server.TOKEN_STORE
+ .getToken(d);
+ assertNotNull("token not in store", tokenInfo);
+ assertFalse("duplicate token add",
+ MyHadoopThriftAuthBridge20S.Server.TOKEN_STORE.addToken(d, tokenInfo));
+
+ // check keys are copied from token store when token is loaded
+ TokenStoreDelegationTokenSecretManager anotherManager =
+ new TokenStoreDelegationTokenSecretManager(0, 0, 0, 0,
+ MyHadoopThriftAuthBridge20S.Server.TOKEN_STORE);
+ assertEquals("master keys empty on init", 0,
+ anotherManager.getAllKeys().length);
+ assertNotNull("token loaded",
+ anotherManager.retrievePassword(d));
+ anotherManager.renewToken(t, clientUgi.getShortUserName());
+ assertEquals("master keys not loaded from store",
+ MyHadoopThriftAuthBridge20S.Server.TOKEN_STORE.getMasterKeys().length,
+ anotherManager.getAllKeys().length);
+
+ // cancel the delegation token: it must disappear from the store, a second
+ // removal must report false, and retrievePassword must reject it
+ tokenManager.cancelDelegationToken(tokenStrForm);
+ assertNull("token not removed from store after cancel",
+ MyHadoopThriftAuthBridge20S.Server.TOKEN_STORE.getToken(d));
+ assertFalse("token removed (again)",
+ MyHadoopThriftAuthBridge20S.Server.TOKEN_STORE.removeToken(d));
+ try {
+ anotherManager.retrievePassword(d);
+ fail("InvalidToken expected after cancel");
+ } catch (InvalidToken ex) {
+ // expected
+ }
+
+ // token expiration: re-add with renewDate 0 (already expired) and verify the
+ // expiry sweep removes it from the store
+ MyHadoopThriftAuthBridge20S.Server.TOKEN_STORE.addToken(d,
+ new DelegationTokenInformation(0, t.getPassword()));
+ assertNotNull(MyHadoopThriftAuthBridge20S.Server.TOKEN_STORE.getToken(d));
+ anotherManager.removeExpiredTokens();
+ assertNull("Expired token not removed",
+ MyHadoopThriftAuthBridge20S.Server.TOKEN_STORE.getToken(d));
+
+ // key expiration - create an already expired key
+ anotherManager.startThreads(); // generates initial key
+ anotherManager.stopThreads();
+ DelegationKey expiredKey = new DelegationKey(-1, 0, anotherManager.getAllKeys()[0].getKey());
+ anotherManager.logUpdateMasterKey(expiredKey); // updates key with sequence number
+ assertTrue("expired key not in allKeys",
+ anotherManager.reloadKeys().containsKey(expiredKey.getKeyId()));
+ // rolling the master key must purge the expired key from the reloaded set
+ anotherManager.rollMasterKeyExt();
+ assertFalse("Expired key not removed",
+ anotherManager.reloadKeys().containsKey(expiredKey.getKeyId()));
+ }
+
public void testSaslWithHiveMetaStore() throws Exception {
setup(10000);
UserGroupInformation clientUgi = UserGroupInformation.getCurrentUser();
obtainTokenAndAddIntoUGI(clientUgi, null);
obtainTokenAndAddIntoUGI(clientUgi, "tokenForFooTablePartition");
}
-
+
public void testMetastoreProxyUser() throws Exception {
setup(10010);
-
+
final String proxyUserName = "proxyUser";
- //set the configuration up such that proxyUser can act on
+ //set the configuration up such that proxyUser can act on
//behalf of all users belonging to the group foo_bar_group (
//a dummy group)
String[] groupNames =
new String[] { "foo_bar_group" };
setGroupsInConf(groupNames, proxyUserName);
-
- final UserGroupInformation delegationTokenUser =
+
+ final UserGroupInformation delegationTokenUser =
UserGroupInformation.getCurrentUser();
-
- final UserGroupInformation proxyUserUgi =
+
+ final UserGroupInformation proxyUserUgi =
UserGroupInformation.createRemoteUser(proxyUserName);
String tokenStrForm = proxyUserUgi.doAs(new PrivilegedExceptionAction<String>() {
public String run() throws Exception {
@@ -154,11 +242,11 @@ public class TestHadoop20SAuthBridge ext
}
}
});
- assertTrue("Expected the getDelegationToken call to fail",
+ assertTrue("Expected the getDelegationToken call to fail",
tokenStrForm == null);
-
- //set the configuration up such that proxyUser can act on
- //behalf of all users belonging to the real group(s) that the
+
+ //set the configuration up such that proxyUser can act on
+ //behalf of all users belonging to the real group(s) that the
//user running the test belongs to
setGroupsInConf(UserGroupInformation.getCurrentUser().getGroupNames(),
proxyUserName);
@@ -173,7 +261,7 @@ public class TestHadoop20SAuthBridge ext
}
}
});
- assertTrue("Expected the getDelegationToken call to not fail",
+ assertTrue("Expected the getDelegationToken call to not fail",
tokenStrForm != null);
Token<DelegationTokenIdentifier> t= new Token<DelegationTokenIdentifier>();
t.decodeFromUrlString(tokenStrForm);
@@ -181,12 +269,12 @@ public class TestHadoop20SAuthBridge ext
DelegationTokenIdentifier d = new DelegationTokenIdentifier();
d.readFields(new DataInputStream(new ByteArrayInputStream(
t.getIdentifier())));
- assertTrue("Usernames don't match",
+ assertTrue("Usernames don't match",
delegationTokenUser.getShortUserName().equals(d.getUser().getShortUserName()));
-
+
}
-
- private void setGroupsInConf(String[] groupNames, String proxyUserName)
+
+ private void setGroupsInConf(String[] groupNames, String proxyUserName)
throws IOException {
conf.set(
ProxyUsers.getProxySuperuserGroupConfKey(proxyUserName),
@@ -194,21 +282,21 @@ public class TestHadoop20SAuthBridge ext
configureSuperUserIPAddresses(conf, proxyUserName);
ProxyUsers.refreshSuperUserGroupsConfiguration(conf);
}
-
- private String getDelegationTokenStr(UserGroupInformation ownerUgi,
+
+ private String getDelegationTokenStr(UserGroupInformation ownerUgi,
UserGroupInformation realUgi) throws Exception {
//obtain a token by directly invoking the metastore operation(without going
//through the thrift interface). Obtaining a token makes the secret manager
//aware of the user and that it gave the token to the user
- //also set the authentication method explicitly to KERBEROS. Since the
+ //also set the authentication method explicitly to KERBEROS. Since the
//metastore checks whether the authentication method is KERBEROS or not
- //for getDelegationToken, and the testcases don't use
+ //for getDelegationToken, and the testcases don't use
//kerberos, this needs to be done
HadoopThriftAuthBridge20S.Server.authenticationMethod
.set(AuthenticationMethod.KERBEROS);
- HadoopThriftAuthBridge20S.Server.remoteAddress.set(InetAddress.getLocalHost());
+ HadoopThriftAuthBridge20S.Server.remoteAddress.set(InetAddress.getLocalHost());
return
- HiveMetaStore.getDelegationToken(ownerUgi.getShortUserName(),
+ HiveMetaStore.getDelegationToken(ownerUgi.getShortUserName(),
realUgi.getShortUserName());
}
@@ -217,14 +305,14 @@ public class TestHadoop20SAuthBridge ext
String tokenStrForm = getDelegationTokenStr(clientUgi, clientUgi);
Token<DelegationTokenIdentifier> t= new Token<DelegationTokenIdentifier>();
t.decodeFromUrlString(tokenStrForm);
-
+
//check whether the username in the token is what we expect
DelegationTokenIdentifier d = new DelegationTokenIdentifier();
d.readFields(new DataInputStream(new ByteArrayInputStream(
t.getIdentifier())));
- assertTrue("Usernames don't match",
+ assertTrue("Usernames don't match",
clientUgi.getShortUserName().equals(d.getUser().getShortUserName()));
-
+
if (tokenSig != null) {
conf.set("hive.metastore.token.signature", tokenSig);
t.setService(new Text(tokenSig));
@@ -247,7 +335,7 @@ public class TestHadoop20SAuthBridge ext
//try out some metastore operations
createDBAndVerifyExistence(hiveClient);
-
+
//check that getDelegationToken fails since we are not authenticating
//over kerberos
boolean pass = false;