Posted to commits@hive.apache.org by ha...@apache.org on 2014/11/27 02:07:35 UTC

svn commit: r1641980 [4/4] - in /hive/trunk: ./ beeline/src/test/org/apache/hive/beeline/ common/src/java/org/apache/hadoop/hive/common/ common/src/java/org/apache/hadoop/hive/conf/ hcatalog/core/src/main/java/org/apache/hive/hcatalog/common/ hcatalog/...

Added: hive/trunk/shims/common/src/main/java/org/apache/hadoop/hive/thrift/TokenStoreDelegationTokenSecretManager.java
URL: http://svn.apache.org/viewvc/hive/trunk/shims/common/src/main/java/org/apache/hadoop/hive/thrift/TokenStoreDelegationTokenSecretManager.java?rev=1641980&view=auto
==============================================================================
--- hive/trunk/shims/common/src/main/java/org/apache/hadoop/hive/thrift/TokenStoreDelegationTokenSecretManager.java (added)
+++ hive/trunk/shims/common/src/main/java/org/apache/hadoop/hive/thrift/TokenStoreDelegationTokenSecretManager.java Thu Nov 27 01:07:32 2014
@@ -0,0 +1,338 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.thrift;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.lang.reflect.Method;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.codec.binary.Base64;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager;
+import org.apache.hadoop.security.token.delegation.DelegationKey;
+import org.apache.hadoop.security.token.delegation.HiveDelegationTokenSupport;
+import org.apache.hadoop.util.Daemon;
+import org.apache.hadoop.util.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Extension of {@link DelegationTokenSecretManager} that supports an alternative to the default
+ * in-memory token management for fail-over and clustering through a pluggable token store
+ * (ZooKeeper etc.). Delegation tokens are retrieved from the store on demand and (unlike the base
+ * class behavior) are not cached in memory. This avoids complexities related to token expiration.
+ * The security token is needed only at the time the transport is opened (as opposed to per
+ * interface operation). The assumption therefore is that the cost of inter-process token
+ * retrieval (for a random-read-efficient store such as ZooKeeper) is low compared to the overhead
+ * of synchronizing per-process in-memory token caches. The wrapper incorporates the token store
+ * abstraction within the limitations of the current Hive/Hadoop dependency (.20S) with minimum
+ * code duplication. Eventually this should be supported by Hadoop security directly.
+ */
+public class TokenStoreDelegationTokenSecretManager extends DelegationTokenSecretManager {
+
+  private static final Logger LOGGER =
+      LoggerFactory.getLogger(TokenStoreDelegationTokenSecretManager.class.getName());
+
+  private final long keyUpdateInterval;
+  private final long tokenRemoverScanInterval;
+  private Thread tokenRemoverThread;
+
+  private final DelegationTokenStore tokenStore;
+
+  public TokenStoreDelegationTokenSecretManager(long delegationKeyUpdateInterval,
+      long delegationTokenMaxLifetime, long delegationTokenRenewInterval,
+      long delegationTokenRemoverScanInterval,
+      DelegationTokenStore sharedStore) {
+    super(delegationKeyUpdateInterval, delegationTokenMaxLifetime, delegationTokenRenewInterval,
+        delegationTokenRemoverScanInterval);
+    this.keyUpdateInterval = delegationKeyUpdateInterval;
+    this.tokenRemoverScanInterval = delegationTokenRemoverScanInterval;
+
+    this.tokenStore = sharedStore;
+  }
+
+  protected DelegationTokenIdentifier getTokenIdentifier(Token<DelegationTokenIdentifier> token)
+      throws IOException {
+    // turn bytes back into identifier for cache lookup
+    ByteArrayInputStream buf = new ByteArrayInputStream(token.getIdentifier());
+    DataInputStream in = new DataInputStream(buf);
+    DelegationTokenIdentifier id = createIdentifier();
+    id.readFields(in);
+    return id;
+  }
+
+  protected Map<Integer, DelegationKey> reloadKeys() {
+    // read keys from token store
+    String[] allKeys = tokenStore.getMasterKeys();
+    Map<Integer, DelegationKey> keys
+        = new HashMap<Integer, DelegationKey>(allKeys.length);
+    for (String keyStr : allKeys) {
+      DelegationKey key = new DelegationKey();
+      try {
+        decodeWritable(key, keyStr);
+        keys.put(key.getKeyId(), key);
+      } catch (IOException ex) {
+        LOGGER.error("Failed to load master key.", ex);
+      }
+    }
+    synchronized (this) {
+      super.allKeys.clear();
+      super.allKeys.putAll(keys);
+    }
+    return keys;
+  }
+
+  @Override
+  public byte[] retrievePassword(DelegationTokenIdentifier identifier) throws InvalidToken {
+    DelegationTokenInformation info = this.tokenStore.getToken(identifier);
+    if (info == null) {
+      throw new InvalidToken("token expired or does not exist: " + identifier);
+    }
+    // must reuse super as info.getPassword is not accessible
+    synchronized (this) {
+      try {
+        super.currentTokens.put(identifier, info);
+        return super.retrievePassword(identifier);
+      } finally {
+        super.currentTokens.remove(identifier);
+      }
+    }
+  }
+
+  @Override
+  public DelegationTokenIdentifier cancelToken(Token<DelegationTokenIdentifier> token,
+      String canceller) throws IOException {
+    DelegationTokenIdentifier id = getTokenIdentifier(token);
+    LOGGER.info("Token cancelation requested for identifier: "+id);
+    this.tokenStore.removeToken(id);
+    return id;
+  }
+
+  /**
+   * Create the password and add it to shared store.
+   */
+  @Override
+  protected byte[] createPassword(DelegationTokenIdentifier id) {
+    byte[] password;
+    DelegationTokenInformation info;
+    synchronized (this) {
+      password = super.createPassword(id);
+      // add new token to shared store
+      // need to persist expiration along with password
+      info = super.currentTokens.remove(id);
+      if (info == null) {
+        throw new IllegalStateException("Failed to retrieve token after creation");
+      }
+    }
+    this.tokenStore.addToken(id, info);
+    return password;
+  }
+
+  @Override
+  public long renewToken(Token<DelegationTokenIdentifier> token,
+      String renewer) throws InvalidToken, IOException {
+    // since renewal is Kerberos-authenticated, the token may not be cached yet
+    final DelegationTokenIdentifier id = getTokenIdentifier(token);
+    DelegationTokenInformation tokenInfo = this.tokenStore.getToken(id);
+    if (tokenInfo == null) {
+      throw new InvalidToken("token does not exist: " + id); // no token found
+    }
+    // ensure associated master key is available
+    if (!super.allKeys.containsKey(id.getMasterKeyId())) {
+      LOGGER.info("Unknown master key (id={}), (re)loading keys from token store.",
+        id.getMasterKeyId());
+      reloadKeys();
+    }
+    // reuse super renewal logic
+    synchronized (this) {
+      super.currentTokens.put(id, tokenInfo);
+      try {
+        return super.renewToken(token, renewer);
+      } finally {
+        super.currentTokens.remove(id);
+      }
+    }
+  }
+
+  public static String encodeWritable(Writable key) throws IOException {
+    ByteArrayOutputStream bos = new ByteArrayOutputStream();
+    DataOutputStream dos = new DataOutputStream(bos);
+    key.write(dos);
+    dos.flush();
+    return Base64.encodeBase64URLSafeString(bos.toByteArray());
+  }
+
+  public static void decodeWritable(Writable w, String idStr) throws IOException {
+    DataInputStream in = new DataInputStream(new ByteArrayInputStream(Base64.decodeBase64(idStr)));
+    w.readFields(in);
+  }
+
+  /**
+   * Synchronize master key updates / sequence generation for multiple nodes.
+   * NOTE: {@link AbstractDelegationTokenSecretManager} keeps currentKey private, so we need
+   * to use this "hook" to manipulate the key through the object reference.
+   * This .20S workaround should cease to exist once Hadoop supports a token store directly.
+   */
+  @Override
+  protected void logUpdateMasterKey(DelegationKey key) throws IOException {
+    int keySeq = this.tokenStore.addMasterKey(encodeWritable(key));
+    // update key with assigned identifier
+    DelegationKey keyWithSeq = new DelegationKey(keySeq, key.getExpiryDate(), key.getKey());
+    String keyStr = encodeWritable(keyWithSeq);
+    this.tokenStore.updateMasterKey(keySeq, keyStr);
+    decodeWritable(key, keyStr);
+    LOGGER.info("New master key with key id={}", key.getKeyId());
+    super.logUpdateMasterKey(key);
+  }
+
+  @Override
+  public synchronized void startThreads() throws IOException {
+    try {
+      // updateCurrentKey needs to be called to initialize the master key
+      // (there should be a null check added in the future in rollMasterKey)
+      // updateCurrentKey();
+      Method m = AbstractDelegationTokenSecretManager.class.getDeclaredMethod("updateCurrentKey");
+      m.setAccessible(true);
+      m.invoke(this);
+    } catch (Exception e) {
+      throw new IOException("Failed to initialize master key", e);
+    }
+    running = true;
+    tokenRemoverThread = new Daemon(new ExpiredTokenRemover());
+    tokenRemoverThread.start();
+  }
+
+  @Override
+  public synchronized void stopThreads() {
+    if (LOGGER.isDebugEnabled()) {
+      LOGGER.debug("Stopping expired delegation token remover thread");
+    }
+    running = false;
+    if (tokenRemoverThread != null) {
+      tokenRemoverThread.interrupt();
+    }
+  }
+
+  /**
+   * Remove expired tokens. Replaces logic in {@link AbstractDelegationTokenSecretManager}
+   * that cannot be reused due to private method access. The logic here can deal with an
+   * external token store more efficiently by loading only the minimum data needed into memory.
+   */
+  protected void removeExpiredTokens() {
+    long now = System.currentTimeMillis();
+    Iterator<DelegationTokenIdentifier> i = tokenStore.getAllDelegationTokenIdentifiers()
+        .iterator();
+    while (i.hasNext()) {
+      DelegationTokenIdentifier id = i.next();
+      if (now > id.getMaxDate()) {
+        this.tokenStore.removeToken(id); // no need to look at token info
+      } else {
+        // get token info to check renew date
+        DelegationTokenInformation tokenInfo = tokenStore.getToken(id);
+        if (tokenInfo != null) {
+          if (now > tokenInfo.getRenewDate()) {
+            this.tokenStore.removeToken(id);
+          }
+        }
+      }
+    }
+  }
+
+  /**
+   * Extension of rollMasterKey to remove expired keys from store.
+   *
+   * @throws IOException
+   */
+  protected void rollMasterKeyExt() throws IOException {
+    Map<Integer, DelegationKey> keys = reloadKeys();
+    int currentKeyId = super.currentId;
+    HiveDelegationTokenSupport.rollMasterKey(TokenStoreDelegationTokenSecretManager.this);
+    List<DelegationKey> keysAfterRoll = Arrays.asList(getAllKeys());
+    for (DelegationKey key : keysAfterRoll) {
+      keys.remove(key.getKeyId());
+      if (key.getKeyId() == currentKeyId) {
+        tokenStore.updateMasterKey(currentKeyId, encodeWritable(key));
+      }
+    }
+    for (DelegationKey expiredKey : keys.values()) {
+      LOGGER.info("Removing expired key id={}", expiredKey.getKeyId());
+      try {
+        tokenStore.removeMasterKey(expiredKey.getKeyId());
+      } catch (Exception e) {
+        LOGGER.error("Error removing expired key id={}", expiredKey.getKeyId(), e);
+      }
+    }
+  }
+
+  /**
+   * Cloned from {@link AbstractDelegationTokenSecretManager} to deal with private access
+   * restrictions (there would be no need to clone the remover thread if the removal logic
+   * were protected/extensible).
+   */
+  protected class ExpiredTokenRemover extends Thread {
+    private long lastMasterKeyUpdate;
+    private long lastTokenCacheCleanup;
+
+    @Override
+    public void run() {
+      LOGGER.info("Starting expired delegation token remover thread, "
+          + "tokenRemoverScanInterval=" + tokenRemoverScanInterval
+          / (60 * 1000) + " min(s)");
+      try {
+        while (running) {
+          long now = System.currentTimeMillis();
+          if (lastMasterKeyUpdate + keyUpdateInterval < now) {
+            try {
+              rollMasterKeyExt();
+              lastMasterKeyUpdate = now;
+            } catch (IOException e) {
+              LOGGER.error("Master key updating failed. "
+                  + StringUtils.stringifyException(e));
+            }
+          }
+          if (lastTokenCacheCleanup + tokenRemoverScanInterval < now) {
+            removeExpiredTokens();
+            lastTokenCacheCleanup = now;
+          }
+          try {
+            Thread.sleep(5000); // 5 seconds
+          } catch (InterruptedException ie) {
+            LOGGER.error("InterruptedException received for ExpiredTokenRemover thread "
+                + ie);
+          }
+        }
+      } catch (Throwable t) {
+        LOGGER.error("ExpiredTokenRemover thread received unexpected exception. "
+            + t, t);
+        Runtime.getRuntime().exit(-1);
+      }
+    }
+  }
+
+}

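For reference, here is a minimal sketch of how this secret manager might be wired into a service. MyZooKeeperTokenStore is a hypothetical DelegationTokenStore implementation and the interval values are arbitrary examples; only the constructor signature and startThreads()/stopThreads() come from the class above:

    // Sketch only: MyZooKeeperTokenStore is a hypothetical DelegationTokenStore
    // implementation and the interval values are arbitrary examples.
    import org.apache.hadoop.hive.thrift.DelegationTokenStore;
    import org.apache.hadoop.hive.thrift.TokenStoreDelegationTokenSecretManager;

    public class TokenManagerBootstrap {
      public static void main(String[] args) throws Exception {
        DelegationTokenStore store = new MyZooKeeperTokenStore(); // hypothetical store
        TokenStoreDelegationTokenSecretManager mgr =
            new TokenStoreDelegationTokenSecretManager(
                24L * 60 * 60 * 1000,     // delegationKeyUpdateInterval (1 day)
                7L * 24 * 60 * 60 * 1000, // delegationTokenMaxLifetime (7 days)
                24L * 60 * 60 * 1000,     // delegationTokenRenewInterval (1 day)
                60L * 60 * 1000,          // delegationTokenRemoverScanInterval (1 hour)
                store);
        mgr.startThreads(); // initializes the master key and starts the ExpiredTokenRemover
        // ... serve requests ...
        mgr.stopThreads();
      }
    }

The shared store is what lets multiple server instances see the same master keys and tokens; the base class's in-memory maps are only populated transiently inside retrievePassword()/renewToken().
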
Added: hive/trunk/shims/common/src/main/java/org/apache/hadoop/hive/thrift/client/TUGIAssumingTransport.java
URL: http://svn.apache.org/viewvc/hive/trunk/shims/common/src/main/java/org/apache/hadoop/hive/thrift/client/TUGIAssumingTransport.java?rev=1641980&view=auto
==============================================================================
--- hive/trunk/shims/common/src/main/java/org/apache/hadoop/hive/thrift/client/TUGIAssumingTransport.java (added)
+++ hive/trunk/shims/common/src/main/java/org/apache/hadoop/hive/thrift/client/TUGIAssumingTransport.java Thu Nov 27 01:07:32 2014
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.thrift.client;
+
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+
+import org.apache.hadoop.hive.thrift.TFilterTransport;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.thrift.transport.TTransport;
+import org.apache.thrift.transport.TTransportException;
+
+/**
+  * The Thrift SASL transports call Sasl.createSaslServer and Sasl.createSaslClient
+  * inside open(). So, we need to assume the correct UGI when the transport is opened
+  * so that the SASL mechanisms have access to the right principal. This transport
+  * wraps the Sasl transports to set up the right UGI context for open().
+  *
+  * This is used on the client side, where the API explicitly opens a transport to
+  * the server.
+  */
+ public class TUGIAssumingTransport extends TFilterTransport {
+   protected UserGroupInformation ugi;
+
+   public TUGIAssumingTransport(TTransport wrapped, UserGroupInformation ugi) {
+     super(wrapped);
+     this.ugi = ugi;
+   }
+
+   @Override
+   public void open() throws TTransportException {
+     try {
+       ugi.doAs(new PrivilegedExceptionAction<Void>() {
+         public Void run() {
+           try {
+             wrapped.open();
+           } catch (TTransportException tte) {
+             // Wrap the transport exception in an RTE, since UGI.doAs() then goes
+             // and unwraps this for us out of the doAs block. We then unwrap one
+             // more time in our catch clause to get back the TTE. (ugh)
+             throw new RuntimeException(tte);
+           }
+           return null;
+         }
+       });
+     } catch (IOException ioe) {
+       throw new RuntimeException("Received an ioe we never threw!", ioe);
+     } catch (InterruptedException ie) {
+       throw new RuntimeException("Received an ie we never threw!", ie);
+     } catch (RuntimeException rte) {
+       if (rte.getCause() instanceof TTransportException) {
+         throw (TTransportException)rte.getCause();
+       } else {
+         throw rte;
+       }
+     }
+   }
+ }

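On the client side, this wrapper is applied around the Thrift SASL transport before open() so that the SASL handshake runs as the intended user. A rough sketch, assuming an already-built saslTransport (its construction is elided and not part of this commit):

    // Sketch only: building the underlying SASL transport is elided.
    import org.apache.hadoop.hive.thrift.client.TUGIAssumingTransport;
    import org.apache.hadoop.security.UserGroupInformation;
    import org.apache.thrift.transport.TTransport;

    public class ClientTransportExample {
      public static TTransport open(TTransport saslTransport) throws Exception {
        UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
        TTransport transport = new TUGIAssumingTransport(saslTransport, ugi);
        transport.open(); // runs inside ugi.doAs(), so SASL sees the right principal
        return transport;
      }
    }
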
Added: hive/trunk/shims/common/src/main/java/org/apache/hadoop/security/token/delegation/DelegationTokenSelector.java
URL: http://svn.apache.org/viewvc/hive/trunk/shims/common/src/main/java/org/apache/hadoop/security/token/delegation/DelegationTokenSelector.java?rev=1641980&view=auto
==============================================================================
--- hive/trunk/shims/common/src/main/java/org/apache/hadoop/security/token/delegation/DelegationTokenSelector.java (added)
+++ hive/trunk/shims/common/src/main/java/org/apache/hadoop/security/token/delegation/DelegationTokenSelector.java Thu Nov 27 01:07:32 2014
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.security.token.delegation;
+
+import org.apache.hadoop.hive.thrift.DelegationTokenIdentifier;
+import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSelector;
+
+/**
+ * A delegation token selector that is specialized for Hive.
+ */
+public class DelegationTokenSelector
+    extends AbstractDelegationTokenSelector<DelegationTokenIdentifier> {
+
+  public DelegationTokenSelector() {
+    super(DelegationTokenIdentifier.HIVE_DELEGATION_KIND);
+  }
+}

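In typical use, this selector picks the Hive delegation token out of a UGI's credential set by service name. A small sketch (the service name is an assumption for illustration; selectToken() is inherited from Hadoop's AbstractDelegationTokenSelector):

    // Sketch only: the service name is an arbitrary example value.
    import org.apache.hadoop.hive.thrift.DelegationTokenIdentifier;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.security.UserGroupInformation;
    import org.apache.hadoop.security.token.Token;
    import org.apache.hadoop.security.token.delegation.DelegationTokenSelector;

    public class TokenLookupExample {
      public static Token<DelegationTokenIdentifier> find(String serviceName) throws Exception {
        DelegationTokenSelector selector = new DelegationTokenSelector();
        UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
        // matches tokens of kind DelegationTokenIdentifier.HIVE_DELEGATION_KIND for the service
        return selector.selectToken(new Text(serviceName), ugi.getTokens());
      }
    }
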
Added: hive/trunk/shims/common/src/main/java/org/apache/hadoop/security/token/delegation/HiveDelegationTokenSupport.java
URL: http://svn.apache.org/viewvc/hive/trunk/shims/common/src/main/java/org/apache/hadoop/security/token/delegation/HiveDelegationTokenSupport.java?rev=1641980&view=auto
==============================================================================
--- hive/trunk/shims/common/src/main/java/org/apache/hadoop/security/token/delegation/HiveDelegationTokenSupport.java (added)
+++ hive/trunk/shims/common/src/main/java/org/apache/hadoop/security/token/delegation/HiveDelegationTokenSupport.java Thu Nov 27 01:07:32 2014
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.security.token.delegation;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager.DelegationTokenInformation;
+
+/**
+ * Workaround for serialization of {@link DelegationTokenInformation} through package access.
+ * A future version of Hadoop should add this to DelegationTokenInformation itself.
+ */
+public final class HiveDelegationTokenSupport {
+
+  private HiveDelegationTokenSupport() {}
+
+  public static byte[] encodeDelegationTokenInformation(DelegationTokenInformation token) {
+    try {
+      ByteArrayOutputStream bos = new ByteArrayOutputStream();
+      DataOutputStream out = new DataOutputStream(bos);
+      WritableUtils.writeVInt(out, token.password.length);
+      out.write(token.password);
+      out.writeLong(token.renewDate);
+      out.flush();
+      return bos.toByteArray();
+    } catch (IOException ex) {
+      throw new RuntimeException("Failed to encode token.", ex);
+    }
+  }
+
+  public static DelegationTokenInformation decodeDelegationTokenInformation(byte[] tokenBytes)
+      throws IOException {
+    DataInputStream in = new DataInputStream(new ByteArrayInputStream(tokenBytes));
+    DelegationTokenInformation token = new DelegationTokenInformation(0, null);
+    int len = WritableUtils.readVInt(in);
+    token.password = new byte[len];
+    in.readFully(token.password);
+    token.renewDate = in.readLong();
+    return token;
+  }
+
+  public static void rollMasterKey(
+      AbstractDelegationTokenSecretManager<? extends AbstractDelegationTokenIdentifier> mgr)
+      throws IOException {
+    mgr.rollMasterKey();
+  }
+
+}

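These helpers let the token store persist DelegationTokenInformation (password plus renew date) as an opaque byte array. A minimal round-trip sketch using only the two methods above (the password bytes and renew date are made-up example values):

    // Sketch only: password bytes and renew date are arbitrary example values.
    import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager.DelegationTokenInformation;
    import org.apache.hadoop.security.token.delegation.HiveDelegationTokenSupport;

    public class TokenInfoRoundTrip {
      public static void main(String[] args) throws Exception {
        DelegationTokenInformation info = new DelegationTokenInformation(
            System.currentTimeMillis() + 3600000L, new byte[] {1, 2, 3, 4});
        byte[] encoded = HiveDelegationTokenSupport.encodeDelegationTokenInformation(info);
        DelegationTokenInformation decoded =
            HiveDelegationTokenSupport.decodeDelegationTokenInformation(encoded);
        System.out.println("renew date preserved: "
            + (decoded.getRenewDate() == info.getRenewDate()));
      }
    }
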
Modified: hive/trunk/shims/pom.xml
URL: http://svn.apache.org/viewvc/hive/trunk/shims/pom.xml?rev=1641980&r1=1641979&r2=1641980&view=diff
==============================================================================
--- hive/trunk/shims/pom.xml (original)
+++ hive/trunk/shims/pom.xml Thu Nov 27 01:07:32 2014
@@ -33,7 +33,6 @@
 
   <modules>
     <module>common</module>
-    <module>0.20</module>
     <module>common-secure</module>
     <module>0.20S</module>
     <module>0.23</module>