You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by ha...@apache.org on 2014/11/27 02:07:35 UTC
svn commit: r1641980 [4/4] - in /hive/trunk: ./
beeline/src/test/org/apache/hive/beeline/
common/src/java/org/apache/hadoop/hive/common/
common/src/java/org/apache/hadoop/hive/conf/
hcatalog/core/src/main/java/org/apache/hive/hcatalog/common/ hcatalog/...
Added: hive/trunk/shims/common/src/main/java/org/apache/hadoop/hive/thrift/TokenStoreDelegationTokenSecretManager.java
URL: http://svn.apache.org/viewvc/hive/trunk/shims/common/src/main/java/org/apache/hadoop/hive/thrift/TokenStoreDelegationTokenSecretManager.java?rev=1641980&view=auto
==============================================================================
--- hive/trunk/shims/common/src/main/java/org/apache/hadoop/hive/thrift/TokenStoreDelegationTokenSecretManager.java (added)
+++ hive/trunk/shims/common/src/main/java/org/apache/hadoop/hive/thrift/TokenStoreDelegationTokenSecretManager.java Thu Nov 27 01:07:32 2014
@@ -0,0 +1,338 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.thrift;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.lang.reflect.Method;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.codec.binary.Base64;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager;
+import org.apache.hadoop.security.token.delegation.DelegationKey;
+import org.apache.hadoop.security.token.delegation.HiveDelegationTokenSupport;
+import org.apache.hadoop.util.Daemon;
+import org.apache.hadoop.util.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Extension of {@link DelegationTokenSecretManager} to support alternative to default in-memory
+ * token management for fail-over and clustering through plug-able token store (ZooKeeper etc.).
+ * Delegation tokens will be retrieved from the store on-demand and (unlike base class behavior) not
+ * cached in memory. This avoids complexities related to token expiration. The security token is
+ * needed only at the time the transport is opened (as opposed to per interface operation). The
+ * assumption therefore is low cost of interprocess token retrieval (for random read efficient store
+ * such as ZooKeeper) compared to overhead of synchronizing per-process in-memory token caches.
+ * The wrapper incorporates the token store abstraction within the limitations of current
+ * Hive/Hadoop dependency (.20S) with minimum code duplication.
+ * Eventually this should be supported by Hadoop security directly.
+ */
+public class TokenStoreDelegationTokenSecretManager extends DelegationTokenSecretManager {
+
+ // Class-wide SLF4J logger.
+ private static final Logger LOGGER =
+ LoggerFactory.getLogger(TokenStoreDelegationTokenSecretManager.class.getName());
+
+ // Copy of the key update interval given to the constructor; ExpiredTokenRemover compares it
+ // against System.currentTimeMillis() deltas to decide when to roll the master key.
+ final private long keyUpdateInterval;
+ // Copy of the remover scan interval given to the constructor; ExpiredTokenRemover compares it
+ // against System.currentTimeMillis() deltas to decide when to purge expired tokens.
+ final private long tokenRemoverScanInterval;
+ // Background thread created by startThreads() and interrupted by stopThreads().
+ private Thread tokenRemoverThread;
+
+ // Store that holds the master keys and delegation tokens used by this manager.
+ final private DelegationTokenStore tokenStore;
+
+ /**
+  * Creates a secret manager that keeps its master keys and tokens in the given store.
+  *
+  * @param delegationKeyUpdateInterval forwarded to the superclass and remembered for the
+  *          remover thread's master key roll check
+  * @param delegationTokenMaxLifetime forwarded to the superclass
+  * @param delegationTokenRenewInterval forwarded to the superclass
+  * @param delegationTokenRemoverScanInterval forwarded to the superclass and remembered for
+  *          the remover thread's expired token scan check
+  * @param sharedStore store used for all master key and token reads and writes
+  */
+ public TokenStoreDelegationTokenSecretManager(long delegationKeyUpdateInterval,
+     long delegationTokenMaxLifetime, long delegationTokenRenewInterval,
+     long delegationTokenRemoverScanInterval,
+     DelegationTokenStore sharedStore) {
+   super(delegationKeyUpdateInterval, delegationTokenMaxLifetime, delegationTokenRenewInterval,
+       delegationTokenRemoverScanInterval);
+   tokenStore = sharedStore;
+   tokenRemoverScanInterval = delegationTokenRemoverScanInterval;
+   keyUpdateInterval = delegationKeyUpdateInterval;
+ }
+
+ protected DelegationTokenIdentifier getTokenIdentifier(Token<DelegationTokenIdentifier> token)
+ throws IOException {
+ // turn bytes back into identifier for cache lookup
+ ByteArrayInputStream buf = new ByteArrayInputStream(token.getIdentifier());
+ DataInputStream in = new DataInputStream(buf);
+ DelegationTokenIdentifier id = createIdentifier();
+ id.readFields(in);
+ return id;
+ }
+
+ protected Map<Integer, DelegationKey> reloadKeys() {
+ // read keys from token store
+ String[] allKeys = tokenStore.getMasterKeys();
+ Map<Integer, DelegationKey> keys
+ = new HashMap<Integer, DelegationKey>(allKeys.length);
+ for (String keyStr : allKeys) {
+ DelegationKey key = new DelegationKey();
+ try {
+ decodeWritable(key, keyStr);
+ keys.put(key.getKeyId(), key);
+ } catch (IOException ex) {
+ LOGGER.error("Failed to load master key.", ex);
+ }
+ }
+ synchronized (this) {
+ super.allKeys.clear();
+ super.allKeys.putAll(keys);
+ }
+ return keys;
+ }
+
+ /**
+  * Looks the token up in the token store and returns its password via the superclass.
+  * The token information is placed into the superclass's token map only for the duration of
+  * the superclass call and is removed again afterwards.
+  *
+  * @param identifier identifier of the token whose password is requested
+  * @return the password as returned by the superclass
+  * @throws InvalidToken if the store has no entry for the identifier
+  */
+ @Override
+ public byte[] retrievePassword(DelegationTokenIdentifier identifier) throws InvalidToken {
+   final DelegationTokenInformation storedInfo = this.tokenStore.getToken(identifier);
+   if (storedInfo == null) {
+     throw new InvalidToken("token expired or does not exist: " + identifier);
+   }
+   // must reuse super as info.getPassword is not accessible
+   synchronized (this) {
+     try {
+       super.currentTokens.put(identifier, storedInfo);
+       final byte[] password = super.retrievePassword(identifier);
+       return password;
+     } finally {
+       super.currentTokens.remove(identifier);
+     }
+   }
+ }
+
+ /**
+  * Removes the token from the token store.
+  * NOTE(review): the {@code canceller} argument is not read by this method, so no
+  * authorization check happens here - confirm callers verify the canceller themselves.
+  *
+  * @param token token to cancel
+  * @param canceller name of the requesting party (currently unused)
+  * @return the identifier decoded from the token
+  * @throws IOException if the token's identifier cannot be decoded
+  */
+ @Override
+ public DelegationTokenIdentifier cancelToken(Token<DelegationTokenIdentifier> token,
+     String canceller) throws IOException {
+   final DelegationTokenIdentifier identifier = getTokenIdentifier(token);
+   LOGGER.info("Token cancelation requested for identifier: "+identifier);
+   this.tokenStore.removeToken(identifier);
+   return identifier;
+ }
+
+ /**
+ * Create the password and add it to shared store.
+ */
+ @Override
+ protected byte[] createPassword(DelegationTokenIdentifier id) {
+ byte[] password;
+ DelegationTokenInformation info;
+ synchronized (this) {
+ password = super.createPassword(id);
+ // add new token to shared store
+ // need to persist expiration along with password
+ info = super.currentTokens.remove(id);
+ if (info == null) {
+ throw new IllegalStateException("Failed to retrieve token after creation");
+ }
+ }
+ this.tokenStore.addToken(id, info);
+ return password;
+ }
+
+ /**
+  * Renews a token that lives in the token store by temporarily exposing it to the
+  * superclass's renewal logic.
+  *
+  * @param token token to renew
+  * @param renewer name of the party requesting renewal, passed to the superclass
+  * @return the value returned by the superclass's renewToken
+  * @throws InvalidToken if the store has no entry for the token
+  * @throws IOException if the identifier cannot be decoded or the superclass renewal fails
+  */
+ @Override
+ public long renewToken(Token<DelegationTokenIdentifier> token,
+     String renewer) throws InvalidToken, IOException {
+   // since renewal is KERBEROS authenticated token may not be cached
+   final DelegationTokenIdentifier id = getTokenIdentifier(token);
+   DelegationTokenInformation tokenInfo = this.tokenStore.getToken(id);
+   if (tokenInfo == null) {
+     throw new InvalidToken("token does not exist: " + id); // no token found
+   }
+   // ensure associated master key is available; the key map is replaced by reloadKeys()
+   // under this monitor, so it must also be read under the monitor
+   final boolean masterKeyKnown;
+   synchronized (this) {
+     masterKeyKnown = super.allKeys.containsKey(id.getMasterKeyId());
+   }
+   if (!masterKeyKnown) {
+     LOGGER.info("Unknown master key (id={}), (re)loading keys from token store.",
+         id.getMasterKeyId());
+     // called outside the monitor so the store read does not block other callers
+     reloadKeys();
+   }
+   // reuse super renewal logic
+   synchronized (this) {
+     super.currentTokens.put(id, tokenInfo);
+     try {
+       return super.renewToken(token, renewer);
+     } finally {
+       super.currentTokens.remove(id);
+     }
+   }
+ }
+
+ /**
+  * Serializes a writable and returns the bytes as a URL-safe Base64 string.
+  *
+  * @param key writable to serialize
+  * @return URL-safe Base64 text of the serialized bytes
+  * @throws IOException if the writable fails to write itself
+  */
+ public static String encodeWritable(Writable key) throws IOException {
+   final ByteArrayOutputStream buffer = new ByteArrayOutputStream();
+   final DataOutputStream out = new DataOutputStream(buffer);
+   key.write(out);
+   out.flush();
+   final byte[] serialized = buffer.toByteArray();
+   return Base64.encodeBase64URLSafeString(serialized);
+ }
+
+ /**
+  * Populates a writable from Base64 text, the inverse of {@code encodeWritable}.
+  *
+  * @param w writable whose fields are overwritten
+  * @param idStr Base64 text holding the serialized fields
+  * @throws IOException if the writable fails to read the decoded bytes
+  */
+ public static void decodeWritable(Writable w, String idStr) throws IOException {
+   final byte[] serialized = Base64.decodeBase64(idStr);
+   final DataInputStream serializedIn = new DataInputStream(new ByteArrayInputStream(serialized));
+   w.readFields(serializedIn);
+ }
+
+ /**
+ * Synchronize master key updates / sequence generation for multiple nodes.
+ * NOTE: {@link AbstractDelegationTokenSecretManager} keeps currentKey private, so we need
+ * to utilize this "hook" to manipulate the key through the object reference.
+ * This .20S workaround should cease to exist when Hadoop supports token store.
+ *
+ * @param key the new master key; its fields are overwritten in place with the copy that
+ *          carries the id assigned by the token store
+ * @throws IOException if the key cannot be encoded or decoded, or if the superclass call fails
+ */
+ @Override
+ protected void logUpdateMasterKey(DelegationKey key) throws IOException {
+ // the store hands back the id under which the key was added
+ int keySeq = this.tokenStore.addMasterKey(encodeWritable(key));
+ // update key with assigned identifier
+ DelegationKey keyWithSeq = new DelegationKey(keySeq, key.getExpiryDate(), key.getKey());
+ String keyStr = encodeWritable(keyWithSeq);
+ // overwrite the stored entry so it carries the assigned id as well
+ this.tokenStore.updateMasterKey(keySeq, keyStr);
+ // re-read the encoded form into the caller's object so the caller's reference sees the new id
+ decodeWritable(key, keyStr);
+ LOGGER.info("New master key with key id={}", key.getKeyId());
+ super.logUpdateMasterKey(key);
+ }
+
+ /**
+  * Initializes the master key, marks the manager as running and starts the daemon thread
+  * that runs {@link ExpiredTokenRemover}.
+  *
+  * @throws IOException if the master key initialization fails for any reason
+  */
+ @Override
+ public synchronized void startThreads() throws IOException {
+   // updateCurrentKey needs to be called to initialize the master key
+   // (there should be a null check added in the future in rollMasterKey).
+   // It is looked up reflectively and made accessible instead of being called directly.
+   try {
+     final Method updateCurrentKey =
+         AbstractDelegationTokenSecretManager.class.getDeclaredMethod("updateCurrentKey");
+     updateCurrentKey.setAccessible(true);
+     updateCurrentKey.invoke(this);
+   } catch (Exception e) {
+     throw new IOException("Failed to initialize master key", e);
+   }
+   running = true;
+   tokenRemoverThread = new Daemon(new ExpiredTokenRemover());
+   tokenRemoverThread.start();
+ }
+
+ /**
+  * Clears the running flag and interrupts the remover thread, if one was started.
+  * This method does not wait for the thread to finish.
+  */
+ @Override
+ public synchronized void stopThreads() {
+   // the logger performs its own level check for a constant message
+   LOGGER.debug("Stopping expired delegation token remover thread");
+   running = false;
+   final Thread remover = tokenRemoverThread;
+   if (remover != null) {
+     remover.interrupt();
+   }
+ }
+
+ /**
+ * Remove expired tokens. Replaces logic in {@link AbstractDelegationTokenSecretManager}
+ * that cannot be reused due to private method access. Logic here can more efficiently
+ * deal with external token store by only loading into memory the minimum data needed.
+ */
+ protected void removeExpiredTokens() {
+ long now = System.currentTimeMillis();
+ Iterator<DelegationTokenIdentifier> i = tokenStore.getAllDelegationTokenIdentifiers()
+ .iterator();
+ while (i.hasNext()) {
+ DelegationTokenIdentifier id = i.next();
+ if (now > id.getMaxDate()) {
+ this.tokenStore.removeToken(id); // no need to look at token info
+ } else {
+ // get token info to check renew date
+ DelegationTokenInformation tokenInfo = tokenStore.getToken(id);
+ if (tokenInfo != null) {
+ if (now > tokenInfo.getRenewDate()) {
+ this.tokenStore.removeToken(id);
+ }
+ }
+ }
+ }
+ }
+
+ /**
+ * Extension of rollMasterKey to remove expired keys from store.
+ *
+ * @throws IOException
+ */
+ protected void rollMasterKeyExt() throws IOException {
+ Map<Integer, DelegationKey> keys = reloadKeys();
+ int currentKeyId = super.currentId;
+ HiveDelegationTokenSupport.rollMasterKey(TokenStoreDelegationTokenSecretManager.this);
+ List<DelegationKey> keysAfterRoll = Arrays.asList(getAllKeys());
+ for (DelegationKey key : keysAfterRoll) {
+ keys.remove(key.getKeyId());
+ if (key.getKeyId() == currentKeyId) {
+ tokenStore.updateMasterKey(currentKeyId, encodeWritable(key));
+ }
+ }
+ for (DelegationKey expiredKey : keys.values()) {
+ LOGGER.info("Removing expired key id={}", expiredKey.getKeyId());
+ try {
+ tokenStore.removeMasterKey(expiredKey.getKeyId());
+ } catch (Exception e) {
+ LOGGER.error("Error removing expired key id={}", expiredKey.getKeyId(), e);
+ }
+ }
+ }
+
+ /**
+ * Cloned from {@link AbstractDelegationTokenSecretManager} to deal with private access
+ * restriction (there would not be an need to clone the remove thread if the remove logic was
+ * protected/extensible).
+ */
+ protected class ExpiredTokenRemover extends Thread {
+ private long lastMasterKeyUpdate;
+ private long lastTokenCacheCleanup;
+
+ @Override
+ public void run() {
+ LOGGER.info("Starting expired delegation token remover thread, "
+ + "tokenRemoverScanInterval=" + tokenRemoverScanInterval
+ / (60 * 1000) + " min(s)");
+ try {
+ while (running) {
+ long now = System.currentTimeMillis();
+ if (lastMasterKeyUpdate + keyUpdateInterval < now) {
+ try {
+ rollMasterKeyExt();
+ lastMasterKeyUpdate = now;
+ } catch (IOException e) {
+ LOGGER.error("Master key updating failed. "
+ + StringUtils.stringifyException(e));
+ }
+ }
+ if (lastTokenCacheCleanup + tokenRemoverScanInterval < now) {
+ removeExpiredTokens();
+ lastTokenCacheCleanup = now;
+ }
+ try {
+ Thread.sleep(5000); // 5 seconds
+ } catch (InterruptedException ie) {
+ LOGGER
+ .error("InterruptedExcpetion recieved for ExpiredTokenRemover thread "
+ + ie);
+ }
+ }
+ } catch (Throwable t) {
+ LOGGER.error("ExpiredTokenRemover thread received unexpected exception. "
+ + t, t);
+ Runtime.getRuntime().exit(-1);
+ }
+ }
+ }
+
+}
Added: hive/trunk/shims/common/src/main/java/org/apache/hadoop/hive/thrift/client/TUGIAssumingTransport.java
URL: http://svn.apache.org/viewvc/hive/trunk/shims/common/src/main/java/org/apache/hadoop/hive/thrift/client/TUGIAssumingTransport.java?rev=1641980&view=auto
==============================================================================
--- hive/trunk/shims/common/src/main/java/org/apache/hadoop/hive/thrift/client/TUGIAssumingTransport.java (added)
+++ hive/trunk/shims/common/src/main/java/org/apache/hadoop/hive/thrift/client/TUGIAssumingTransport.java Thu Nov 27 01:07:32 2014
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.thrift.client;
+
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+
+import org.apache.hadoop.hive.thrift.TFilterTransport;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.thrift.transport.TTransport;
+import org.apache.thrift.transport.TTransportException;
+
+/**
+ * The Thrift SASL transports call Sasl.createSaslServer and Sasl.createSaslClient
+ * inside open(). So, we need to assume the correct UGI when the transport is opened
+ * so that the SASL mechanisms have access to the right principal. This transport
+ * wraps the Sasl transports to set up the right UGI context for open().
+ *
+ * This is used on the client side, where the API explicitly opens a transport to
+ * the server.
+ */
+ public class TUGIAssumingTransport extends TFilterTransport {
+   /** User context under which the wrapped transport is opened. */
+   protected UserGroupInformation ugi;
+
+   /**
+    * @param wrapped transport that all calls are delegated to
+    * @param ugi user context to assume while opening the wrapped transport
+    */
+   public TUGIAssumingTransport(TTransport wrapped, UserGroupInformation ugi) {
+     super(wrapped);
+     this.ugi = ugi;
+   }
+
+   /**
+    * Opens the wrapped transport inside {@code ugi.doAs(...)}.
+    *
+    * @throws TTransportException if the wrapped transport fails to open
+    */
+   @Override
+   public void open() throws TTransportException {
+     final PrivilegedExceptionAction<Void> openWrapped = new PrivilegedExceptionAction<Void>() {
+       public Void run() {
+         try {
+           wrapped.open();
+           return null;
+         } catch (TTransportException transportFailure) {
+           // Wrap the transport exception in an RTE, since UGI.doAs() then goes
+           // and unwraps this for us out of the doAs block. We then unwrap one
+           // more time in the caller's catch clause to get back the TTE. (ugh)
+           throw new RuntimeException(transportFailure);
+         }
+       }
+     };
+     try {
+       ugi.doAs(openWrapped);
+     } catch (RuntimeException unchecked) {
+       final Throwable cause = unchecked.getCause();
+       if (cause instanceof TTransportException) {
+         throw (TTransportException) cause;
+       }
+       throw unchecked;
+     } catch (InterruptedException ie) {
+       throw new RuntimeException("Received an ie we never threw!", ie);
+     } catch (IOException ioe) {
+       throw new RuntimeException("Received an ioe we never threw!", ioe);
+     }
+   }
+ }
Added: hive/trunk/shims/common/src/main/java/org/apache/hadoop/security/token/delegation/DelegationTokenSelector.java
URL: http://svn.apache.org/viewvc/hive/trunk/shims/common/src/main/java/org/apache/hadoop/security/token/delegation/DelegationTokenSelector.java?rev=1641980&view=auto
==============================================================================
--- hive/trunk/shims/common/src/main/java/org/apache/hadoop/security/token/delegation/DelegationTokenSelector.java (added)
+++ hive/trunk/shims/common/src/main/java/org/apache/hadoop/security/token/delegation/DelegationTokenSelector.java Thu Nov 27 01:07:32 2014
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.security.token.delegation;
+
+import org.apache.hadoop.hive.thrift.DelegationTokenIdentifier;
+import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSelector;
+
+/**
+ * A delegation token selector that is specialized for Hive
+ */
+
+public class DelegationTokenSelector
+ extends AbstractDelegationTokenSelector<DelegationTokenIdentifier>{
+
+ // Passes the Hive delegation token kind to the base selector.
+ public DelegationTokenSelector() {
+ super(DelegationTokenIdentifier.HIVE_DELEGATION_KIND);
+ }
+}
Added: hive/trunk/shims/common/src/main/java/org/apache/hadoop/security/token/delegation/HiveDelegationTokenSupport.java
URL: http://svn.apache.org/viewvc/hive/trunk/shims/common/src/main/java/org/apache/hadoop/security/token/delegation/HiveDelegationTokenSupport.java?rev=1641980&view=auto
==============================================================================
--- hive/trunk/shims/common/src/main/java/org/apache/hadoop/security/token/delegation/HiveDelegationTokenSupport.java (added)
+++ hive/trunk/shims/common/src/main/java/org/apache/hadoop/security/token/delegation/HiveDelegationTokenSupport.java Thu Nov 27 01:07:32 2014
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.security.token.delegation;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager.DelegationTokenInformation;
+
+/**
+ * Workaround for serialization of {@link DelegationTokenInformation} through package access.
+ * Future version of Hadoop should add this to DelegationTokenInformation itself.
+ */
+public final class HiveDelegationTokenSupport {
+
+  // Static helpers only; not instantiable.
+  private HiveDelegationTokenSupport() {}
+
+  /**
+   * Serializes token information as: vint password length, password bytes, renew date (long).
+   *
+   * @param token token information to serialize
+   * @return the serialized bytes
+   * @throws RuntimeException if writing to the in-memory buffer fails
+   */
+  public static byte[] encodeDelegationTokenInformation(DelegationTokenInformation token) {
+    try {
+      ByteArrayOutputStream bos = new ByteArrayOutputStream();
+      DataOutputStream out = new DataOutputStream(bos);
+      WritableUtils.writeVInt(out, token.password.length);
+      out.write(token.password);
+      out.writeLong(token.renewDate);
+      out.flush();
+      return bos.toByteArray();
+    } catch (IOException ex) {
+      throw new RuntimeException("Failed to encode token.", ex);
+    }
+  }
+
+  /**
+   * Rebuilds token information from bytes written by
+   * {@link #encodeDelegationTokenInformation(DelegationTokenInformation)}.
+   *
+   * @param tokenBytes serialized token information
+   * @return token information carrying the decoded password and renew date
+   * @throws IOException if the bytes are truncated or declare an impossible password length
+   */
+  public static DelegationTokenInformation decodeDelegationTokenInformation(byte[] tokenBytes)
+      throws IOException {
+    DataInputStream in = new DataInputStream(new ByteArrayInputStream(tokenBytes));
+    DelegationTokenInformation token = new DelegationTokenInformation(0, null);
+    int len = WritableUtils.readVInt(in);
+    // The password cannot be longer than the input itself; rejecting bad lengths up front
+    // avoids NegativeArraySizeException or a huge allocation on corrupt data.
+    if (len < 0 || len > tokenBytes.length) {
+      throw new IOException("Invalid password length " + len
+          + " in encoded delegation token information");
+    }
+    token.password = new byte[len];
+    in.readFully(token.password);
+    token.renewDate = in.readLong();
+    return token;
+  }
+
+  /**
+   * Calls {@code rollMasterKey()} on the given manager from within the manager's own package.
+   *
+   * @param mgr secret manager whose master key is rolled
+   * @throws IOException if the manager's rollMasterKey fails
+   */
+  public static void rollMasterKey(
+      AbstractDelegationTokenSecretManager<? extends AbstractDelegationTokenIdentifier> mgr)
+      throws IOException {
+    mgr.rollMasterKey();
+  }
+
+}
Modified: hive/trunk/shims/pom.xml
URL: http://svn.apache.org/viewvc/hive/trunk/shims/pom.xml?rev=1641980&r1=1641979&r2=1641980&view=diff
==============================================================================
--- hive/trunk/shims/pom.xml (original)
+++ hive/trunk/shims/pom.xml Thu Nov 27 01:07:32 2014
@@ -33,7 +33,6 @@
<modules>
<module>common</module>
- <module>0.20</module>
<module>common-secure</module>
<module>0.20S</module>
<module>0.23</module>