You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by st...@apache.org on 2012/05/12 00:06:59 UTC

svn commit: r1337396 [2/5] - in /hbase/trunk: ./ security/src/main/java/org/apache/hadoop/hbase/security/ security/src/main/java/org/apache/hadoop/hbase/security/access/ security/src/main/java/org/apache/hadoop/hbase/security/token/ security/src/test/ ...

Added: hbase/trunk/src/main/java/org/apache/hadoop/hbase/security/HBaseSaslRpcClient.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/security/HBaseSaslRpcClient.java?rev=1337396&view=auto
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/security/HBaseSaslRpcClient.java (added)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/security/HBaseSaslRpcClient.java Fri May 11 22:06:57 2012
@@ -0,0 +1,279 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.security;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import javax.security.auth.callback.Callback;
+import javax.security.auth.callback.CallbackHandler;
+import javax.security.auth.callback.NameCallback;
+import javax.security.auth.callback.PasswordCallback;
+import javax.security.auth.callback.UnsupportedCallbackException;
+import javax.security.sasl.RealmCallback;
+import javax.security.sasl.RealmChoiceCallback;
+import javax.security.sasl.Sasl;
+import javax.security.sasl.SaslException;
+import javax.security.sasl.SaslClient;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.ipc.RemoteException;
+import org.apache.hadoop.hbase.security.HBaseSaslRpcServer.AuthMethod;
+import org.apache.hadoop.hbase.security.HBaseSaslRpcServer.SaslStatus;
+import org.apache.hadoop.security.SaslInputStream;
+import org.apache.hadoop.security.SaslOutputStream;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+
+/**
+ * A utility class that encapsulates SASL logic for RPC client.
+ * Copied from <code>org.apache.hadoop.security</code>
+ */
+public class HBaseSaslRpcClient {
+  public static final Log LOG = LogFactory.getLog(HBaseSaslRpcClient.class);
+
+  private final SaslClient saslClient;
+
+  /**
+   * Create a HBaseSaslRpcClient for an authentication method.
+   * <p>
+   * Only {@code DIGEST} and {@code KERBEROS} are handled; any other method is
+   * rejected with an {@link IOException}.
+   * 
+   * @param method
+   *          the requested authentication method
+   * @param token
+   *          token to use if needed by the authentication method; it is only
+   *          dereferenced on the {@code DIGEST} path
+   * @param serverPrincipal
+   *          the server's Kerberos principal name; only used on the
+   *          {@code KERBEROS} path, where it must be non-empty and split into
+   *          exactly three parts
+   * @throws IOException
+   *           if the method is not supported, the Kerberos principal is
+   *           missing or malformed, or no SASL client implementation could be
+   *           created
+   */
+  public HBaseSaslRpcClient(AuthMethod method,
+      Token<? extends TokenIdentifier> token, String serverPrincipal)
+      throws IOException {
+    switch (method) {
+    case DIGEST:
+      if (LOG.isDebugEnabled())
+        LOG.debug("Creating SASL " + AuthMethod.DIGEST.getMechanismName()
+            + " client to authenticate to service at " + token.getService());
+      // No protocol is given; the fixed default realm is passed in the
+      // server-name position and the token supplies name/password through the
+      // callback handler.
+      saslClient = Sasl.createSaslClient(new String[] { AuthMethod.DIGEST
+          .getMechanismName() }, null, null, HBaseSaslRpcServer.SASL_DEFAULT_REALM,
+          HBaseSaslRpcServer.SASL_PROPS, new SaslClientCallbackHandler(token));
+      break;
+    case KERBEROS:
+      if (LOG.isDebugEnabled()) {
+        LOG
+            .debug("Creating SASL " + AuthMethod.KERBEROS.getMechanismName()
+                + " client. Server's Kerberos principal name is "
+                + serverPrincipal);
+      }
+      if (serverPrincipal == null || serverPrincipal.length() == 0) {
+        throw new IOException(
+            "Failed to specify server's Kerberos principal name");
+      }
+      // The principal is split on '/' and '@'; exactly three parts are required.
+      String names[] = HBaseSaslRpcServer.splitKerberosName(serverPrincipal);
+      if (names.length != 3) {
+        throw new IOException(
+          "Kerberos principal does not have the expected format: "
+                + serverPrincipal);
+      }
+      // The first two parts are handed to SASL as protocol and server name;
+      // the third part is not used here. No callback handler is supplied.
+      saslClient = Sasl.createSaslClient(new String[] { AuthMethod.KERBEROS
+          .getMechanismName() }, null, names[0], names[1],
+          HBaseSaslRpcServer.SASL_PROPS, null);
+      break;
+    default:
+      throw new IOException("Unknown authentication method " + method);
+    }
+    // Sasl.createSaslClient returns null when no provider supports the mechanism.
+    if (saslClient == null)
+      throw new IOException("Unable to find SASL client implementation");
+  }
+
+  /**
+   * Reads the status word that precedes each server response. A status other
+   * than {@code SaslStatus.SUCCESS} is followed by two strings, which are read
+   * in wire order and rethrown as a {@link RemoteException}.
+   *
+   * @param inStream stream positioned at the status word
+   * @throws IOException if reading fails or the server reported a failure
+   */
+  private static void readStatus(DataInputStream inStream) throws IOException {
+    final int statusCode = inStream.readInt();
+    if (statusCode == SaslStatus.SUCCESS.state) {
+      return;
+    }
+    // Order matters: the first string on the wire is the first constructor
+    // argument, the second string is the second.
+    final String first = WritableUtils.readString(inStream);
+    final String second = WritableUtils.readString(inStream);
+    throw new RemoteException(first, second);
+  }
+  
+  /**
+   * Do client side SASL authentication with server via the given InputStream
+   * and OutputStream.
+   * <p>
+   * Each token is written as a 4-byte length followed by the token bytes.
+   * Each server reply is a status word (see {@code readStatus}) followed by a
+   * length-prefixed token. On any {@link IOException} the SASL client is
+   * disposed before the exception is rethrown.
+   * 
+   * @param inS
+   *          InputStream to use
+   * @param outS
+   *          OutputStream to use
+   * @return true if connection is set up, or false if needs to switch 
+   *             to simple Auth.
+   * @throws IOException
+   *           if the exchange fails, the server reports an error status, or
+   *           the server announces a negative token length
+   */
+  public boolean saslConnect(InputStream inS, OutputStream outS)
+      throws IOException {
+    DataInputStream inStream = new DataInputStream(new BufferedInputStream(inS));
+    DataOutputStream outStream = new DataOutputStream(new BufferedOutputStream(
+        outS));
+
+    try {
+      byte[] saslToken = new byte[0];
+      if (saslClient.hasInitialResponse()) {
+        saslToken = saslClient.evaluateChallenge(saslToken);
+      }
+      if (saslToken != null) {
+        outStream.writeInt(saslToken.length);
+        outStream.write(saslToken, 0, saslToken.length);
+        outStream.flush();
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Have sent token of size " + saslToken.length
+              + " from initSASLContext.");
+        }
+      }
+      if (!saslClient.isComplete()) {
+        readStatus(inStream);
+        int len = inStream.readInt();
+        // Only the very first reply may carry the fall-back marker.
+        if (len == HBaseSaslRpcServer.SWITCH_TO_SIMPLE_AUTH) {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Server asks us to fall back to simple auth.");
+          }
+          saslClient.dispose();
+          return false;
+        }
+        saslToken = readSaslToken(inStream, len);
+      }
+
+      while (!saslClient.isComplete()) {
+        saslToken = saslClient.evaluateChallenge(saslToken);
+        if (saslToken != null) {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Will send token of size " + saslToken.length
+                + " from initSASLContext.");
+          }
+          outStream.writeInt(saslToken.length);
+          outStream.write(saslToken, 0, saslToken.length);
+          outStream.flush();
+        }
+        if (!saslClient.isComplete()) {
+          readStatus(inStream);
+          saslToken = readSaslToken(inStream, inStream.readInt());
+        }
+      }
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("SASL client context established. Negotiated QoP: "
+            + saslClient.getNegotiatedProperty(Sasl.QOP));
+      }
+      return true;
+    } catch (IOException e) {
+      try {
+        saslClient.dispose();
+      } catch (SaslException ignored) {
+        // ignore further exceptions during cleanup
+      }
+      throw e;
+    }
+  }
+
+  /**
+   * Reads a server token of the announced length. A negative length is
+   * reported as an IOException so that the caller's cleanup path runs,
+   * instead of failing with an unchecked NegativeArraySizeException.
+   */
+  private static byte[] readSaslToken(DataInputStream inStream, int len)
+      throws IOException {
+    if (len < 0) {
+      throw new IOException("Invalid SASL token length " + len
+          + " received from server");
+    }
+    byte[] token = new byte[len];
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Will read input token of size " + token.length
+          + " for processing by initSASLContext");
+    }
+    inStream.readFully(token);
+    return token;
+  }
+
+  /**
+   * Wrap the given stream so that data read from it passes through the
+   * negotiated SASL client. Only valid once the authentication exchange of
+   * saslConnect() has completed.
+   * 
+   * @param in
+   *          the InputStream to wrap
+   * @return a SASL wrapped InputStream
+   * @throws IOException
+   *           if the SASL exchange has not completed yet
+   */
+  public InputStream getInputStream(InputStream in) throws IOException {
+    if (saslClient.isComplete()) {
+      return new SaslInputStream(in, saslClient);
+    }
+    throw new IOException("Sasl authentication exchange hasn't completed yet");
+  }
+
+  /**
+   * Wrap the given stream so that data written to it passes through the
+   * negotiated SASL client. Only valid once the authentication exchange of
+   * saslConnect() has completed.
+   * 
+   * @param out
+   *          the OutputStream to wrap
+   * @return a SASL wrapped OutputStream
+   * @throws IOException
+   *           if the SASL exchange has not completed yet
+   */
+  public OutputStream getOutputStream(OutputStream out) throws IOException {
+    if (saslClient.isComplete()) {
+      return new SaslOutputStream(out, saslClient);
+    }
+    throw new IOException("Sasl authentication exchange hasn't completed yet");
+  }
+
+  /**
+   * Release resources used by wrapped saslClient by delegating to its
+   * {@code dispose()} method.
+   *
+   * @throws SaslException
+   *           propagated unchanged from the wrapped client
+   */
+  public void dispose() throws SaslException {
+    saslClient.dispose();
+  }
+
+  /**
+   * Callback handler used for the DIGEST mechanism. The user name and
+   * password handed to SASL are the encoded identifier and password of the
+   * token given at construction time.
+   */
+  private static class SaslClientCallbackHandler implements CallbackHandler {
+    private final String userName;
+    private final char[] userPassword;
+
+    public SaslClientCallbackHandler(Token<? extends TokenIdentifier> token) {
+      final byte[] identifierBytes = token.getIdentifier();
+      this.userName = HBaseSaslRpcServer.encodeIdentifier(identifierBytes);
+      final byte[] passwordBytes = token.getPassword();
+      this.userPassword = HBaseSaslRpcServer.encodePassword(passwordBytes);
+    }
+
+    /**
+     * Answers name, password and realm callbacks. All callbacks are
+     * classified first, so an unrecognized callback is rejected before any
+     * value has been set. RealmChoiceCallbacks are accepted but ignored.
+     */
+    public void handle(Callback[] callbacks)
+        throws UnsupportedCallbackException {
+      NameCallback nameCallback = null;
+      PasswordCallback passwordCallback = null;
+      RealmCallback realmCallback = null;
+
+      for (Callback candidate : callbacks) {
+        if (candidate instanceof NameCallback) {
+          nameCallback = (NameCallback) candidate;
+        } else if (candidate instanceof PasswordCallback) {
+          passwordCallback = (PasswordCallback) candidate;
+        } else if (candidate instanceof RealmCallback) {
+          realmCallback = (RealmCallback) candidate;
+        } else if (!(candidate instanceof RealmChoiceCallback)) {
+          throw new UnsupportedCallbackException(candidate,
+              "Unrecognized SASL client callback");
+        }
+      }
+
+      if (nameCallback != null) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("SASL client callback: setting username: " + userName);
+        }
+        nameCallback.setName(userName);
+      }
+      if (passwordCallback != null) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("SASL client callback: setting userPassword");
+        }
+        passwordCallback.setPassword(userPassword);
+      }
+      if (realmCallback != null) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("SASL client callback: setting realm: "
+              + realmCallback.getDefaultText());
+        }
+        // the realm proposed by the mechanism is accepted as-is
+        realmCallback.setText(realmCallback.getDefaultText());
+      }
+    }
+  }
+}

Added: hbase/trunk/src/main/java/org/apache/hadoop/hbase/security/HBaseSaslRpcServer.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/security/HBaseSaslRpcServer.java?rev=1337396&view=auto
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/security/HBaseSaslRpcServer.java (added)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/security/HBaseSaslRpcServer.java Fri May 11 22:06:57 2012
@@ -0,0 +1,278 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.security;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Locale;
+import java.util.Map;
+import java.util.TreeMap;
+
+import javax.security.auth.callback.Callback;
+import javax.security.auth.callback.CallbackHandler;
+import javax.security.auth.callback.NameCallback;
+import javax.security.auth.callback.PasswordCallback;
+import javax.security.auth.callback.UnsupportedCallbackException;
+import javax.security.sasl.AuthorizeCallback;
+import javax.security.sasl.RealmCallback;
+import javax.security.sasl.Sasl;
+
+import org.apache.commons.codec.binary.Base64;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.ipc.HBaseServer;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.SecretManager;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
+import org.apache.hadoop.security.token.SecretManager.InvalidToken;
+
+/**
+ * A utility class for dealing with SASL on RPC server
+ */
+public class HBaseSaslRpcServer {
+  public static final Log LOG = LogFactory.getLog(HBaseSaslRpcServer.class);
+  public static final String SASL_DEFAULT_REALM = "default";
+  public static final Map<String, String> SASL_PROPS =
+      new TreeMap<String, String>();
+
+  public static final int SWITCH_TO_SIMPLE_AUTH = -88;
+
+  /**
+   * The protection levels that can be requested for RPC, each paired with
+   * the value used for the SASL QOP property.
+   */
+  public static enum QualityOfProtection {
+    AUTHENTICATION("auth"),
+    INTEGRITY("auth-int"),
+    PRIVACY("auth-conf");
+
+    /** The SASL QOP string for this level. */
+    public final String saslQop;
+
+    private QualityOfProtection(String qopValue) {
+      saslQop = qopValue;
+    }
+
+    /** @return the SASL QOP string for this level */
+    public String getSaslQop() {
+      return this.saslQop;
+    }
+  }
+
+  /**
+   * Populates {@link #SASL_PROPS} from the configuration.
+   * <p>
+   * The value of {@code hbase.rpc.protection} is compared against the
+   * lower-cased names of {@link QualityOfProtection}; anything other than
+   * "integrity" or "privacy" (including an absent value) selects
+   * authentication only. Server authentication is always requested.
+   * <p>
+   * The enum names are lower-cased with {@link Locale#ROOT}. With the
+   * default locale (e.g. Turkish, where "I" maps to a dotless i) the
+   * comparison would never match and the configured protection level would
+   * silently be downgraded to authentication only.
+   *
+   * @param conf configuration to read {@code hbase.rpc.protection} from
+   */
+  public static void init(Configuration conf) {
+    QualityOfProtection saslQOP = QualityOfProtection.AUTHENTICATION;
+    String rpcProtection = conf.get("hbase.rpc.protection",
+        QualityOfProtection.AUTHENTICATION.name().toLowerCase(Locale.ROOT));
+    if (QualityOfProtection.INTEGRITY.name().toLowerCase(Locale.ROOT)
+        .equals(rpcProtection)) {
+      saslQOP = QualityOfProtection.INTEGRITY;
+    } else if (QualityOfProtection.PRIVACY.name().toLowerCase(Locale.ROOT)
+        .equals(rpcProtection)) {
+      saslQOP = QualityOfProtection.PRIVACY;
+    }
+
+    SASL_PROPS.put(Sasl.QOP, saslQOP.getSaslQop());
+    SASL_PROPS.put(Sasl.SERVER_AUTH, "true");
+  }
+
+  /** Base64-encodes the given identifier bytes and returns them as a String. */
+  static String encodeIdentifier(byte[] identifier) {
+    final byte[] base64 = Base64.encodeBase64(identifier);
+    return new String(base64);
+  }
+
+  /** Reverses {@code encodeIdentifier}: Base64-decodes the bytes of the given String. */
+  static byte[] decodeIdentifier(String identifier) {
+    final byte[] base64 = identifier.getBytes();
+    return Base64.decodeBase64(base64);
+  }
+
+  /**
+   * Rebuilds a token identifier from its encoded String form.
+   *
+   * @param id the encoded identifier, as produced by {@code encodeIdentifier}
+   * @param secretManager used to create the empty identifier that is then
+   *          filled from the decoded bytes
+   * @return the de-serialized identifier
+   * @throws InvalidToken if the decoded bytes cannot be de-serialized; the
+   *           underlying IOException is attached as the cause
+   */
+  public static <T extends TokenIdentifier> T getIdentifier(String id,
+      SecretManager<T> secretManager) throws InvalidToken {
+    final byte[] rawId = decodeIdentifier(id);
+    final T result = secretManager.createIdentifier();
+    final DataInputStream source =
+        new DataInputStream(new ByteArrayInputStream(rawId));
+    try {
+      result.readFields(source);
+    } catch (IOException e) {
+      InvalidToken failure = new InvalidToken(
+          "Can't de-serialize tokenIdentifier");
+      failure.initCause(e);
+      throw failure;
+    }
+    return result;
+  }
+
+  /** Base64-encodes the given password bytes and returns the result as a char array. */
+  static char[] encodePassword(byte[] password) {
+    final String base64Text = new String(Base64.encodeBase64(password));
+    return base64Text.toCharArray();
+  }
+
+  /**
+   * Splits a fully qualified Kerberos name at every '/' and '@'.
+   *
+   * @param fullName the name to split
+   * @return the parts between the separators, in order
+   */
+  public static String[] splitKerberosName(String fullName) {
+    final String[] parts = fullName.split("[/@]");
+    return parts;
+  }
+
+  /** Status word sent ahead of each server response during the SASL exchange. */
+  public enum SaslStatus {
+    SUCCESS(0),
+    ERROR(1);
+
+    /** The integer value used for this status. */
+    public final int state;
+
+    private SaslStatus(int stateValue) {
+      state = stateValue;
+    }
+  }
+  
+  /**
+   * Authentication method. Each constant carries its one-byte code, the
+   * SASL mechanism name and the corresponding Hadoop authentication method.
+   */
+  public static enum AuthMethod {
+    SIMPLE((byte) 80, "", AuthenticationMethod.SIMPLE),
+    KERBEROS((byte) 81, "GSSAPI", AuthenticationMethod.KERBEROS),
+    DIGEST((byte) 82, "DIGEST-MD5", AuthenticationMethod.TOKEN);
+
+    /** The code for this method. */
+    public final byte code;
+    public final String mechanismName;
+    public final AuthenticationMethod authenticationMethod;
+
+    private AuthMethod(byte methodCode, String saslMechanism,
+                       AuthenticationMethod hadoopMethod) {
+      code = methodCode;
+      mechanismName = saslMechanism;
+      authenticationMethod = hadoopMethod;
+    }
+
+    /** Code of the first declared constant; lookups are relative to it. */
+    private static final int FIRST_CODE = values()[0].code;
+
+    /**
+     * Return the object represented by the code, or null when the code does
+     * not map to a declared constant. The lookup relies on the codes being
+     * consecutive in declaration order.
+     */
+    private static AuthMethod valueOf(byte code) {
+      final AuthMethod[] all = values();
+      final int offset = (code & 0xff) - FIRST_CODE;
+      if (offset < 0 || offset >= all.length) {
+        return null;
+      }
+      return all[offset];
+    }
+
+    /** Return the SASL mechanism name */
+    public String getMechanismName() {
+      return this.mechanismName;
+    }
+
+    /** Read one byte from in and map it to a method; null if unknown. */
+    public static AuthMethod read(DataInput in) throws IOException {
+      final byte wireCode = in.readByte();
+      return valueOf(wireCode);
+    }
+
+    /** Write this method's code to out */
+    public void write(DataOutput out) throws IOException {
+      out.write(this.code);
+    }
+  }
+
+  /**
+   * CallbackHandler for SASL DIGEST-MD5 mechanism.
+   * <p>
+   * The client's user name is an encoded token identifier. The password is
+   * looked up for that identifier through the secret manager, and the user
+   * named by the identifier is recorded on the connection as the attempting
+   * user.
+   */
+  public static class SaslDigestCallbackHandler implements CallbackHandler {
+    private final SecretManager<TokenIdentifier> secretManager;
+    private final HBaseServer.Connection connection;
+
+    public SaslDigestCallbackHandler(
+        SecretManager<TokenIdentifier> secretManager,
+        HBaseServer.Connection connection) {
+      this.secretManager = secretManager;
+      this.connection = connection;
+    }
+
+    /** Retrieves the token password and encodes it the same way the client does. */
+    private char[] getPassword(TokenIdentifier tokenid) throws InvalidToken {
+      return encodePassword(secretManager.retrievePassword(tokenid));
+    }
+
+    /**
+     * Handles the name, password and authorize callbacks; realm callbacks
+     * are ignored.
+     *
+     * @throws InvalidToken if the identifier cannot be decoded or its
+     *           password cannot be retrieved
+     * @throws UnsupportedCallbackException for any other callback type, or
+     *           when a password is requested without an accompanying name
+     *           callback
+     */
+    @Override
+    public void handle(Callback[] callbacks) throws InvalidToken,
+        UnsupportedCallbackException {
+      NameCallback nc = null;
+      PasswordCallback pc = null;
+      AuthorizeCallback ac = null;
+      for (Callback callback : callbacks) {
+        if (callback instanceof AuthorizeCallback) {
+          ac = (AuthorizeCallback) callback;
+        } else if (callback instanceof NameCallback) {
+          nc = (NameCallback) callback;
+        } else if (callback instanceof PasswordCallback) {
+          pc = (PasswordCallback) callback;
+        } else if (callback instanceof RealmCallback) {
+          continue; // realm is ignored
+        } else {
+          throw new UnsupportedCallbackException(callback,
+              "Unrecognized SASL DIGEST-MD5 Callback");
+        }
+      }
+      if (pc != null) {
+        // The password can only be looked up from the identifier carried by
+        // the name callback; fail explicitly instead of with an NPE.
+        if (nc == null) {
+          throw new UnsupportedCallbackException(pc,
+              "SASL DIGEST-MD5 PasswordCallback received without a NameCallback");
+        }
+        TokenIdentifier tokenIdentifier = getIdentifier(nc.getDefaultName(), secretManager);
+        char[] password = getPassword(tokenIdentifier);
+        UserGroupInformation user = tokenIdentifier.getUser(); // may throw exception
+        connection.attemptingUser = user;
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("SASL server DIGEST-MD5 callback: setting password "
+              + "for client: " + user);
+        }
+        pc.setPassword(password);
+      }
+      if (ac != null) {
+        String authid = ac.getAuthenticationID();
+        String authzid = ac.getAuthorizationID();
+        // authorization is granted only when the client acts as itself
+        ac.setAuthorized(authid.equals(authzid));
+        if (ac.isAuthorized()) {
+          if (LOG.isDebugEnabled()) {
+            String username =
+              getIdentifier(authzid, secretManager).getUser().getUserName();
+            LOG.debug("SASL server DIGEST-MD5 callback: setting "
+                + "canonicalized client ID: " + username);
+          }
+          ac.setAuthorizedID(authzid);
+        }
+      }
+    }
+  }
+
+  /**
+   * CallbackHandler for SASL GSSAPI Kerberos mechanism. Only authorize
+   * callbacks are accepted; authorization is granted when the authentication
+   * ID and the authorization ID are equal.
+   */
+  public static class SaslGssCallbackHandler implements CallbackHandler {
+
+    /** {@inheritDoc} */
+    @Override
+    public void handle(Callback[] callbacks) throws
+        UnsupportedCallbackException {
+      AuthorizeCallback authorize = null;
+      for (Callback candidate : callbacks) {
+        if (!(candidate instanceof AuthorizeCallback)) {
+          throw new UnsupportedCallbackException(candidate,
+              "Unrecognized SASL GSSAPI Callback");
+        }
+        authorize = (AuthorizeCallback) candidate;
+      }
+      if (authorize == null) {
+        return;
+      }
+
+      final String authenticationId = authorize.getAuthenticationID();
+      final String authorizationId = authorize.getAuthorizationID();
+      authorize.setAuthorized(authenticationId.equals(authorizationId));
+      if (authorize.isAuthorized()) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("SASL server GSSAPI callback: setting "
+              + "canonicalized client ID: " + authorizationId);
+        }
+        authorize.setAuthorizedID(authorizationId);
+      }
+    }
+  }
+}

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/security/User.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/security/User.java?rev=1337396&r1=1337395&r2=1337396&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/security/User.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/security/User.java Fri May 11 22:06:57 2012
@@ -22,17 +22,14 @@ package org.apache.hadoop.hbase.security
 
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.ConnectionHeader;
-import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.UserInformation;
 import org.apache.hadoop.hbase.util.Methods;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.security.UserGroupInformation;
 
 import java.io.IOException;
-import java.lang.reflect.Constructor;
 import java.lang.reflect.UndeclaredThrowableException;
 import java.security.PrivilegedAction;
 import java.security.PrivilegedExceptionAction;
@@ -50,24 +47,12 @@ import org.apache.commons.logging.Log;
  * HBase, but can be extended as needs change.
  * </p>
  */
-@InterfaceAudience.Private
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
 public abstract class User {
   public static final String HBASE_SECURITY_CONF_KEY =
       "hbase.security.authentication";
 
-  /**
-   * Flag to differentiate between API-incompatible changes to
-   * {@link org.apache.hadoop.security.UserGroupInformation} between vanilla
-   * Hadoop 0.20.x and secure Hadoop 0.20+.
-   */
-  private static boolean IS_SECURE_HADOOP = true;
-  static {
-    try {
-      UserGroupInformation.class.getMethod("isSecurityEnabled");
-    } catch (NoSuchMethodException nsme) {
-      IS_SECURE_HADOOP = false;
-    }
-  }
   private static Log LOG = LogFactory.getLog(User.class);
 
   protected UserGroupInformation ugi;
@@ -138,12 +123,7 @@ public abstract class User {
    * Returns the {@code User} instance within current execution context.
    */
   public static User getCurrent() throws IOException {
-    User user;
-    if (IS_SECURE_HADOOP) {
-      user = new SecureHadoopUser();
-    } else {
-      user = new HadoopUser();
-    }
+    User user = new SecureHadoopUser();
     if (user.getUGI() == null) {
       return null;
     }
@@ -159,38 +139,7 @@ public abstract class User {
     if (ugi == null) {
       return null;
     }
-
-    if (IS_SECURE_HADOOP) {
-      return new SecureHadoopUser(ugi);
-    }
-    return new HadoopUser(ugi);
-  }
-
-  public static User createUser(ConnectionHeader head) {
-    UserGroupInformation ugi = null;
-
-    if (!head.hasUserInfo()) {
-      return create(null);
-    }
-    UserInformation userInfoProto = head.getUserInfo();
-    String effectiveUser = null;
-    if (userInfoProto.hasEffectiveUser()) {
-      effectiveUser = userInfoProto.getEffectiveUser();
-    }
-    String realUser = null;
-    if (userInfoProto.hasRealUser()) {
-      realUser = userInfoProto.getRealUser();
-    }
-    if (effectiveUser != null) {
-      if (realUser != null) {
-        UserGroupInformation realUserUgi =
-            UserGroupInformation.createRemoteUser(realUser);
-        ugi = UserGroupInformation.createProxyUser(effectiveUser, realUserUgi);
-      } else {
-        ugi = UserGroupInformation.createRemoteUser(effectiveUser);
-      }
-    }
-    return create(ugi);
+    return new SecureHadoopUser(ugi);
   }
 
   /**
@@ -201,10 +150,7 @@ public abstract class User {
    */
   public static User createUserForTesting(Configuration conf,
       String name, String[] groups) {
-    if (IS_SECURE_HADOOP) {
-      return SecureHadoopUser.createUserForTesting(conf, name, groups);
-    }
-    return HadoopUser.createUserForTesting(conf, name, groups);
+    return SecureHadoopUser.createUserForTesting(conf, name, groups);
   }
 
   /**
@@ -225,11 +171,7 @@ public abstract class User {
    */
   public static void login(Configuration conf, String fileConfKey,
       String principalConfKey, String localhost) throws IOException {
-    if (IS_SECURE_HADOOP) {
-      SecureHadoopUser.login(conf, fileConfKey, principalConfKey, localhost);
-    } else {
-      HadoopUser.login(conf, fileConfKey, principalConfKey, localhost);
-    }
+    SecureHadoopUser.login(conf, fileConfKey, principalConfKey, localhost);
   }
 
   /**
@@ -239,11 +181,7 @@ public abstract class User {
    * {@code UserGroupInformation.isSecurityEnabled()}.
    */
   public static boolean isSecurityEnabled() {
-    if (IS_SECURE_HADOOP) {
-      return SecureHadoopUser.isSecurityEnabled();
-    } else {
-      return HadoopUser.isSecurityEnabled();
-    }
+    return SecureHadoopUser.isSecurityEnabled();
   }
 
   /**
@@ -258,160 +196,6 @@ public abstract class User {
   /* Concrete implementations */
 
   /**
-   * Bridges {@link User} calls to invocations of the appropriate methods
-   * in {@link org.apache.hadoop.security.UserGroupInformation} in regular
-   * Hadoop 0.20 (ASF Hadoop and other versions without the backported security
-   * features).
-   */
-  private static class HadoopUser extends User {
-
-    private HadoopUser() {
-      try {
-        ugi = (UserGroupInformation) callStatic("getCurrentUGI");
-        if (ugi == null) {
-          // Secure Hadoop UGI will perform an implicit login if the current
-          // user is null.  Emulate the same behavior here for consistency
-          Configuration conf = HBaseConfiguration.create();
-          ugi = (UserGroupInformation) callStatic("login",
-              new Class[]{ Configuration.class }, new Object[]{ conf });
-          if (ugi != null) {
-            callStatic("setCurrentUser",
-                new Class[]{ UserGroupInformation.class }, new Object[]{ ugi });
-          }
-        }
-      } catch (RuntimeException re) {
-        throw re;
-      } catch (Exception e) {
-        throw new UndeclaredThrowableException(e,
-            "Unexpected exception HadoopUser<init>");
-      }
-    }
-
-    private HadoopUser(UserGroupInformation ugi) {
-      this.ugi = ugi;
-    }
-
-    @Override
-    public String getShortName() {
-      return ugi != null ? ugi.getUserName() : null;
-    }
-
-    @Override
-    public <T> T runAs(PrivilegedAction<T> action) {
-      T result = null;
-      UserGroupInformation previous = null;
-      try {
-        previous = (UserGroupInformation) callStatic("getCurrentUGI");
-        try {
-          if (ugi != null) {
-            callStatic("setCurrentUser", new Class[]{UserGroupInformation.class},
-                new Object[]{ugi});
-          }
-          result = action.run();
-        } finally {
-          callStatic("setCurrentUser", new Class[]{UserGroupInformation.class},
-              new Object[]{previous});
-        }
-      } catch (RuntimeException re) {
-        throw re;
-      } catch (Exception e) {
-        throw new UndeclaredThrowableException(e,
-            "Unexpected exception in runAs()");
-      }
-      return result;
-    }
-
-    @Override
-    public <T> T runAs(PrivilegedExceptionAction<T> action)
-        throws IOException, InterruptedException {
-      T result = null;
-      try {
-        UserGroupInformation previous =
-            (UserGroupInformation) callStatic("getCurrentUGI");
-        try {
-          if (ugi != null) {
-            callStatic("setCurrentUGI", new Class[]{UserGroupInformation.class},
-                new Object[]{ugi});
-          }
-          result = action.run();
-        } finally {
-          callStatic("setCurrentUGI", new Class[]{UserGroupInformation.class},
-              new Object[]{previous});
-        }
-      } catch (Exception e) {
-        if (e instanceof IOException) {
-          throw (IOException)e;
-        } else if (e instanceof InterruptedException) {
-          throw (InterruptedException)e;
-        } else if (e instanceof RuntimeException) {
-          throw (RuntimeException)e;
-        } else {
-          throw new UndeclaredThrowableException(e, "Unknown exception in runAs()");
-        }
-      }
-      return result;
-    }
-
-    @Override
-    public void obtainAuthTokenForJob(Configuration conf, Job job)
-        throws IOException, InterruptedException {
-      // this is a no-op.  token creation is only supported for kerberos
-      // authenticated clients
-    }
-
-    @Override
-    public void obtainAuthTokenForJob(JobConf job)
-        throws IOException, InterruptedException {
-      // this is a no-op.  token creation is only supported for kerberos
-      // authenticated clients
-    }
-
-    /** @see User#createUserForTesting(org.apache.hadoop.conf.Configuration, String, String[]) */
-    public static User createUserForTesting(Configuration conf,
-        String name, String[] groups) {
-      try {
-        Class c = Class.forName("org.apache.hadoop.security.UnixUserGroupInformation");
-        Constructor constructor = c.getConstructor(String.class, String[].class);
-        if (constructor == null) {
-          throw new NullPointerException(
-             );
-        }
-        UserGroupInformation newUser =
-            (UserGroupInformation)constructor.newInstance(name, groups);
-        // set user in configuration -- hack for regular hadoop
-        conf.set("hadoop.job.ugi", newUser.toString());
-        return new HadoopUser(newUser);
-      } catch (ClassNotFoundException cnfe) {
-        throw new RuntimeException(
-            "UnixUserGroupInformation not found, is this secure Hadoop?", cnfe);
-      } catch (NoSuchMethodException nsme) {
-        throw new RuntimeException(
-            "No valid constructor found for UnixUserGroupInformation!", nsme);
-      } catch (RuntimeException re) {
-        throw re;
-      } catch (Exception e) {
-        throw new UndeclaredThrowableException(e,
-            "Unexpected exception instantiating new UnixUserGroupInformation");
-      }
-    }
-
-    /**
-     * No-op since we're running on a version of Hadoop that doesn't support
-     * logins.
-     * @see User#login(org.apache.hadoop.conf.Configuration, String, String, String)
-     */
-    public static void login(Configuration conf, String fileConfKey,
-        String principalConfKey, String localhost) throws IOException {
-      LOG.info("Skipping login, not running on secure Hadoop");
-    }
-
-    /** Always returns {@code false}. */
-    public static boolean isSecurityEnabled() {
-      return false;
-    }
-  }
-
-  /**
    * Bridges {@code User} invocations to underlying calls to
    * {@link org.apache.hadoop.security.UserGroupInformation} for secure Hadoop
    * 0.20 and versions 0.21 and above.

Added: hbase/trunk/src/main/java/org/apache/hadoop/hbase/security/access/AccessControlFilter.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/security/access/AccessControlFilter.java?rev=1337396&view=auto
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/security/access/AccessControlFilter.java (added)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/security/access/AccessControlFilter.java Fri May 11 22:06:57 2012
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.security.access;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.filter.FilterBase;
+import org.apache.hadoop.hbase.security.User;
+
+/**
+ * <strong>NOTE: for internal use only by AccessController implementation</strong>
+ *
+ * <p>
+ * TODO: There is room for further performance optimization here.
+ * Calling TableAuthManager.authorize() per KeyValue imposes a fair amount of
+ * overhead.  A more optimized solution might look at the qualifiers where
+ * permissions are actually granted and explicitly limit the scan to those.
+ * </p>
+ * <p>
+ * We should aim to use this _only_ when access to the requested column families
+ * is not granted at the column family levels.  If table or column family
+ * access succeeds, then there is no need to impose the overhead of this filter.
+ * </p>
+ */
+class AccessControlFilter extends FilterBase {
+
+  private TableAuthManager authManager;
+  private byte[] table;
+  private User user;
+
+  /** No-arg constructor kept for Writable; leaves every field unset. */
+  AccessControlFilter() {
+  }
+
+  /** Stores the auth manager, user and table name consulted by each READ check. */
+  AccessControlFilter(TableAuthManager mgr, User ugi, byte[] tableName) {
+    this.authManager = mgr;
+    this.user = ugi;
+    this.table = tableName;
+  }
+
+  /**
+   * Returns INCLUDE when the user is authorized to READ {@code kv}, else NEXT_COL.
+   */
+  @Override
+  public ReturnCode filterKeyValue(KeyValue kv) {
+    boolean readable =
+        authManager.authorize(user, table, kv, TablePermission.Action.READ);
+    return readable ? ReturnCode.INCLUDE : ReturnCode.NEXT_COL;
+  }
+
+  /** Always throws; the filter is meant for server-side use and is not serialized. */
+  @Override
+  public void write(DataOutput dataOutput) throws IOException {
+    String reason = "Serialization not supported.  Intended for server-side use only.";
+    throw new UnsupportedOperationException(reason);
+  }
+
+  /** Always throws; the filter is meant for server-side use and is not deserialized. */
+  @Override
+  public void readFields(DataInput dataInput) throws IOException {
+    String reason = "Serialization not supported.  Intended for server-side use only.";
+    throw new UnsupportedOperationException(reason);
+  }
+}

Added: hbase/trunk/src/main/java/org/apache/hadoop/hbase/security/access/AccessControlLists.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/security/access/AccessControlLists.java?rev=1337396&view=auto
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/security/access/AccessControlLists.java (added)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/security/access/AccessControlLists.java Fri May 11 22:06:57 2012
@@ -0,0 +1,514 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.security.access;
+
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.ListMultimap;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.catalog.MetaReader;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.io.HbaseObjectWritable;
+import org.apache.hadoop.hbase.io.hfile.Compression;
+import org.apache.hadoop.hbase.master.MasterServices;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.InternalScanner;
+import org.apache.hadoop.hbase.regionserver.StoreFile;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.io.Text;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.*;
+
+/**
+ * Maintains lists of permission grants to users and groups to allow for
+ * authorization checks by {@link AccessController}.
+ *
+ * <p>
+ * Access control lists are stored in an "internal" metadata table named
+ * {@code _acl_}. Each table's permission grants are stored as a separate row,
+ * keyed by the table name. KeyValues for permissions assignments are stored
+ * in one of the formats:
+ * <pre>
+ * Key                      Desc
+ * --------                 --------
+ * user                     table level permissions for a user [R=read, W=write]
+ * @group                   table level permissions for a group
+ * user,family              column family level permissions for a user
+ * @group,family            column family level permissions for a group
+ * user,family,qualifier    column qualifier level permissions for a user
+ * @group,family,qualifier  column qualifier level permissions for a group
+ * </pre>
+ * All values are encoded as byte arrays containing the codes from the
+ * {@link org.apache.hadoop.hbase.security.access.TablePermission.Action} enum.
+ * </p>
+ */
+public class AccessControlLists {
+  /** Internal storage table for access control lists */
+  public static final String ACL_TABLE_NAME_STR = "_acl_";
+  public static final byte[] ACL_TABLE_NAME = Bytes.toBytes(ACL_TABLE_NAME_STR);
+  /** Column family used to store ACL grants */
+  public static final String ACL_LIST_FAMILY_STR = "l";
+  public static final byte[] ACL_LIST_FAMILY = Bytes.toBytes(ACL_LIST_FAMILY_STR);
+
+  /** Table descriptor for ACL internal table */
+  public static final HTableDescriptor ACL_TABLEDESC = new HTableDescriptor(
+      ACL_TABLE_NAME);
+  static {
+    ACL_TABLEDESC.addFamily(
+        new HColumnDescriptor(ACL_LIST_FAMILY,
+            10, // Ten is arbitrary number.  Keep versions to help debugging.
+            Compression.Algorithm.NONE.getName(), true, true, 8 * 1024,
+            HConstants.FOREVER, StoreFile.BloomType.NONE.toString(),
+            HConstants.REPLICATION_SCOPE_LOCAL));
+  }
+
+  /**
+   * Delimiter separating the user, column family, and qualifier parts of a
+   * column key in the {@code _acl_} table. */
+  public static final char ACL_KEY_DELIMITER = ',';
+  /** Prefix character to denote group names */
+  public static final String GROUP_PREFIX = "@";
+  /** Configuration key for superusers */
+  public static final String SUPERUSER_CONF_KEY = "hbase.superuser";
+
+  private static final Log LOG = LogFactory.getLog(AccessControlLists.class);
+
+  /**
+   * Creates the {@code _acl_} table through the master if it does not exist yet.
+   * @param master reference to HMaster
+   */
+  static void init(MasterServices master) throws IOException {
+    boolean aclTableExists =
+        MetaReader.tableExists(master.getCatalogTracker(), ACL_TABLE_NAME_STR);
+    if (!aclTableExists) master.createTable(ACL_TABLEDESC, null);
+  }
+
+  /**
+   * Stores a new table permission grant in the access control lists table.
+   * The grant becomes one cell in the row named after the table: the column
+   * key is user[,family[,qualifier]] and the value holds one action code per
+   * byte.  A grant without actions is logged as a warning and not written.
+   * @param conf the configuration
+   * @param tableName the table to which access is being granted
+   * @param username the user or group being granted the permission
+   * @param perm the details of the permission being granted
+   * @throws IOException in the case of an error accessing the metadata table
+   */
+  static void addTablePermission(Configuration conf,
+      byte[] tableName, String username, TablePermission perm)
+    throws IOException {
+    Put grant = new Put(tableName);
+    // column key: user[,family[,qualifier]]
+    byte[] separator = new byte[]{ACL_KEY_DELIMITER};
+    byte[] aclKey = Bytes.toBytes(username);
+    boolean hasFamily = perm.getFamily() != null && perm.getFamily().length > 0;
+    if (hasFamily) {
+      aclKey = Bytes.add(aclKey, Bytes.add(separator, perm.getFamily()));
+      boolean hasQualifier = perm.getQualifier() != null && perm.getQualifier().length > 0;
+      if (hasQualifier) {
+        aclKey = Bytes.add(aclKey, Bytes.add(separator, perm.getQualifier()));
+      }
+    }
+    TablePermission.Action[] granted = perm.getActions();
+    if (granted == null || granted.length == 0) {
+      LOG.warn("No actions associated with user '"+username+"'");
+      return;
+    }
+    byte[] codes = new byte[granted.length];
+    int next = 0;
+    for (TablePermission.Action action : granted) {
+      codes[next++] = action.code();
+    }
+    grant.add(ACL_LIST_FAMILY, aclKey, codes);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Writing permission for table "+Bytes.toString(tableName)+" "+
+          Bytes.toString(aclKey)+": "+Bytes.toStringBinary(codes));
+    }
+    HTable acls = new HTable(conf, ACL_TABLE_NAME);
+    try {
+      acls.put(grant);
+    } finally {
+      acls.close();
+    }
+  }
+
+  /**
+   * Removes a previously granted permission from the stored access control
+   * lists.  The {@link TablePermission} being removed must exactly match what
+   * is stored -- no wildcard matching is attempted.  Ie, if user "bob" has
+   * been granted "READ" access to the "data" table, but only to column family
+   * plus qualifier "info:colA", then trying to call this method with only
+   * user "bob" and the table name "data" (but without specifying the
+   * column qualifier "info:colA") will have no effect.
+   *
+   * <p>The column key is assembled from raw bytes exactly as
+   * {@code addTablePermission} does, so family or qualifier names that are
+   * not valid UTF-8 still match the stored grant.</p>
+   *
+   * @param conf the configuration
+   * @param tableName the table of the current permission grant
+   * @param userName the user or group currently granted the permission
+   * @param perm the details of the permission to be revoked
+   * @throws IOException if there is an error accessing the metadata table
+   */
+  static void removeTablePermission(Configuration conf,
+      byte[] tableName, String userName, TablePermission perm)
+    throws IOException {
+
+    Delete d = new Delete(tableName);
+    // key layout: user[,family[,qualifier]]
+    byte[] key = Bytes.toBytes(userName);
+    if (perm.getFamily() != null && perm.getFamily().length > 0) {
+      key = Bytes.add(key,
+          Bytes.add(new byte[]{ACL_KEY_DELIMITER}, perm.getFamily()));
+      if (perm.getQualifier() != null && perm.getQualifier().length > 0) {
+        key = Bytes.add(key,
+            Bytes.add(new byte[]{ACL_KEY_DELIMITER}, perm.getQualifier()));
+      }
+    }
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Removing permission for user '" + userName+ "': "+
+          perm.toString());
+    }
+    d.deleteColumns(ACL_LIST_FAMILY, key);
+    HTable acls = null;
+    try {
+      acls = new HTable(conf, ACL_TABLE_NAME);
+      acls.delete(d);
+    } finally {
+      if (acls != null) acls.close();
+    }
+  }
+
+  /**
+   * Tells whether {@code region} belongs to the {@code _acl_} metadata table.
+   */
+  static boolean isAclRegion(HRegion region) {
+    byte[] regionTable = region.getTableDesc().getName();
+    return Bytes.equals(ACL_TABLE_NAME, regionTable);
+  }
+
+  /**
+   * Loads all of the permission grants stored in a region of the {@code _acl_}
+   * table.  Each row of that table holds the grants of one table, keyed by the
+   * table name.
+   *
+   * @param aclRegion a region of the {@code _acl_} table
+   * @return map from table name (the row key) to that table's grants, keyed by
+   *   user or group name
+   * @throws IOException if {@code aclRegion} is not part of {@code _acl_}, or
+   *   if scanning the region fails
+   */
+  static Map<byte[],ListMultimap<String,TablePermission>> loadAll(
+      HRegion aclRegion)
+    throws IOException {
+    if (!isAclRegion(aclRegion)) {
+      throw new IOException("Can only load permissions from "+ACL_TABLE_NAME_STR);
+    }
+
+    Map<byte[],ListMultimap<String,TablePermission>> byTable =
+        new TreeMap<byte[],ListMultimap<String,TablePermission>>(Bytes.BYTES_COMPARATOR);
+
+    // full scan over the ACL column family of this region
+    Scan aclScan = new Scan();
+    aclScan.addFamily(ACL_LIST_FAMILY);
+
+    InternalScanner regionScanner = null;
+    try {
+      regionScanner = aclRegion.getScanner(aclScan);
+      boolean more;
+      do {
+        // the cells delivered by this call are processed even when it is the last
+        List<KeyValue> cells = new ArrayList<KeyValue>();
+        more = regionScanner.next(cells);
+
+        ListMultimap<String,TablePermission> grants = ArrayListMultimap.create();
+        byte[] rowTable = null;
+        for (KeyValue cell : cells) {
+          if (rowTable == null) {
+            rowTable = cell.getRow();
+          }
+          Pair<String,TablePermission> parsed =
+              parseTablePermissionRecord(rowTable, cell);
+          if (parsed == null) {
+            continue;
+          }
+          grants.put(parsed.getFirst(), parsed.getSecond());
+        }
+
+        // a call that delivered no cells has no row key and records nothing
+        if (rowTable != null) {
+          byTable.put(rowTable, grants);
+        }
+      } while (more);
+    } finally {
+      if (regionScanner != null) {
+        regionScanner.close();
+      }
+    }
+
+    return byTable;
+  }
+
+  /**
+   * Loads every permission grant by scanning the whole {@code _acl_} table
+   * through an {@link HTable} client; primarily intended for testing purposes.
+   */
+  static Map<byte[],ListMultimap<String,TablePermission>> loadAll(
+      Configuration conf) throws IOException {
+    Map<byte[],ListMultimap<String,TablePermission>> byTable =
+        new TreeMap<byte[],ListMultimap<String,TablePermission>>(Bytes.BYTES_COMPARATOR);
+
+    // full scan of the ACL column family; every result row is stored in the map
+    Scan aclScan = new Scan();
+    aclScan.addFamily(ACL_LIST_FAMILY);
+
+    HTable acls = null;
+    ResultScanner rows = null;
+    try {
+      acls = new HTable(conf, ACL_TABLE_NAME);
+      rows = acls.getScanner(aclScan);
+      for (Result row : rows) {
+        // the row key is the name of the table the grants apply to
+        byte[] rowKey = row.getRow();
+        ListMultimap<String,TablePermission> grants = parseTablePermissions(rowKey, row);
+        byTable.put(rowKey, grants);
+      }
+    } finally {
+      if (rows != null) rows.close();
+      if (acls != null) acls.close();
+    }
+
+    return byTable;
+  }
+
+  /**
+   * Reads the permission grants recorded for one table, i.e. the <code>l:</code>
+   * column family of that table's row in <code>_acl_</code>.  For -ROOT-, .META.
+   * and <code>_acl_</code> itself an empty result is returned without a lookup.
+   *
+   * <p>
+   * See {@link AccessControlLists class documentation} for the key structure
+   * used for storage.
+   * </p>
+   * @return grants keyed by user or group name; empty when none are stored
+   * @throws IOException if the <code>_acl_</code> table cannot be read
+   */
+  static ListMultimap<String,TablePermission> getTablePermissions(
+      Configuration conf, byte[] tableName)
+  throws IOException {
+    /* TODO: -ROOT- and .META. cannot easily be handled because they must be
+     * online before _acl_ table.  Can anything be done here?
+     */
+    boolean catalogOrAcl = Bytes.equals(tableName, HConstants.ROOT_TABLE_NAME) ||
+        Bytes.equals(tableName, HConstants.META_TABLE_NAME) ||
+        Bytes.equals(tableName, AccessControlLists.ACL_TABLE_NAME);
+    if (catalogOrAcl) {
+      return ArrayListMultimap.create(0,0);
+    }
+
+    // for normal user tables, we just read the table row from _acl_
+    HTable acls = new HTable(conf, ACL_TABLE_NAME);
+    try {
+      Get rowGet = new Get(tableName);
+      rowGet.addFamily(ACL_LIST_FAMILY);
+      Result row = acls.get(rowGet);
+      if (row.isEmpty()) {
+        LOG.info("No permissions found in "+ACL_TABLE_NAME_STR+
+            " for table "+Bytes.toString(tableName));
+        return ArrayListMultimap.create();
+      }
+      return parseTablePermissions(tableName, row);
+    } finally {
+      acls.close();
+    }
+  }
+
+  /**
+   * Returns the currently granted permissions for a given table as a flat list
+   * holding one {@link UserPermission} per (user or group, grant) pair.
+   */
+  static List<UserPermission> getUserPermissions(
+      Configuration conf, byte[] tableName)
+  throws IOException {
+    ListMultimap<String,TablePermission> grants = getTablePermissions(conf, tableName);
+
+    List<UserPermission> flattened = new ArrayList<UserPermission>();
+    for (Map.Entry<String, TablePermission> grant : grants.entries()) {
+      // the multimap key is the user or group name
+      byte[] principal = Bytes.toBytes(grant.getKey());
+      TablePermission tp = grant.getValue();
+      flattened.add(new UserPermission(principal, tp.getTable(), tp.getFamily(),
+          tp.getQualifier(), tp.getActions()));
+    }
+
+    return flattened;
+  }
+
+  /**
+   * Decodes every cell of {@code result} into grants keyed by user or group name.
+   */
+  private static ListMultimap<String,TablePermission> parseTablePermissions(
+      byte[] table, Result result) {
+    ListMultimap<String,TablePermission> grants = ArrayListMultimap.create();
+    if (result == null || result.size() <= 0) {
+      return grants;
+    }
+    for (KeyValue cell : result.raw()) {
+      Pair<String,TablePermission> parsed = parseTablePermissionRecord(table, cell);
+      // a null record is skipped
+      if (parsed != null) {
+        grants.put(parsed.getFirst(), parsed.getSecond());
+      }
+    }
+    return grants;
+  }
+
+  /**
+   * Decodes one {@code _acl_} cell into a (user or group name, permission) pair;
+   * returns null for cells outside the ACL family.  The user[,family[,qualifier]]
+   * key is split on raw bytes so family and qualifier bytes are kept verbatim.
+   */
+  private static Pair<String,TablePermission> parseTablePermissionRecord(
+      byte[] table, KeyValue kv) {
+    if (!Bytes.equals(kv.getFamily(), ACL_LIST_FAMILY)) {
+      return null;
+    }
+    byte[] key = kv.getQualifier();
+    byte[] value = kv.getValue();
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Read acl: kv ["+
+                Bytes.toStringBinary(key)+": "+Bytes.toStringBinary(value)+"]");
+    }
+    // byte offsets of the first two delimiters; -1 when absent
+    int first = -1;
+    int second = -1;
+    for (int i = 0; i < key.length && second < 0; i++) {
+      if (key[i] != ACL_KEY_DELIMITER) continue;
+      if (first < 0) first = i; else second = i;
+    }
+    String username = Bytes.toString(key);
+    byte[] permFamily = null;
+    byte[] permQualifier = null;
+    // a delimiter in first or last position is treated as part of the name
+    if (first > 0 && first < key.length - 1) {
+      username = Bytes.toString(key, 0, first);
+      boolean hasQualifier = second > first + 1 && second < key.length - 1;
+      permFamily = Arrays.copyOfRange(key, first + 1, hasQualifier ? second : key.length);
+      if (hasQualifier) {
+        permQualifier = Arrays.copyOfRange(key, second + 1, key.length);
+      }
+    }
+    return new Pair<String,TablePermission>(
+        username, new TablePermission(table, permFamily, permQualifier, value));
+  }
+
+  /**
+   * Writes a set of permissions as {@link org.apache.hadoop.io.Writable} instances
+   * to the given output stream: the number of principals, then for each one its
+   * name followed by its list of permissions.
+   * @param out stream to write to
+   * @param perms permissions keyed by user or group name
+   * @param conf configuration handed to {@code HbaseObjectWritable}
+   * @throws IOException if writing to {@code out} fails
+   */
+  public static void writePermissions(DataOutput out,
+      ListMultimap<String,? extends Permission> perms, Configuration conf)
+  throws IOException {
+    out.writeInt(perms.keySet().size());
+    for (String principal : perms.keySet()) {
+      Text.writeString(out, principal);
+      HbaseObjectWritable.writeObject(out, perms.get(principal), List.class, conf);
+    }
+  }
+
+  /**
+   * Serializes {@code perms} with {@code writePermissions} into a byte array.
+   * @return the serialized bytes, or null if serialization failed (the error is logged)
+   */
+  public static byte[] writePermissionsAsBytes(
+      ListMultimap<String,? extends Permission> perms, Configuration conf) {
+    ByteArrayOutputStream buffer = new ByteArrayOutputStream();
+    try {
+      writePermissions(new DataOutputStream(buffer), perms, conf);
+    } catch (IOException ioe) {
+      // shouldn't happen here
+      LOG.error("Error serializing permissions", ioe);
+      return null;
+    }
+    return buffer.toByteArray();
+  }
+
+  /**
+   * Reads a set of permissions as {@link org.apache.hadoop.io.Writable} instances
+   * from the input stream, in the layout produced by {@code writePermissions}.
+   */
+  public static <T extends Permission> ListMultimap<String,T> readPermissions(
+      DataInput in, Configuration conf) throws IOException {
+    ListMultimap<String,T> perms = ArrayListMultimap.create();
+    int length = in.readInt();
+    for (int i=0; i<length; i++) {
+      String user = Text.readString(in);
+      // unchecked: T is erased at runtime, so the element type cannot be verified
+      @SuppressWarnings("unchecked")
+      List<T> userPerms = (List<T>)HbaseObjectWritable.readObject(in, conf);
+      perms.putAll(user, userPerms);
+    }
+    return perms;
+  }
+
+  /**
+   * Tells whether the given name denotes a group principal, i.e. whether it is
+   * non-null and starts with the special group prefix character ("@").
+   */
+  public static boolean isGroupPrincipal(String name) {
+    if (name == null) return false;
+    return name.startsWith(GROUP_PREFIX);
+  }
+
+  /**
+   * Returns the actual name for a group principal (stripped of the
+   * group prefix).  A key that is not a group principal, including null,
+   * is returned unchanged.
+   */
+  public static String getGroupName(String aclKey) {
+    int prefixLength = GROUP_PREFIX.length();
+    return isGroupPrincipal(aclKey)
+        ? aclKey.substring(prefixLength)
+        : aclKey;
+  }
+}