You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by na...@apache.org on 2011/01/19 18:28:30 UTC

svn commit: r1060876 [4/4] - in /hive/trunk: ./ metastore/if/ metastore/src/gen/thrift/gen-cpp/ metastore/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/ metastore/src/gen/thrift/gen-php/hive_metastore/ metastore/src/gen/thrift/gen-py...

Added: hive/trunk/shims/src/0.20S/java/org/apache/hadoop/hive/thrift/DelegationTokenSecretManager.java
URL: http://svn.apache.org/viewvc/hive/trunk/shims/src/0.20S/java/org/apache/hadoop/hive/thrift/DelegationTokenSecretManager.java?rev=1060876&view=auto
==============================================================================
--- hive/trunk/shims/src/0.20S/java/org/apache/hadoop/hive/thrift/DelegationTokenSecretManager.java (added)
+++ hive/trunk/shims/src/0.20S/java/org/apache/hadoop/hive/thrift/DelegationTokenSecretManager.java Wed Jan 19 17:28:28 2011
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.thrift;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager;
+
+/**
+ * A Hive specific delegation token secret manager.
+ * The secret manager is responsible for generating and accepting the password
+ * for each token.
+ */
+public class DelegationTokenSecretManager
+    extends AbstractDelegationTokenSecretManager<DelegationTokenIdentifier> {
+
+  /**
+   * Create a secret manager.
+   * @param delegationKeyUpdateInterval the number of milliseconds for rolling new
+   *        secret keys.
+   * @param delegationTokenMaxLifetime the maximum lifetime of the delegation
+   *        tokens
+   * @param delegationTokenRenewInterval how often the tokens must be renewed
+   * @param delegationTokenRemoverScanInterval how often the tokens are scanned
+   *        for expired tokens
+   */
+  public DelegationTokenSecretManager(long delegationKeyUpdateInterval,
+                                      long delegationTokenMaxLifetime,
+                                      long delegationTokenRenewInterval,
+                                      long delegationTokenRemoverScanInterval) {
+    super(delegationKeyUpdateInterval, delegationTokenMaxLifetime,
+          delegationTokenRenewInterval, delegationTokenRemoverScanInterval);
+  }
+
+  @Override
+  public DelegationTokenIdentifier createIdentifier() {
+    return new DelegationTokenIdentifier();
+  }
+
+  public synchronized void cancelDelegationToken(String tokenStrForm) throws IOException {
+    Token<DelegationTokenIdentifier> t= new Token<DelegationTokenIdentifier>();
+    t.decodeFromUrlString(tokenStrForm);
+    String user = UserGroupInformation.getCurrentUser().getUserName();
+    cancelToken(t, user);
+  }
+
+  public synchronized long renewDelegationToken(String tokenStrForm) throws IOException {
+    Token<DelegationTokenIdentifier> t= new Token<DelegationTokenIdentifier>();
+    t.decodeFromUrlString(tokenStrForm);
+    String user = UserGroupInformation.getCurrentUser().getUserName();
+    return renewToken(t, user);
+  }
+
+  public synchronized String getDelegationToken(String renewer, String tokenSignature) throws IOException {
+    UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
+    Text owner = new Text(ugi.getUserName());
+    Text realUser = null;
+    if (ugi.getRealUser() != null) {
+      realUser = new Text(ugi.getRealUser().getUserName());
+    }
+    DelegationTokenIdentifier ident =
+      new DelegationTokenIdentifier(owner, new Text(renewer), realUser);
+    Token<DelegationTokenIdentifier> t = new Token<DelegationTokenIdentifier>(
+        ident, this);
+    if(tokenSignature != null) {
+      t.setService(new Text(tokenSignature));
+    }
+    return t.encodeToUrlString();
+  }
+
+  public synchronized String getDelegationToken(String renewer) throws IOException {
+    return getDelegationToken(renewer, null);
+  }
+}
+

Added: hive/trunk/shims/src/0.20S/java/org/apache/hadoop/hive/thrift/DelegationTokenSelector.java
URL: http://svn.apache.org/viewvc/hive/trunk/shims/src/0.20S/java/org/apache/hadoop/hive/thrift/DelegationTokenSelector.java?rev=1060876&view=auto
==============================================================================
--- hive/trunk/shims/src/0.20S/java/org/apache/hadoop/hive/thrift/DelegationTokenSelector.java (added)
+++ hive/trunk/shims/src/0.20S/java/org/apache/hadoop/hive/thrift/DelegationTokenSelector.java Wed Jan 19 17:28:28 2011
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.thrift;
+
+import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSelector;
+
+/**
+ * A delegation token selector that is specialized for Hive
+ */
+
+public class DelegationTokenSelector
+    extends AbstractDelegationTokenSelector<DelegationTokenIdentifier>{
+
+  public DelegationTokenSelector() {
+    super(DelegationTokenIdentifier.HIVE_DELEGATION_KIND);
+  }
+}

Modified: hive/trunk/shims/src/0.20S/java/org/apache/hadoop/hive/thrift/HadoopThriftAuthBridge20S.java
URL: http://svn.apache.org/viewvc/hive/trunk/shims/src/0.20S/java/org/apache/hadoop/hive/thrift/HadoopThriftAuthBridge20S.java?rev=1060876&r1=1060875&r2=1060876&view=diff
==============================================================================
--- hive/trunk/shims/src/0.20S/java/org/apache/hadoop/hive/thrift/HadoopThriftAuthBridge20S.java (original)
+++ hive/trunk/shims/src/0.20S/java/org/apache/hadoop/hive/thrift/HadoopThriftAuthBridge20S.java Wed Jan 19 17:28:28 2011
@@ -15,21 +15,34 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.hadoop.hive.thrift;
 
- import java.io.IOException;
+import java.io.IOException;
 import java.security.PrivilegedAction;
 import java.security.PrivilegedExceptionAction;
 
+import javax.security.auth.callback.Callback;
+import javax.security.auth.callback.CallbackHandler;
+import javax.security.auth.callback.NameCallback;
+import javax.security.auth.callback.PasswordCallback;
+import javax.security.auth.callback.UnsupportedCallbackException;
+import javax.security.sasl.AuthorizeCallback;
+import javax.security.sasl.RealmCallback;
+import javax.security.sasl.RealmChoiceCallback;
 import javax.security.sasl.SaslException;
+import javax.security.sasl.SaslServer;
 
+import org.apache.commons.codec.binary.Base64;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.security.SaslRpcServer;
 import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.SaslRpcServer.AuthMethod;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.security.token.SecretManager.InvalidToken;
 import org.apache.thrift.TException;
 import org.apache.thrift.TProcessor;
 import org.apache.thrift.protocol.TProtocol;
@@ -66,14 +79,27 @@ import org.apache.thrift.transport.TTran
       * @param serverPrincipal The Kerberos principal of the target server.
       * @param underlyingTransport The underlying transport mechanism, usually a TSocket.
       */
+
      @Override
      public TTransport createClientTransport(
        String principalConfig, String host,
-       String methodStr, TTransport underlyingTransport)
+        String methodStr, String tokenStrForm, TTransport underlyingTransport)
        throws IOException {
        AuthMethod method = AuthMethod.valueOf(AuthMethod.class, methodStr);
 
+       TTransport saslTransport = null;
        switch (method) {
+         case DIGEST:
+           Token<DelegationTokenIdentifier> t= new Token<DelegationTokenIdentifier>();
+           t.decodeFromUrlString(tokenStrForm);
+           saslTransport = new TSaslClientTransport(
+            method.getMechanismName(),
+            null,
+            null, SaslRpcServer.SASL_DEFAULT_REALM,
+            SaslRpcServer.SASL_PROPS, new SaslClientCallbackHandler(t),
+            underlyingTransport);
+           return new TUGIAssumingTransport(saslTransport, UserGroupInformation.getCurrentUser());
+
          case KERBEROS:
            String serverPrincipal = SecurityUtil.getServerPrincipal(principalConfig, host);
            String names[] = SaslRpcServer.splitKerberosName(serverPrincipal);
@@ -83,7 +109,7 @@ import org.apache.thrift.transport.TTran
                  + serverPrincipal);
            }
            try {
-             TTransport saslTransport = new TSaslClientTransport(
+             saslTransport = new TSaslClientTransport(
                method.getMechanismName(),
                null,
                names[0], names[1],
@@ -95,16 +121,219 @@ import org.apache.thrift.transport.TTran
            }
 
          default:
-           throw new IOException("Unsupported authentication method: " +method);
+        throw new IOException("Unsupported authentication method: " + method);
+       }
+     }
+    private static class SaslClientCallbackHandler implements CallbackHandler {
+      private final String userName;
+      private final char[] userPassword;
+
+      public SaslClientCallbackHandler(Token<? extends TokenIdentifier> token) {
+        this.userName = encodeIdentifier(token.getIdentifier());
+        this.userPassword = encodePassword(token.getPassword());
+      }
+
+      public void handle(Callback[] callbacks)
+      throws UnsupportedCallbackException {
+        NameCallback nc = null;
+        PasswordCallback pc = null;
+        RealmCallback rc = null;
+        for (Callback callback : callbacks) {
+          if (callback instanceof RealmChoiceCallback) {
+            continue;
+          } else if (callback instanceof NameCallback) {
+            nc = (NameCallback) callback;
+          } else if (callback instanceof PasswordCallback) {
+            pc = (PasswordCallback) callback;
+          } else if (callback instanceof RealmCallback) {
+            rc = (RealmCallback) callback;
+          } else {
+            throw new UnsupportedCallbackException(callback,
+                "Unrecognized SASL client callback");
+          }
+        }
+        if (nc != null) {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("SASL client callback: setting username: " + userName);
+          }
+          nc.setName(userName);
+        }
+        if (pc != null) {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("SASL client callback: setting userPassword");
+          }
+          pc.setPassword(userPassword);
+        }
+        if (rc != null) {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("SASL client callback: setting realm: "
+                + rc.getDefaultText());
+          }
+          rc.setText(rc.getDefaultText());
+        }
+      }
+
+      static String encodeIdentifier(byte[] identifier) {
+        return new String(Base64.encodeBase64(identifier));
+      }
+
+      static char[] encodePassword(byte[] password) {
+        return new String(Base64.encodeBase64(password)).toCharArray();
+       }
+     }
+    /**
+      * The Thrift SASL transports call Sasl.createSaslServer and Sasl.createSaslClient
+      * inside open(). So, we need to assume the correct UGI when the transport is opened
+      * so that the SASL mechanisms have access to the right principal. This transport
+      * wraps the Sasl transports to set up the right UGI context for open().
+      *
+      * This is used on the client side, where the API explicitly opens a transport to
+      * the server.
+      */
+     private static class TUGIAssumingTransport extends TFilterTransport {
+       private final UserGroupInformation ugi;
+
+       public TUGIAssumingTransport(TTransport wrapped, UserGroupInformation ugi) {
+         super(wrapped);
+         this.ugi = ugi;
+       }
+
+       @Override
+       public void open() throws TTransportException {
+         try {
+           ugi.doAs(new PrivilegedExceptionAction<Void>() {
+             public Void run() {
+               try {
+                 wrapped.open();
+               } catch (TTransportException tte) {
+                 // Wrap the transport exception in an RTE, since UGI.doAs() then goes
+                 // and unwraps this for us out of the doAs block. We then unwrap one
+                 // more time in our catch clause to get back the TTE. (ugh)
+                 throw new RuntimeException(tte);
+               }
+               return null;
+             }
+           });
+         } catch (IOException ioe) {
+           assert false : "Never thrown!";
+           throw new RuntimeException("Received an ioe we never threw!", ioe);
+         } catch (InterruptedException ie) {
+           assert false : "We never expect to see an InterruptedException thrown in this block";
+           throw new RuntimeException("Received an ie we never threw!", ie);
+         } catch (RuntimeException rte) {
+           if (rte.getCause() instanceof TTransportException) {
+             throw (TTransportException)rte.getCause();
+           } else {
+             throw rte;
+           }
+         }
+       }
+     }
+    /**
+      * Transport that simply wraps another transport.
+      * This is the equivalent of FilterInputStream for Thrift transports.
+      */
+     private static class TFilterTransport extends TTransport {
+       protected final TTransport wrapped;
+
+       public TFilterTransport(TTransport wrapped) {
+         this.wrapped = wrapped;
+       }
+
+       @Override
+       public void open() throws TTransportException {
+         wrapped.open();
+       }
+
+       @Override
+       public boolean isOpen() {
+         return wrapped.isOpen();
+       }
+
+       @Override
+       public boolean peek() {
+         return wrapped.peek();
+       }
+
+       @Override
+       public void close() {
+         wrapped.close();
+       }
+
+       @Override
+       public int read(byte[] buf, int off, int len) throws TTransportException {
+         return wrapped.read(buf, off, len);
+       }
+
+       @Override
+       public int readAll(byte[] buf, int off, int len) throws TTransportException {
+         return wrapped.readAll(buf, off, len);
+       }
+
+       @Override
+       public void write(byte[] buf) throws TTransportException {
+         wrapped.write(buf);
+       }
+
+       @Override
+       public void write(byte[] buf, int off, int len) throws TTransportException {
+         wrapped.write(buf, off, len);
+       }
+
+       @Override
+       public void flush() throws TTransportException {
+         wrapped.flush();
+       }
+
+       @Override
+       public byte[] getBuffer() {
+         return wrapped.getBuffer();
+       }
+
+       @Override
+       public int getBufferPosition() {
+         return wrapped.getBufferPosition();
+       }
+
+       @Override
+       public int getBytesRemainingInBuffer() {
+         return wrapped.getBytesRemainingInBuffer();
+       }
+
+       @Override
+       public void consumeBuffer(int len) {
+         wrapped.consumeBuffer(len);
        }
      }
    }
 
    public static class Server extends HadoopThriftAuthBridge.Server {
-     private final UserGroupInformation realUgi;
+     final UserGroupInformation realUgi;
+     DelegationTokenSecretManager secretManager;
+     private final static long DELEGATION_TOKEN_GC_INTERVAL = 3600000; // 1 hour
+     //Delegation token related keys
+     public static final String  DELEGATION_KEY_UPDATE_INTERVAL_KEY =
+       "hive.cluster.delegation.key.update-interval";
+     public static final long    DELEGATION_KEY_UPDATE_INTERVAL_DEFAULT =
+       24*60*60*1000; // 1 day
+     public static final String  DELEGATION_TOKEN_RENEW_INTERVAL_KEY =
+       "hive.cluster.delegation.token.renew-interval";
+     public static final long    DELEGATION_TOKEN_RENEW_INTERVAL_DEFAULT =
+       24*60*60*1000;  // 1 day
+     public static final String  DELEGATION_TOKEN_MAX_LIFETIME_KEY =
+       "hive.cluster.delegation.token.max-lifetime";
+     public static final long    DELEGATION_TOKEN_MAX_LIFETIME_DEFAULT =
+       7*24*60*60*1000; // 7 days
 
+     public Server() throws TTransportException {
+       try {
+         realUgi = UserGroupInformation.getCurrentUser();
+       } catch (IOException ioe) {
+         throw new TTransportException(ioe);
+       }
+     }
      /**
-      * TODO: javadoc
+      * Create a server with a kerberos keytab/principal.
       */
      private Server(String keytabFile, String principalConf)
        throws TTransportException {
@@ -143,7 +372,7 @@ import org.apache.thrift.transport.TTran
        String kerberosName = realUgi.getUserName();
        final String names[] = SaslRpcServer.splitKerberosName(kerberosName);
        if (names.length != 3) {
-         throw new TTransportException("Kerberos principal should have 3 parts: "  +kerberosName);
+         throw new TTransportException("Kerberos principal should have 3 parts: " + kerberosName);
        }
 
        TSaslServerTransport.Factory transFactory = new TSaslServerTransport.Factory();
@@ -152,6 +381,9 @@ import org.apache.thrift.transport.TTran
          names[0], names[1],  // two parts of kerberos principal
          SaslRpcServer.SASL_PROPS,
          new SaslRpcServer.SaslGssCallbackHandler());
+       transFactory.addServerDefinition(AuthMethod.DIGEST.getMechanismName(),
+          null, SaslRpcServer.SASL_DEFAULT_REALM,
+          SaslRpcServer.SASL_PROPS, new SaslDigestCallbackHandler(secretManager));
 
        return new TUGIAssumingTransportFactory(transFactory, realUgi);
      }
@@ -163,7 +395,123 @@ import org.apache.thrift.transport.TTran
       */
      @Override
      public TProcessor wrapProcessor(TProcessor processor) {
-       return new TUGIAssumingProcessor(processor);
+      return new TUGIAssumingProcessor(processor, secretManager);
+     }
+
+     @Override
+     public void startDelegationTokenSecretManager(Configuration conf)
+     throws IOException{
+       long secretKeyInterval =
+         conf.getLong(DELEGATION_KEY_UPDATE_INTERVAL_KEY,
+                        DELEGATION_KEY_UPDATE_INTERVAL_DEFAULT);
+       long tokenMaxLifetime =
+           conf.getLong(DELEGATION_TOKEN_MAX_LIFETIME_KEY,
+                        DELEGATION_TOKEN_MAX_LIFETIME_DEFAULT);
+       long tokenRenewInterval =
+           conf.getLong(DELEGATION_TOKEN_RENEW_INTERVAL_KEY,
+                        DELEGATION_TOKEN_RENEW_INTERVAL_DEFAULT);
+       secretManager =
+           new DelegationTokenSecretManager(secretKeyInterval,
+                                            tokenMaxLifetime,
+                                            tokenRenewInterval,
+                                            DELEGATION_TOKEN_GC_INTERVAL);
+       secretManager.startThreads();
+     }
+
+     @Override
+     public String getDelegationToken(String renewer) throws IOException {
+       return secretManager.getDelegationToken(renewer);
+     }
+
+     @Override
+     public long renewDelegationToken(String tokenStrForm) throws IOException {
+       return secretManager.renewDelegationToken(tokenStrForm);
+     }
+
+     @Override
+     public String getDelegationToken(String renewer, String token_signature)
+     throws IOException {
+       return secretManager.getDelegationToken(renewer, token_signature);
+     }
+
+     @Override
+     public void cancelDelegationToken(String tokenStrForm) throws IOException {
+       secretManager.cancelDelegationToken(tokenStrForm);
+     }
+
+
+    /** CallbackHandler for SASL DIGEST-MD5 mechanism */
+    // This code is pretty much completely based on Hadoop's
+    // SaslRpcServer.SaslDigestCallbackHandler - the only reason we could not
+    // use that Hadoop class as-is was because it needs a Server.Connection object
+    // which is relevant in hadoop rpc but not here in the metastore - so the
+    // code below does not deal with the Server.Connection object.
+    static class SaslDigestCallbackHandler implements CallbackHandler {
+      private final DelegationTokenSecretManager secretManager;
+
+      public SaslDigestCallbackHandler(
+          DelegationTokenSecretManager secretManager) {
+        this.secretManager = secretManager;
+      }
+
+      private char[] getPassword(DelegationTokenIdentifier tokenid) throws InvalidToken {
+        return encodePassword(secretManager.retrievePassword(tokenid));
+      }
+
+      private char[] encodePassword(byte[] password) {
+        return new String(Base64.encodeBase64(password)).toCharArray();
+      }
+      /** {@inheritDoc} */
+      @Override
+      public void handle(Callback[] callbacks) throws InvalidToken,
+      UnsupportedCallbackException {
+        NameCallback nc = null;
+        PasswordCallback pc = null;
+        AuthorizeCallback ac = null;
+        for (Callback callback : callbacks) {
+          if (callback instanceof AuthorizeCallback) {
+            ac = (AuthorizeCallback) callback;
+          } else if (callback instanceof NameCallback) {
+            nc = (NameCallback) callback;
+          } else if (callback instanceof PasswordCallback) {
+            pc = (PasswordCallback) callback;
+          } else if (callback instanceof RealmCallback) {
+            continue; // realm is ignored
+          } else {
+            throw new UnsupportedCallbackException(callback,
+            "Unrecognized SASL DIGEST-MD5 Callback");
+          }
+        }
+        if (pc != null) {
+          DelegationTokenIdentifier tokenIdentifier = SaslRpcServer.
+          getIdentifier(nc.getDefaultName(), secretManager);
+          char[] password = getPassword(tokenIdentifier);
+
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("SASL server DIGEST-MD5 callback: setting password "
+                + "for client: " + tokenIdentifier.getUser());
+          }
+          pc.setPassword(password);
+        }
+        if (ac != null) {
+          String authid = ac.getAuthenticationID();
+          String authzid = ac.getAuthorizationID();
+          if (authid.equals(authzid)) {
+            ac.setAuthorized(true);
+          } else {
+            ac.setAuthorized(false);
+          }
+          if (ac.isAuthorized()) {
+            if (LOG.isDebugEnabled()) {
+              String username =
+                SaslRpcServer.getIdentifier(authzid, secretManager).getUser().getUserName();
+              LOG.debug("SASL server DIGEST-MD5 callback: setting "
+                  + "canonicalized client ID: " + username);
+            }
+            ac.setAuthorizedID(authzid);
+          }
+        }
+      }
      }
 
      /**
@@ -175,22 +523,35 @@ import org.apache.thrift.transport.TTran
       */
      private class TUGIAssumingProcessor implements TProcessor {
        final TProcessor wrapped;
-
-       TUGIAssumingProcessor(TProcessor wrapped) {
+       DelegationTokenSecretManager secretManager;
+       TUGIAssumingProcessor(TProcessor wrapped, DelegationTokenSecretManager secretManager) {
          this.wrapped = wrapped;
+         this.secretManager = secretManager;
        }
 
        public boolean process(final TProtocol inProt, final TProtocol outProt) throws TException {
          TTransport trans = inProt.getTransport();
          if (!(trans instanceof TSaslServerTransport)) {
-           throw new TException("Unexpected non-SASL transport "  +trans.getClass());
+           throw new TException("Unexpected non-SASL transport " + trans.getClass());
          }
          TSaslServerTransport saslTrans = (TSaslServerTransport)trans;
-         String authId = saslTrans.getSaslServer().getAuthorizationID();
+         SaslServer saslServer = saslTrans.getSaslServer();
+         String authId = saslServer.getAuthorizationID();
+         LOG.debug("AUTH ID ======>" + authId);
+         String endUser = authId;
 
+         if(saslServer.getMechanismName().equals("DIGEST-MD5")) {
+           try {
+             TokenIdentifier tokenId = SaslRpcServer.getIdentifier(authId,
+                 secretManager);
+             endUser = tokenId.getUser().getUserName();
+           } catch (InvalidToken e) {
+             throw new TException(e.getMessage());
+           }
+         }
          try {
            UserGroupInformation clientUgi = UserGroupInformation.createProxyUser(
-               authId,  UserGroupInformation.getLoginUser());
+              endUser, UserGroupInformation.getLoginUser());
            return clientUgi.doAs(new PrivilegedExceptionAction<Boolean>() {
                public Boolean run() {
                  try {
@@ -213,160 +574,33 @@ import org.apache.thrift.transport.TTran
        }
      }
 
-   }
-
-   /**
-    * A TransportFactory that wraps another one, but assumes a specified UGI
-    * before calling through.
-    *
-    * This is used on the server side to assume the server's Principal when accepting
-    * clients.
-    */
-   private static class TUGIAssumingTransportFactory extends TTransportFactory {
-     private final UserGroupInformation ugi;
-     private final TTransportFactory wrapped;
-
-     public TUGIAssumingTransportFactory(TTransportFactory wrapped, UserGroupInformation ugi) {
-       assert wrapped != null;
-       assert ugi != null;
-
-       this.wrapped = wrapped;
-       this.ugi = ugi;
-     }
-
-     @Override
-     public TTransport getTransport(final TTransport trans) {
-       return ugi.doAs(new PrivilegedAction<TTransport>() {
-         public TTransport run() {
-           return wrapped.getTransport(trans);
-         }
-       });
-     }
-   }
+    /**
+      * A TransportFactory that wraps another one, but assumes a specified UGI
+      * before calling through.
+      *
+      * This is used on the server side to assume the server's Principal when accepting
+      * clients.
+      */
+     static class TUGIAssumingTransportFactory extends TTransportFactory {
+       private final UserGroupInformation ugi;
+       private final TTransportFactory wrapped;
+
+       public TUGIAssumingTransportFactory(TTransportFactory wrapped, UserGroupInformation ugi) {
+         assert wrapped != null;
+         assert ugi != null;
 
-   /**
-    * The Thrift SASL transports call Sasl.createSaslServer and Sasl.createSaslClient
-    * inside open(). So, we need to assume the correct UGI when the transport is opened
-    * so that the SASL mechanisms have access to the right principal. This transport
-    * wraps the Sasl transports to set up the right UGI context for open().
-    *
-    * This is used on the client side, where the API explicitly opens a transport to
-    * the server.
-    */
-   private static class TUGIAssumingTransport extends TFilterTransport {
-     private final UserGroupInformation ugi;
-
-     public TUGIAssumingTransport(TTransport wrapped, UserGroupInformation ugi) {
-       super(wrapped);
-       this.ugi = ugi;
-     }
+         this.wrapped = wrapped;
+         this.ugi = ugi;
+       }
 
-     @Override
-     public void open() throws TTransportException {
-       try {
-         ugi.doAs(new PrivilegedExceptionAction<Void>() {
-           public Void run() {
-             try {
-               wrapped.open();
-             } catch (TTransportException tte) {
-               // Wrap the transport exception in an RTE, since UGI.doAs() then goes
-               // and unwraps this for us out of the doAs block. We then unwrap one
-               // more time in our catch clause to get back the TTE. (ugh)
-               throw new RuntimeException(tte);
-             }
-             return null;
+       @Override
+       public TTransport getTransport(final TTransport trans) {
+         return ugi.doAs(new PrivilegedAction<TTransport>() {
+           public TTransport run() {
+             return wrapped.getTransport(trans);
            }
          });
-       } catch (IOException ioe) {
-         assert false : "Never thrown!";
-         throw new RuntimeException("Received an ioe we never threw!", ioe);
-       } catch (InterruptedException ie) {
-         assert false : "We never expect to see an InterruptedException thrown in this block";
-         throw new RuntimeException("Received an ie we never threw!", ie);
-       } catch (RuntimeException rte) {
-         if (rte.getCause() instanceof TTransportException) {
-           throw (TTransportException)rte.getCause();
-         } else {
-           throw rte;
-         }
        }
      }
    }
-
-   /**
-    * Transport that simply wraps another transport.
-    * This is the equivalent of FilterInputStream for Thrift transports.
-    */
-   private static class TFilterTransport extends TTransport {
-     protected final TTransport wrapped;
-
-     public TFilterTransport(TTransport wrapped) {
-       this.wrapped = wrapped;
-     }
-
-     @Override
-     public void open() throws TTransportException {
-       wrapped.open();
-     }
-
-     @Override
-     public boolean isOpen() {
-       return wrapped.isOpen();
-     }
-
-     @Override
-     public boolean peek() {
-       return wrapped.peek();
-     }
-
-     @Override
-     public void close() {
-       wrapped.close();
-     }
-
-     @Override
-     public int read(byte[] buf, int off, int len) throws TTransportException {
-       return wrapped.read(buf, off, len);
-     }
-
-     @Override
-     public int readAll(byte[] buf, int off, int len) throws TTransportException {
-       return wrapped.readAll(buf, off, len);
-     }
-
-     @Override
-     public void write(byte[] buf) throws TTransportException {
-       wrapped.write(buf);
-     }
-
-     @Override
-     public void write(byte[] buf, int off, int len) throws TTransportException {
-       wrapped.write(buf, off, len);
-     }
-
-     @Override
-     public void flush() throws TTransportException {
-       wrapped.flush();
-     }
-
-     @Override
-     public byte[] getBuffer() {
-       return wrapped.getBuffer();
-     }
-
-     @Override
-     public int getBufferPosition() {
-       return wrapped.getBufferPosition();
-     }
-
-     @Override
-     public int getBytesRemainingInBuffer() {
-       return wrapped.getBytesRemainingInBuffer();
-     }
-
-     @Override
-     public void consumeBuffer(int len) {
-       wrapped.consumeBuffer(len);
-     }
-   }
  }

Modified: hive/trunk/shims/src/common/java/org/apache/hadoop/hive/shims/HadoopShims.java
URL: http://svn.apache.org/viewvc/hive/trunk/shims/src/common/java/org/apache/hadoop/hive/shims/HadoopShims.java?rev=1060876&r1=1060875&r2=1060876&view=diff
==============================================================================
--- hive/trunk/shims/src/common/java/org/apache/hadoop/hive/shims/HadoopShims.java (original)
+++ hive/trunk/shims/src/common/java/org/apache/hadoop/hive/shims/HadoopShims.java Wed Jan 19 17:28:28 2011
@@ -21,6 +21,10 @@ import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
 
+import javax.security.auth.login.LoginException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -35,10 +39,6 @@ import org.apache.hadoop.mapred.Reporter
 import org.apache.hadoop.mapred.RunningJob;
 import org.apache.hadoop.mapred.TaskCompletionEvent;
 import org.apache.hadoop.security.UserGroupInformation;
-import javax.security.auth.login.LoginException;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 
 /**
  * In order to be compatible with multiple versions of Hadoop, all parts
@@ -158,6 +158,24 @@ public interface HadoopShims {
    */
   public UserGroupInformation getUGIForConf(Configuration conf) throws LoginException, IOException;
 
+
+  /**
+   * Gets the string form of the token that matches the given token signature.
+   *
+   * <p>The signature is used as the value of the "service" field in the token
+   * for lookup (see AbstractDelegationTokenSelector in Hadoop). If such a token
+   * exists in the token cache (credential store) of the job, the lookup
+   * returns it. This is relevant only when running against a "secure" Hadoop
+   * release.
+   *
+   * <p>The method gets hold of the tokens if they were set up by Hadoop; this
+   * should happen on the map/reduce tasks if the client added the tokens into
+   * Hadoop's credential store in the front end during job submission. The
+   * method selects the Hive delegation token among the set of tokens and
+   * returns its string form.
+   *
+   * <p>NOTE(review): the value returned when no token matches the signature is
+   * not specified here - confirm against the shim implementations.
+   *
+   * @param tokenSignature the signature to look up, compared against the
+   *          "service" field of the candidate tokens
+   * @return the string form of the token found
+   * @throws IOException if the lookup fails; the exact conditions are left to
+   *           the shim implementation
+   */
+  String getTokenStrForm(String tokenSignature) throws IOException;
+
   /**
    * InputSplitShim.
    *

Modified: hive/trunk/shims/src/common/java/org/apache/hadoop/hive/thrift/HadoopThriftAuthBridge.java
URL: http://svn.apache.org/viewvc/hive/trunk/shims/src/common/java/org/apache/hadoop/hive/thrift/HadoopThriftAuthBridge.java?rev=1060876&r1=1060875&r2=1060876&view=diff
==============================================================================
--- hive/trunk/shims/src/common/java/org/apache/hadoop/hive/thrift/HadoopThriftAuthBridge.java (original)
+++ hive/trunk/shims/src/common/java/org/apache/hadoop/hive/thrift/HadoopThriftAuthBridge.java Wed Jan 19 17:28:28 2011
@@ -20,15 +20,15 @@
 
  import java.io.IOException;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.thrift.TProcessor;
 import org.apache.thrift.transport.TTransport;
 import org.apache.thrift.transport.TTransportException;
 import org.apache.thrift.transport.TTransportFactory;
-
  /**
   * This class is only overridden by the secure hadoop shim. It allows
   * the Thrift SASL support to bridge to Hadoop's UserGroupInformation
-  * infrastructure.
+  * and DelegationToken infrastructure.
   */
  public class HadoopThriftAuthBridge {
    public Client createClient() {
@@ -44,15 +44,33 @@ import org.apache.thrift.transport.TTran
 
 
    public static abstract class Client {
+   /**
+    *
+    * @param principalConfig In the case of Kerberos authentication this will
+    * be the kerberos principal name, for DIGEST-MD5 (delegation token) based
+    * authentication this will be null
+    * @param host The metastore server host name
+    * @param methodStr "KERBEROS" or "DIGEST"
+    * @param tokenStrForm The URL-encoded string form of an
+    * org.apache.hadoop.security.token.Token.
+    * @param underlyingTransport the underlying transport
+    * @return the transport
+    * @throws IOException
+    */
      public abstract TTransport createClientTransport(
        String principalConfig, String host,
-       String methodStr, TTransport underlyingTransport)
+       String methodStr,String tokenStrForm, TTransport underlyingTransport)
        throws IOException;
    }
 
    public static abstract class Server {
      public abstract TTransportFactory createTransportFactory() throws TTransportException;
      public abstract TProcessor wrapProcessor(TProcessor processor);
+     public abstract void startDelegationTokenSecretManager(Configuration conf) throws IOException;
+     public abstract String getDelegationToken(String renewer) throws IOException;
+     public abstract long renewDelegationToken(String tokenStrForm) throws IOException;
+     public abstract String getDelegationToken(String renewer, String token_signature) throws IOException;
+     public abstract void cancelDelegationToken(String tokenStrForm) throws IOException;
    }
  }
 

Added: hive/trunk/shims/src/test/org/apache/hadoop/hive/thrift/TestHadoop20SAuthBridge.java
URL: http://svn.apache.org/viewvc/hive/trunk/shims/src/test/org/apache/hadoop/hive/thrift/TestHadoop20SAuthBridge.java?rev=1060876&view=auto
==============================================================================
--- hive/trunk/shims/src/test/org/apache/hadoop/hive/thrift/TestHadoop20SAuthBridge.java (added)
+++ hive/trunk/shims/src/test/org/apache/hadoop/hive/thrift/TestHadoop20SAuthBridge.java Wed Jan 19 17:28:28 2011
@@ -0,0 +1,195 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.thrift;
+
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.security.PrivilegedExceptionAction;
+
+import junit.framework.TestCase;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.HiveMetaStore;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.security.SaslRpcServer;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.SaslRpcServer.AuthMethod;
+import org.apache.hadoop.security.token.Token;
+import org.apache.thrift.transport.TSaslServerTransport;
+import org.apache.thrift.transport.TTransportException;
+import org.apache.thrift.transport.TTransportFactory;
+
+/**
+ * Exercises delegation-token (DIGEST) authentication between a
+ * HiveMetaStoreClient and a metastore started inside the test on a fixed
+ * local port, using an auth bridge whose server side skips Kerberos.
+ */
+public class TestHadoop20SAuthBridge extends TestCase {
+
+  /**
+   * Auth bridge whose server registers only the DIGEST SASL mechanism, so the
+   * test can run without any Kerberos setup.
+   */
+  private static class MyHadoopThriftAuthBridge20S extends HadoopThriftAuthBridge20S {
+    @Override
+    public Server createServer(String keytabFile, String principalConf)
+    throws TTransportException {
+      //Create a Server that doesn't interpret any Kerberos stuff
+      //(keytabFile and principalConf are deliberately ignored)
+      return new Server();
+    }
+
+    /**
+     * Server whose transport factory offers DIGEST only, backed by the
+     * inherited secret manager.
+     */
+    static class Server extends HadoopThriftAuthBridge20S.Server {
+      public Server() throws TTransportException {
+        super();
+      }
+      @Override
+      public TTransportFactory createTransportFactory()
+      throws TTransportException {
+        TSaslServerTransport.Factory transFactory =
+          new TSaslServerTransport.Factory();
+        transFactory.addServerDefinition(AuthMethod.DIGEST.getMechanismName(),
+            null, SaslRpcServer.SASL_DEFAULT_REALM,
+            SaslRpcServer.SASL_PROPS,
+            new SaslDigestCallbackHandler(secretManager));
+
+        return new TUGIAssumingTransportFactory(transFactory, realUgi);
+      }
+    }
+  }
+  /** Port the test metastore listens on and the client connects to. */
+  private static final int port = 10000;
+
+  private final HiveConf conf;
+
+  /**
+   * Sets the system properties that point the client at the SASL-enabled test
+   * metastore before the HiveConf is created, then marks the metastore as
+   * non-local in that conf.
+   *
+   * @param name the test name passed to TestCase
+   */
+  public TestHadoop20SAuthBridge(String name) {
+    super(name);
+    System.setProperty(HiveConf.ConfVars.METASTORE_USE_THRIFT_SASL.varname,
+        "true");
+    System.setProperty(HiveConf.ConfVars.METASTOREURIS.varname,
+        "thrift://localhost:" + port);
+    System.setProperty(HiveConf.ConfVars.METASTOREWAREHOUSE.varname, new Path(
+        System.getProperty("test.build.data", "/tmp")).toString());
+    conf = new HiveConf(TestHadoop20SAuthBridge.class);
+    conf.setBoolean("hive.metastore.local", false);
+  }
+
+  /**
+   * Starts the metastore in a daemon thread, waits until its port accepts
+   * connections, then runs the token round trip once without and once with a
+   * token signature.
+   *
+   * @throws Exception if the metastore never comes up or a round trip fails
+   */
+  public void testSaslWithHiveMetaStore() throws Exception {
+
+    Thread thread = new Thread(new Runnable() {
+      public void run() {
+        try {
+          HiveMetaStore.startMetaStore(port,new MyHadoopThriftAuthBridge20S());
+        } catch (Throwable e) {
+          //report why the metastore died before taking the JVM down;
+          //otherwise the failure leaves no trace at all
+          e.printStackTrace();
+          System.exit(1);
+        }
+      }
+    });
+    thread.setDaemon(true);
+    thread.start();
+    loopUntilHMSReady();
+    UserGroupInformation clientUgi = UserGroupInformation.getCurrentUser();
+    obtainTokenAndAddIntoUGI(clientUgi, null);
+    obtainTokenAndAddIntoUGI(clientUgi, "tokenForFooTablePartition");
+  }
+
+  /**
+   * Obtains a delegation token, adds it to the given UGI, checks that a
+   * metastore client created as that UGI works, then cancels the token and
+   * checks that a new client can no longer be created.
+   *
+   * @param clientUgi the user the token is requested for and the client runs as
+   * @param tokenSig token signature to request the token with and to set in
+   *          the conf, or null to request a token without a signature
+   * @throws Exception if any metastore or security call fails
+   */
+  private void obtainTokenAndAddIntoUGI(UserGroupInformation clientUgi,
+      String tokenSig) throws Exception {
+    //obtain a token by directly invoking the metastore operation(without going
+    //through the thrift interface). Obtaining a token makes the secret manager
+    //aware of the user and that it gave the token to the user
+    String tokenStrForm;
+    if (tokenSig == null) {
+      tokenStrForm =
+        HiveMetaStore.getDelegationToken(clientUgi.getShortUserName());
+    } else {
+      tokenStrForm =
+        HiveMetaStore.getDelegationToken(clientUgi.getShortUserName(),
+                                         tokenSig);
+      conf.set("hive.metastore.token.signature", tokenSig);
+    }
+
+    Token<DelegationTokenIdentifier> t= new Token<DelegationTokenIdentifier>();
+    t.decodeFromUrlString(tokenStrForm);
+    //add the token to the clientUgi for securely talking to the metastore
+    clientUgi.addToken(t);
+    //Create the metastore client as the clientUgi. Doing so this
+    //way will give the client access to the token that was added earlier
+    //in the clientUgi
+    HiveMetaStoreClient hiveClient =
+      clientUgi.doAs(new PrivilegedExceptionAction<HiveMetaStoreClient>() {
+        public HiveMetaStoreClient run() throws Exception {
+          HiveMetaStoreClient hiveClient =
+            new HiveMetaStoreClient(conf);
+          return hiveClient;
+        }
+      });
+
+    assertNotNull("Couldn't connect to metastore", hiveClient);
+
+    //try out some metastore operations; always release the connection, even
+    //when one of the operations or assertions fails
+    try {
+      createDBAndVerifyExistence(hiveClient);
+    } finally {
+      hiveClient.close();
+    }
+
+    //Now cancel the delegation token
+    HiveMetaStore.cancelDelegationToken(tokenStrForm);
+
+    //now metastore connection should fail
+    hiveClient =
+      clientUgi.doAs(new PrivilegedExceptionAction<HiveMetaStoreClient>() {
+        public HiveMetaStoreClient run() {
+          try {
+            HiveMetaStoreClient hiveClient =
+              new HiveMetaStoreClient(conf);
+            return hiveClient;
+          } catch (MetaException e) {
+            //expected: the cancelled token must no longer authenticate
+            return null;
+          }
+        }
+      });
+    assertNull("Expected metastore operations to fail", hiveClient);
+  }
+
+  /**
+   * A simple connect test to make sure that the metastore is up. Makes up to
+   * eight connection attempts on the test port (5 second connect timeout
+   * each), sleeping 10 seconds after every failed attempt.
+   *
+   * @throws Exception the failure of the last attempt once retries are
+   *           exhausted, or InterruptedException if the wait is interrupted
+   */
+  private void loopUntilHMSReady() throws Exception {
+    int retries = 0;
+    while (true) {
+      Socket socket = new Socket();
+      try {
+        socket.connect(new InetSocketAddress(port), 5000);
+        return;
+      } catch (Exception e) {
+        if (retries++ > 6) { //give up
+          throw e;
+        }
+      } finally {
+        //close the probe socket on every path so failed attempts don't leak
+        //a descriptor each
+        try {
+          socket.close();
+        } catch (Exception ignored) {
+          //best-effort cleanup; only the connect outcome matters here
+        }
+      }
+      Thread.sleep(10000);
+    }
+  }
+
+  /**
+   * Creates a database named "simpdb" through the client, reads it back,
+   * drops it again and then checks that the name read back matches.
+   *
+   * @param client a connected metastore client
+   * @throws Exception if any metastore call fails
+   */
+  private void createDBAndVerifyExistence(HiveMetaStoreClient client)
+  throws Exception {
+    String dbName = "simpdb";
+    Database db = new Database();
+    db.setName(dbName);
+    client.createDatabase(db);
+    Database db1 = client.getDatabase(dbName);
+    //drop before asserting so a mismatch doesn't leave the database behind
+    client.dropDatabase(dbName);
+    assertEquals("Databases do not match", db.getName(), db1.getName());
+  }
+}