Posted to commits@hbase.apache.org by st...@apache.org on 2012/05/12 00:06:59 UTC
svn commit: r1337396 [2/5] - in /hbase/trunk: ./
security/src/main/java/org/apache/hadoop/hbase/security/
security/src/main/java/org/apache/hadoop/hbase/security/access/
security/src/main/java/org/apache/hadoop/hbase/security/token/
security/src/test/ ...
Added: hbase/trunk/src/main/java/org/apache/hadoop/hbase/security/HBaseSaslRpcClient.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/security/HBaseSaslRpcClient.java?rev=1337396&view=auto
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/security/HBaseSaslRpcClient.java (added)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/security/HBaseSaslRpcClient.java Fri May 11 22:06:57 2012
@@ -0,0 +1,279 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.security;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import javax.security.auth.callback.Callback;
+import javax.security.auth.callback.CallbackHandler;
+import javax.security.auth.callback.NameCallback;
+import javax.security.auth.callback.PasswordCallback;
+import javax.security.auth.callback.UnsupportedCallbackException;
+import javax.security.sasl.RealmCallback;
+import javax.security.sasl.RealmChoiceCallback;
+import javax.security.sasl.Sasl;
+import javax.security.sasl.SaslException;
+import javax.security.sasl.SaslClient;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.ipc.RemoteException;
+import org.apache.hadoop.hbase.security.HBaseSaslRpcServer.AuthMethod;
+import org.apache.hadoop.hbase.security.HBaseSaslRpcServer.SaslStatus;
+import org.apache.hadoop.security.SaslInputStream;
+import org.apache.hadoop.security.SaslOutputStream;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+
+/**
+ * A utility class that encapsulates SASL logic for the RPC client.
+ * Copied from <code>org.apache.hadoop.security</code>
+ */
+public class HBaseSaslRpcClient {
+ public static final Log LOG = LogFactory.getLog(HBaseSaslRpcClient.class);
+
+ private final SaslClient saslClient;
+
+  /**
+   * Create an HBaseSaslRpcClient for an authentication method
+   *
+   * @param method
+   *          the requested authentication method
+   * @param token
+   *          token to use if needed by the authentication method
+   * @param serverPrincipal
+   *          the fully qualified Kerberos principal of the server, required
+   *          when <code>method</code> is KERBEROS
+   */
+ public HBaseSaslRpcClient(AuthMethod method,
+ Token<? extends TokenIdentifier> token, String serverPrincipal)
+ throws IOException {
+ switch (method) {
+ case DIGEST:
+ if (LOG.isDebugEnabled())
+ LOG.debug("Creating SASL " + AuthMethod.DIGEST.getMechanismName()
+ + " client to authenticate to service at " + token.getService());
+      saslClient = Sasl.createSaslClient(
+          new String[] { AuthMethod.DIGEST.getMechanismName() }, null, null,
+          HBaseSaslRpcServer.SASL_DEFAULT_REALM, HBaseSaslRpcServer.SASL_PROPS,
+          new SaslClientCallbackHandler(token));
+ break;
+ case KERBEROS:
+ if (LOG.isDebugEnabled()) {
+        LOG.debug("Creating SASL " + AuthMethod.KERBEROS.getMechanismName()
+            + " client. Server's Kerberos principal name is "
+            + serverPrincipal);
+ }
+ if (serverPrincipal == null || serverPrincipal.length() == 0) {
+ throw new IOException(
+ "Failed to specify server's Kerberos principal name");
+ }
+      String[] names = HBaseSaslRpcServer.splitKerberosName(serverPrincipal);
+ if (names.length != 3) {
+ throw new IOException(
+ "Kerberos principal does not have the expected format: "
+ + serverPrincipal);
+ }
+      saslClient = Sasl.createSaslClient(
+          new String[] { AuthMethod.KERBEROS.getMechanismName() }, null,
+          names[0], names[1], HBaseSaslRpcServer.SASL_PROPS, null);
+ break;
+ default:
+ throw new IOException("Unknown authentication method " + method);
+ }
+ if (saslClient == null)
+ throw new IOException("Unable to find SASL client implementation");
+ }
+
+ private static void readStatus(DataInputStream inStream) throws IOException {
+ int status = inStream.readInt(); // read status
+ if (status != SaslStatus.SUCCESS.state) {
+ throw new RemoteException(WritableUtils.readString(inStream),
+ WritableUtils.readString(inStream));
+ }
+ }
+
+ /**
+ * Do client side SASL authentication with server via the given InputStream
+ * and OutputStream
+ *
+ * @param inS
+ * InputStream to use
+ * @param outS
+ * OutputStream to use
+   * @return true if the connection is set up, or false if it needs to
+   *         switch to simple auth
+ * @throws IOException
+ */
+ public boolean saslConnect(InputStream inS, OutputStream outS)
+ throws IOException {
+ DataInputStream inStream = new DataInputStream(new BufferedInputStream(inS));
+ DataOutputStream outStream = new DataOutputStream(new BufferedOutputStream(
+ outS));
+
+ try {
+ byte[] saslToken = new byte[0];
+ if (saslClient.hasInitialResponse())
+ saslToken = saslClient.evaluateChallenge(saslToken);
+ if (saslToken != null) {
+ outStream.writeInt(saslToken.length);
+ outStream.write(saslToken, 0, saslToken.length);
+ outStream.flush();
+ if (LOG.isDebugEnabled())
+ LOG.debug("Have sent token of size " + saslToken.length
+ + " from initSASLContext.");
+ }
+ if (!saslClient.isComplete()) {
+ readStatus(inStream);
+ int len = inStream.readInt();
+ if (len == HBaseSaslRpcServer.SWITCH_TO_SIMPLE_AUTH) {
+ if (LOG.isDebugEnabled())
+ LOG.debug("Server asks us to fall back to simple auth.");
+ saslClient.dispose();
+ return false;
+ }
+ saslToken = new byte[len];
+ if (LOG.isDebugEnabled())
+ LOG.debug("Will read input token of size " + saslToken.length
+ + " for processing by initSASLContext");
+ inStream.readFully(saslToken);
+ }
+
+ while (!saslClient.isComplete()) {
+ saslToken = saslClient.evaluateChallenge(saslToken);
+ if (saslToken != null) {
+ if (LOG.isDebugEnabled())
+ LOG.debug("Will send token of size " + saslToken.length
+ + " from initSASLContext.");
+ outStream.writeInt(saslToken.length);
+ outStream.write(saslToken, 0, saslToken.length);
+ outStream.flush();
+ }
+ if (!saslClient.isComplete()) {
+ readStatus(inStream);
+ saslToken = new byte[inStream.readInt()];
+ if (LOG.isDebugEnabled())
+ LOG.debug("Will read input token of size " + saslToken.length
+ + " for processing by initSASLContext");
+ inStream.readFully(saslToken);
+ }
+ }
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("SASL client context established. Negotiated QoP: "
+ + saslClient.getNegotiatedProperty(Sasl.QOP));
+ }
+ return true;
+ } catch (IOException e) {
+ try {
+ saslClient.dispose();
+ } catch (SaslException ignored) {
+ // ignore further exceptions during cleanup
+ }
+ throw e;
+ }
+ }
+
+ /**
+ * Get a SASL wrapped InputStream. Can be called only after saslConnect() has
+ * been called.
+ *
+ * @param in
+ * the InputStream to wrap
+ * @return a SASL wrapped InputStream
+ * @throws IOException
+ */
+ public InputStream getInputStream(InputStream in) throws IOException {
+ if (!saslClient.isComplete()) {
+ throw new IOException("Sasl authentication exchange hasn't completed yet");
+ }
+ return new SaslInputStream(in, saslClient);
+ }
+
+ /**
+ * Get a SASL wrapped OutputStream. Can be called only after saslConnect() has
+ * been called.
+ *
+ * @param out
+ * the OutputStream to wrap
+ * @return a SASL wrapped OutputStream
+ * @throws IOException
+ */
+ public OutputStream getOutputStream(OutputStream out) throws IOException {
+ if (!saslClient.isComplete()) {
+ throw new IOException("Sasl authentication exchange hasn't completed yet");
+ }
+ return new SaslOutputStream(out, saslClient);
+ }
+
+ /** Release resources used by wrapped saslClient */
+ public void dispose() throws SaslException {
+ saslClient.dispose();
+ }
+
+ private static class SaslClientCallbackHandler implements CallbackHandler {
+ private final String userName;
+ private final char[] userPassword;
+
+ public SaslClientCallbackHandler(Token<? extends TokenIdentifier> token) {
+ this.userName = HBaseSaslRpcServer.encodeIdentifier(token.getIdentifier());
+ this.userPassword = HBaseSaslRpcServer.encodePassword(token.getPassword());
+ }
+
+ public void handle(Callback[] callbacks)
+ throws UnsupportedCallbackException {
+ NameCallback nc = null;
+ PasswordCallback pc = null;
+ RealmCallback rc = null;
+ for (Callback callback : callbacks) {
+ if (callback instanceof RealmChoiceCallback) {
+ continue;
+ } else if (callback instanceof NameCallback) {
+ nc = (NameCallback) callback;
+ } else if (callback instanceof PasswordCallback) {
+ pc = (PasswordCallback) callback;
+ } else if (callback instanceof RealmCallback) {
+ rc = (RealmCallback) callback;
+ } else {
+ throw new UnsupportedCallbackException(callback,
+ "Unrecognized SASL client callback");
+ }
+ }
+ if (nc != null) {
+ if (LOG.isDebugEnabled())
+ LOG.debug("SASL client callback: setting username: " + userName);
+ nc.setName(userName);
+ }
+ if (pc != null) {
+ if (LOG.isDebugEnabled())
+ LOG.debug("SASL client callback: setting userPassword");
+ pc.setPassword(userPassword);
+ }
+ if (rc != null) {
+ if (LOG.isDebugEnabled())
+ LOG.debug("SASL client callback: setting realm: "
+ + rc.getDefaultText());
+ rc.setText(rc.getDefaultText());
+ }
+ }
+ }
+}
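A usage note on the class above: the expected call sequence is construct with
the desired AuthMethod, run saslConnect() over the raw connection streams, then
wrap those streams for all subsequent traffic. A minimal sketch, assuming an
established socket and a previously obtained delegation token; the names
socketIn, socketOut, and token are hypothetical and imports are elided:

    HBaseSaslRpcClient saslRpcClient = new HBaseSaslRpcClient(
        AuthMethod.DIGEST, token, null); // serverPrincipal only needed for KERBEROS
    if (saslRpcClient.saslConnect(socketIn, socketOut)) {
      // negotiation succeeded; use the SASL-wrapped streams from here on
      InputStream in = saslRpcClient.getInputStream(socketIn);
      OutputStream out = saslRpcClient.getOutputStream(socketOut);
    } else {
      // server answered SWITCH_TO_SIMPLE_AUTH; saslConnect() has already
      // disposed the SASL client, so continue with the raw streams
    }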
Added: hbase/trunk/src/main/java/org/apache/hadoop/hbase/security/HBaseSaslRpcServer.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/security/HBaseSaslRpcServer.java?rev=1337396&view=auto
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/security/HBaseSaslRpcServer.java (added)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/security/HBaseSaslRpcServer.java Fri May 11 22:06:57 2012
@@ -0,0 +1,278 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.security;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Map;
+import java.util.TreeMap;
+
+import javax.security.auth.callback.Callback;
+import javax.security.auth.callback.CallbackHandler;
+import javax.security.auth.callback.NameCallback;
+import javax.security.auth.callback.PasswordCallback;
+import javax.security.auth.callback.UnsupportedCallbackException;
+import javax.security.sasl.AuthorizeCallback;
+import javax.security.sasl.RealmCallback;
+import javax.security.sasl.Sasl;
+
+import org.apache.commons.codec.binary.Base64;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.ipc.HBaseServer;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.SecretManager;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
+import org.apache.hadoop.security.token.SecretManager.InvalidToken;
+
+/**
+ * A utility class for dealing with SASL on the RPC server
+ */
+public class HBaseSaslRpcServer {
+ public static final Log LOG = LogFactory.getLog(HBaseSaslRpcServer.class);
+ public static final String SASL_DEFAULT_REALM = "default";
+ public static final Map<String, String> SASL_PROPS =
+ new TreeMap<String, String>();
+
+ public static final int SWITCH_TO_SIMPLE_AUTH = -88;
+
+ public static enum QualityOfProtection {
+ AUTHENTICATION("auth"),
+ INTEGRITY("auth-int"),
+ PRIVACY("auth-conf");
+
+ public final String saslQop;
+
+ private QualityOfProtection(String saslQop) {
+ this.saslQop = saslQop;
+ }
+
+ public String getSaslQop() {
+ return saslQop;
+ }
+ }
+
+ public static void init(Configuration conf) {
+ QualityOfProtection saslQOP = QualityOfProtection.AUTHENTICATION;
+ String rpcProtection = conf.get("hbase.rpc.protection",
+ QualityOfProtection.AUTHENTICATION.name().toLowerCase());
+ if (QualityOfProtection.INTEGRITY.name().toLowerCase()
+ .equals(rpcProtection)) {
+ saslQOP = QualityOfProtection.INTEGRITY;
+ } else if (QualityOfProtection.PRIVACY.name().toLowerCase().equals(
+ rpcProtection)) {
+ saslQOP = QualityOfProtection.PRIVACY;
+ }
+
+ SASL_PROPS.put(Sasl.QOP, saslQOP.getSaslQop());
+ SASL_PROPS.put(Sasl.SERVER_AUTH, "true");
+ }
+
+ static String encodeIdentifier(byte[] identifier) {
+ return new String(Base64.encodeBase64(identifier));
+ }
+
+ static byte[] decodeIdentifier(String identifier) {
+ return Base64.decodeBase64(identifier.getBytes());
+ }
+
+ public static <T extends TokenIdentifier> T getIdentifier(String id,
+ SecretManager<T> secretManager) throws InvalidToken {
+ byte[] tokenId = decodeIdentifier(id);
+ T tokenIdentifier = secretManager.createIdentifier();
+ try {
+ tokenIdentifier.readFields(new DataInputStream(new ByteArrayInputStream(
+ tokenId)));
+ } catch (IOException e) {
+ throw (InvalidToken) new InvalidToken(
+ "Can't de-serialize tokenIdentifier").initCause(e);
+ }
+ return tokenIdentifier;
+ }
+
+ static char[] encodePassword(byte[] password) {
+ return new String(Base64.encodeBase64(password)).toCharArray();
+ }
+
+  /** Splits a fully qualified Kerberos name into its component parts */
+ public static String[] splitKerberosName(String fullName) {
+ return fullName.split("[/@]");
+ }
+
+ public enum SaslStatus {
+ SUCCESS (0),
+ ERROR (1);
+
+ public final int state;
+ private SaslStatus(int state) {
+ this.state = state;
+ }
+ }
+
+ /** Authentication method */
+ public static enum AuthMethod {
+ SIMPLE((byte) 80, "", AuthenticationMethod.SIMPLE),
+ KERBEROS((byte) 81, "GSSAPI", AuthenticationMethod.KERBEROS),
+ DIGEST((byte) 82, "DIGEST-MD5", AuthenticationMethod.TOKEN);
+
+ /** The code for this method. */
+ public final byte code;
+ public final String mechanismName;
+ public final AuthenticationMethod authenticationMethod;
+
+ private AuthMethod(byte code, String mechanismName,
+ AuthenticationMethod authMethod) {
+ this.code = code;
+ this.mechanismName = mechanismName;
+ this.authenticationMethod = authMethod;
+ }
+
+ private static final int FIRST_CODE = values()[0].code;
+
+ /** Return the object represented by the code. */
+ private static AuthMethod valueOf(byte code) {
+ final int i = (code & 0xff) - FIRST_CODE;
+ return i < 0 || i >= values().length ? null : values()[i];
+ }
+
+ /** Return the SASL mechanism name */
+ public String getMechanismName() {
+ return mechanismName;
+ }
+
+ /** Read from in */
+ public static AuthMethod read(DataInput in) throws IOException {
+ return valueOf(in.readByte());
+ }
+
+ /** Write to out */
+ public void write(DataOutput out) throws IOException {
+ out.write(code);
+ }
+  }
+
+ /** CallbackHandler for SASL DIGEST-MD5 mechanism */
+ public static class SaslDigestCallbackHandler implements CallbackHandler {
+ private SecretManager<TokenIdentifier> secretManager;
+ private HBaseServer.Connection connection;
+
+ public SaslDigestCallbackHandler(
+ SecretManager<TokenIdentifier> secretManager,
+ HBaseServer.Connection connection) {
+ this.secretManager = secretManager;
+ this.connection = connection;
+ }
+
+ private char[] getPassword(TokenIdentifier tokenid) throws InvalidToken {
+ return encodePassword(secretManager.retrievePassword(tokenid));
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public void handle(Callback[] callbacks) throws InvalidToken,
+ UnsupportedCallbackException {
+ NameCallback nc = null;
+ PasswordCallback pc = null;
+ AuthorizeCallback ac = null;
+ for (Callback callback : callbacks) {
+ if (callback instanceof AuthorizeCallback) {
+ ac = (AuthorizeCallback) callback;
+ } else if (callback instanceof NameCallback) {
+ nc = (NameCallback) callback;
+ } else if (callback instanceof PasswordCallback) {
+ pc = (PasswordCallback) callback;
+ } else if (callback instanceof RealmCallback) {
+ continue; // realm is ignored
+ } else {
+ throw new UnsupportedCallbackException(callback,
+ "Unrecognized SASL DIGEST-MD5 Callback");
+ }
+ }
+ if (pc != null) {
+ TokenIdentifier tokenIdentifier = getIdentifier(nc.getDefaultName(), secretManager);
+ char[] password = getPassword(tokenIdentifier);
+        UserGroupInformation user = tokenIdentifier.getUser(); // may throw exception
+ connection.attemptingUser = user;
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("SASL server DIGEST-MD5 callback: setting password "
+ + "for client: " + tokenIdentifier.getUser());
+ }
+ pc.setPassword(password);
+ }
+ if (ac != null) {
+ String authid = ac.getAuthenticationID();
+ String authzid = ac.getAuthorizationID();
+        ac.setAuthorized(authid.equals(authzid));
+ if (ac.isAuthorized()) {
+ if (LOG.isDebugEnabled()) {
+ String username =
+ getIdentifier(authzid, secretManager).getUser().getUserName();
+ LOG.debug("SASL server DIGEST-MD5 callback: setting "
+ + "canonicalized client ID: " + username);
+ }
+ ac.setAuthorizedID(authzid);
+ }
+ }
+ }
+ }
+
+ /** CallbackHandler for SASL GSSAPI Kerberos mechanism */
+ public static class SaslGssCallbackHandler implements CallbackHandler {
+
+ /** {@inheritDoc} */
+ @Override
+ public void handle(Callback[] callbacks) throws
+ UnsupportedCallbackException {
+ AuthorizeCallback ac = null;
+ for (Callback callback : callbacks) {
+ if (callback instanceof AuthorizeCallback) {
+ ac = (AuthorizeCallback) callback;
+ } else {
+ throw new UnsupportedCallbackException(callback,
+ "Unrecognized SASL GSSAPI Callback");
+ }
+ }
+ if (ac != null) {
+ String authid = ac.getAuthenticationID();
+ String authzid = ac.getAuthorizationID();
+        ac.setAuthorized(authid.equals(authzid));
+ if (ac.isAuthorized()) {
+ if (LOG.isDebugEnabled())
+ LOG.debug("SASL server GSSAPI callback: setting "
+ + "canonicalized client ID: " + authzid);
+ ac.setAuthorizedID(authzid);
+ }
+ }
+ }
+ }
+}
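A configuration note on the server class above: init(Configuration) maps the
"hbase.rpc.protection" setting onto a SASL QOP, and any unrecognized value
falls back to authentication-only. A short sketch of selecting wire encryption
(HBaseConfiguration.create() is the standard HBase entry point for building a
Configuration; this snippet is illustration, not part of the commit):

    Configuration conf = HBaseConfiguration.create();
    // valid values: "authentication", "integrity", "privacy"
    conf.set("hbase.rpc.protection", "privacy");
    HBaseSaslRpcServer.init(conf);
    // SASL_PROPS now maps Sasl.QOP -> "auth-conf" and Sasl.SERVER_AUTH -> "true"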
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/security/User.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/security/User.java?rev=1337396&r1=1337395&r2=1337396&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/security/User.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/security/User.java Fri May 11 22:06:57 2012
@@ -22,17 +22,14 @@ package org.apache.hadoop.hbase.security
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.ConnectionHeader;
-import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.UserInformation;
import org.apache.hadoop.hbase.util.Methods;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.security.UserGroupInformation;
import java.io.IOException;
-import java.lang.reflect.Constructor;
import java.lang.reflect.UndeclaredThrowableException;
import java.security.PrivilegedAction;
import java.security.PrivilegedExceptionAction;
@@ -50,24 +47,12 @@ import org.apache.commons.logging.Log;
* HBase, but can be extended as needs change.
* </p>
*/
-@InterfaceAudience.Private
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
public abstract class User {
public static final String HBASE_SECURITY_CONF_KEY =
"hbase.security.authentication";
- /**
- * Flag to differentiate between API-incompatible changes to
- * {@link org.apache.hadoop.security.UserGroupInformation} between vanilla
- * Hadoop 0.20.x and secure Hadoop 0.20+.
- */
- private static boolean IS_SECURE_HADOOP = true;
- static {
- try {
- UserGroupInformation.class.getMethod("isSecurityEnabled");
- } catch (NoSuchMethodException nsme) {
- IS_SECURE_HADOOP = false;
- }
- }
private static Log LOG = LogFactory.getLog(User.class);
protected UserGroupInformation ugi;
@@ -138,12 +123,7 @@ public abstract class User {
* Returns the {@code User} instance within current execution context.
*/
public static User getCurrent() throws IOException {
- User user;
- if (IS_SECURE_HADOOP) {
- user = new SecureHadoopUser();
- } else {
- user = new HadoopUser();
- }
+ User user = new SecureHadoopUser();
if (user.getUGI() == null) {
return null;
}
@@ -159,38 +139,7 @@ public abstract class User {
if (ugi == null) {
return null;
}
-
- if (IS_SECURE_HADOOP) {
- return new SecureHadoopUser(ugi);
- }
- return new HadoopUser(ugi);
- }
-
- public static User createUser(ConnectionHeader head) {
- UserGroupInformation ugi = null;
-
- if (!head.hasUserInfo()) {
- return create(null);
- }
- UserInformation userInfoProto = head.getUserInfo();
- String effectiveUser = null;
- if (userInfoProto.hasEffectiveUser()) {
- effectiveUser = userInfoProto.getEffectiveUser();
- }
- String realUser = null;
- if (userInfoProto.hasRealUser()) {
- realUser = userInfoProto.getRealUser();
- }
- if (effectiveUser != null) {
- if (realUser != null) {
- UserGroupInformation realUserUgi =
- UserGroupInformation.createRemoteUser(realUser);
- ugi = UserGroupInformation.createProxyUser(effectiveUser, realUserUgi);
- } else {
- ugi = UserGroupInformation.createRemoteUser(effectiveUser);
- }
- }
- return create(ugi);
+ return new SecureHadoopUser(ugi);
}
/**
@@ -201,10 +150,7 @@ public abstract class User {
*/
public static User createUserForTesting(Configuration conf,
String name, String[] groups) {
- if (IS_SECURE_HADOOP) {
- return SecureHadoopUser.createUserForTesting(conf, name, groups);
- }
- return HadoopUser.createUserForTesting(conf, name, groups);
+ return SecureHadoopUser.createUserForTesting(conf, name, groups);
}
/**
@@ -225,11 +171,7 @@ public abstract class User {
*/
public static void login(Configuration conf, String fileConfKey,
String principalConfKey, String localhost) throws IOException {
- if (IS_SECURE_HADOOP) {
- SecureHadoopUser.login(conf, fileConfKey, principalConfKey, localhost);
- } else {
- HadoopUser.login(conf, fileConfKey, principalConfKey, localhost);
- }
+ SecureHadoopUser.login(conf, fileConfKey, principalConfKey, localhost);
}
/**
@@ -239,11 +181,7 @@ public abstract class User {
* {@code UserGroupInformation.isSecurityEnabled()}.
*/
public static boolean isSecurityEnabled() {
- if (IS_SECURE_HADOOP) {
- return SecureHadoopUser.isSecurityEnabled();
- } else {
- return HadoopUser.isSecurityEnabled();
- }
+ return SecureHadoopUser.isSecurityEnabled();
}
/**
@@ -258,160 +196,6 @@ public abstract class User {
/* Concrete implementations */
/**
- * Bridges {@link User} calls to invocations of the appropriate methods
- * in {@link org.apache.hadoop.security.UserGroupInformation} in regular
- * Hadoop 0.20 (ASF Hadoop and other versions without the backported security
- * features).
- */
- private static class HadoopUser extends User {
-
- private HadoopUser() {
- try {
- ugi = (UserGroupInformation) callStatic("getCurrentUGI");
- if (ugi == null) {
- // Secure Hadoop UGI will perform an implicit login if the current
- // user is null. Emulate the same behavior here for consistency
- Configuration conf = HBaseConfiguration.create();
- ugi = (UserGroupInformation) callStatic("login",
- new Class[]{ Configuration.class }, new Object[]{ conf });
- if (ugi != null) {
- callStatic("setCurrentUser",
- new Class[]{ UserGroupInformation.class }, new Object[]{ ugi });
- }
- }
- } catch (RuntimeException re) {
- throw re;
- } catch (Exception e) {
- throw new UndeclaredThrowableException(e,
- "Unexpected exception HadoopUser<init>");
- }
- }
-
- private HadoopUser(UserGroupInformation ugi) {
- this.ugi = ugi;
- }
-
- @Override
- public String getShortName() {
- return ugi != null ? ugi.getUserName() : null;
- }
-
- @Override
- public <T> T runAs(PrivilegedAction<T> action) {
- T result = null;
- UserGroupInformation previous = null;
- try {
- previous = (UserGroupInformation) callStatic("getCurrentUGI");
- try {
- if (ugi != null) {
- callStatic("setCurrentUser", new Class[]{UserGroupInformation.class},
- new Object[]{ugi});
- }
- result = action.run();
- } finally {
- callStatic("setCurrentUser", new Class[]{UserGroupInformation.class},
- new Object[]{previous});
- }
- } catch (RuntimeException re) {
- throw re;
- } catch (Exception e) {
- throw new UndeclaredThrowableException(e,
- "Unexpected exception in runAs()");
- }
- return result;
- }
-
- @Override
- public <T> T runAs(PrivilegedExceptionAction<T> action)
- throws IOException, InterruptedException {
- T result = null;
- try {
- UserGroupInformation previous =
- (UserGroupInformation) callStatic("getCurrentUGI");
- try {
- if (ugi != null) {
- callStatic("setCurrentUGI", new Class[]{UserGroupInformation.class},
- new Object[]{ugi});
- }
- result = action.run();
- } finally {
- callStatic("setCurrentUGI", new Class[]{UserGroupInformation.class},
- new Object[]{previous});
- }
- } catch (Exception e) {
- if (e instanceof IOException) {
- throw (IOException)e;
- } else if (e instanceof InterruptedException) {
- throw (InterruptedException)e;
- } else if (e instanceof RuntimeException) {
- throw (RuntimeException)e;
- } else {
- throw new UndeclaredThrowableException(e, "Unknown exception in runAs()");
- }
- }
- return result;
- }
-
- @Override
- public void obtainAuthTokenForJob(Configuration conf, Job job)
- throws IOException, InterruptedException {
- // this is a no-op. token creation is only supported for kerberos
- // authenticated clients
- }
-
- @Override
- public void obtainAuthTokenForJob(JobConf job)
- throws IOException, InterruptedException {
- // this is a no-op. token creation is only supported for kerberos
- // authenticated clients
- }
-
- /** @see User#createUserForTesting(org.apache.hadoop.conf.Configuration, String, String[]) */
- public static User createUserForTesting(Configuration conf,
- String name, String[] groups) {
- try {
- Class c = Class.forName("org.apache.hadoop.security.UnixUserGroupInformation");
- Constructor constructor = c.getConstructor(String.class, String[].class);
- if (constructor == null) {
- throw new NullPointerException(
- );
- }
- UserGroupInformation newUser =
- (UserGroupInformation)constructor.newInstance(name, groups);
- // set user in configuration -- hack for regular hadoop
- conf.set("hadoop.job.ugi", newUser.toString());
- return new HadoopUser(newUser);
- } catch (ClassNotFoundException cnfe) {
- throw new RuntimeException(
- "UnixUserGroupInformation not found, is this secure Hadoop?", cnfe);
- } catch (NoSuchMethodException nsme) {
- throw new RuntimeException(
- "No valid constructor found for UnixUserGroupInformation!", nsme);
- } catch (RuntimeException re) {
- throw re;
- } catch (Exception e) {
- throw new UndeclaredThrowableException(e,
- "Unexpected exception instantiating new UnixUserGroupInformation");
- }
- }
-
- /**
- * No-op since we're running on a version of Hadoop that doesn't support
- * logins.
- * @see User#login(org.apache.hadoop.conf.Configuration, String, String, String)
- */
- public static void login(Configuration conf, String fileConfKey,
- String principalConfKey, String localhost) throws IOException {
- LOG.info("Skipping login, not running on secure Hadoop");
- }
-
- /** Always returns {@code false}. */
- public static boolean isSecurityEnabled() {
- return false;
- }
- }
-
- /**
* Bridges {@code User} invocations to underlying calls to
* {@link org.apache.hadoop.security.UserGroupInformation} for secure Hadoop
* 0.20 and versions 0.21 and above.
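With the HadoopUser reflection bridge removed above, User now always delegates
to UserGroupInformation via SecureHadoopUser. For orientation, a minimal
caller-side sketch of the surviving API (illustrative only; imports elided):

    User current = User.getCurrent(); // wraps UserGroupInformation.getCurrentUser()
    if (User.isSecurityEnabled()) {
      current.runAs(new PrivilegedExceptionAction<Void>() {
        public Void run() throws Exception {
          // HBase calls made here execute in this user's security context
          return null;
        }
      });
    }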
Added: hbase/trunk/src/main/java/org/apache/hadoop/hbase/security/access/AccessControlFilter.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/security/access/AccessControlFilter.java?rev=1337396&view=auto
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/security/access/AccessControlFilter.java (added)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/security/access/AccessControlFilter.java Fri May 11 22:06:57 2012
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.security.access;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.filter.FilterBase;
+import org.apache.hadoop.hbase.security.User;
+
+/**
+ * <strong>NOTE: for internal use only by the AccessController implementation</strong>
+ *
+ * <p>
+ * TODO: There is room for further performance optimization here.
+ * Calling TableAuthManager.authorize() per KeyValue imposes a fair amount of
+ * overhead. A more optimized solution might look at the qualifiers where
+ * permissions are actually granted and explicitly limit the scan to those.
+ * </p>
+ * <p>
+ * We should aim to use this _only_ when access to the requested column families
+ * is not granted at the column family level. If table or column family
+ * access succeeds, then there is no need to impose the overhead of this filter.
+ * </p>
+ */
+class AccessControlFilter extends FilterBase {
+
+ private TableAuthManager authManager;
+ private byte[] table;
+ private User user;
+
+ /**
+ * For Writable
+ */
+ AccessControlFilter() {
+ }
+
+ AccessControlFilter(TableAuthManager mgr, User ugi,
+ byte[] tableName) {
+ authManager = mgr;
+ table = tableName;
+ user = ugi;
+ }
+
+ @Override
+ public ReturnCode filterKeyValue(KeyValue kv) {
+ if (authManager.authorize(user, table, kv, TablePermission.Action.READ)) {
+ return ReturnCode.INCLUDE;
+ }
+ return ReturnCode.NEXT_COL;
+ }
+
+ @Override
+ public void write(DataOutput dataOutput) throws IOException {
+ // no implementation, server-side use only
+ throw new UnsupportedOperationException(
+ "Serialization not supported. Intended for server-side use only.");
+ }
+
+ @Override
+ public void readFields(DataInput dataInput) throws IOException {
+ // no implementation, server-side use only
+ throw new UnsupportedOperationException(
+ "Serialization not supported. Intended for server-side use only.");
+ }
+}
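Per the class comment, this filter is a server-side last resort: attach it only
when table- or family-level grants do not already cover the request. A hedged
sketch of the intended attachment point, assuming authManager, user, and
tableName are in scope inside the AccessController coprocessor (same package,
since the constructor is package-private):

    // write()/readFields() above deliberately throw, so the filter must be
    // created where it runs -- on the region server, never on the client.
    Scan scan = new Scan();
    scan.setFilter(new AccessControlFilter(authManager, user, tableName));
    // during the scan, each KeyValue is individually checked for READ access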
Added: hbase/trunk/src/main/java/org/apache/hadoop/hbase/security/access/AccessControlLists.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/security/access/AccessControlLists.java?rev=1337396&view=auto
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/security/access/AccessControlLists.java (added)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/security/access/AccessControlLists.java Fri May 11 22:06:57 2012
@@ -0,0 +1,514 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.security.access;
+
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.ListMultimap;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.catalog.MetaReader;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.io.HbaseObjectWritable;
+import org.apache.hadoop.hbase.io.hfile.Compression;
+import org.apache.hadoop.hbase.master.MasterServices;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.InternalScanner;
+import org.apache.hadoop.hbase.regionserver.StoreFile;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.io.Text;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+
+/**
+ * Maintains lists of permission grants to users and groups to allow for
+ * authorization checks by {@link AccessController}.
+ *
+ * <p>
+ * Access control lists are stored in an "internal" metadata table named
+ * {@code _acl_}. Each table's permission grants are stored as a separate row,
+ * keyed by the table name. KeyValues for permission assignments are stored
+ * in one of the formats:
+ * <pre>
+ * Key Desc
+ * -------- --------
+ * user table level permissions for a user [R=read, W=write]
+ * @group table level permissions for a group
+ * user,family column family level permissions for a user
+ * @group,family column family level permissions for a group
+ * user,family,qualifier column qualifier level permissions for a user
+ * @group,family,qualifier column qualifier level permissions for a group
+ * </pre>
+ * All values are encoded as byte arrays containing the codes from the
+ * {@link org.apache.hadoop.hbase.security.access.TablePermission.Action} enum.
+ * </p>
+ */
+public class AccessControlLists {
+ /** Internal storage table for access control lists */
+ public static final String ACL_TABLE_NAME_STR = "_acl_";
+ public static final byte[] ACL_TABLE_NAME = Bytes.toBytes(ACL_TABLE_NAME_STR);
+ /** Column family used to store ACL grants */
+ public static final String ACL_LIST_FAMILY_STR = "l";
+ public static final byte[] ACL_LIST_FAMILY = Bytes.toBytes(ACL_LIST_FAMILY_STR);
+
+ /** Table descriptor for ACL internal table */
+ public static final HTableDescriptor ACL_TABLEDESC = new HTableDescriptor(
+ ACL_TABLE_NAME);
+ static {
+ ACL_TABLEDESC.addFamily(
+ new HColumnDescriptor(ACL_LIST_FAMILY,
+          10, // Ten is an arbitrary number. Keep versions to help debugging.
+ Compression.Algorithm.NONE.getName(), true, true, 8 * 1024,
+ HConstants.FOREVER, StoreFile.BloomType.NONE.toString(),
+ HConstants.REPLICATION_SCOPE_LOCAL));
+ }
+
+ /**
+ * Delimiter to separate user, column family, and qualifier in
+ * _acl_ table info: column keys */
+ public static final char ACL_KEY_DELIMITER = ',';
+ /** Prefix character to denote group names */
+ public static final String GROUP_PREFIX = "@";
+ /** Configuration key for superusers */
+ public static final String SUPERUSER_CONF_KEY = "hbase.superuser";
+
+ private static Log LOG = LogFactory.getLog(AccessControlLists.class);
+
+ /**
+ * Check for existence of {@code _acl_} table and create it if it does not exist
+ * @param master reference to HMaster
+ */
+ static void init(MasterServices master) throws IOException {
+ if (!MetaReader.tableExists(master.getCatalogTracker(), ACL_TABLE_NAME_STR)) {
+ master.createTable(ACL_TABLEDESC, null);
+ }
+ }
+
+ /**
+ * Stores a new table permission grant in the access control lists table.
+ * @param conf the configuration
+ * @param tableName the table to which access is being granted
+ * @param username the user or group being granted the permission
+ * @param perm the details of the permission being granted
+ * @throws IOException in the case of an error accessing the metadata table
+ */
+ static void addTablePermission(Configuration conf,
+ byte[] tableName, String username, TablePermission perm)
+ throws IOException {
+
+ Put p = new Put(tableName);
+ byte[] key = Bytes.toBytes(username);
+ if (perm.getFamily() != null && perm.getFamily().length > 0) {
+ key = Bytes.add(key,
+ Bytes.add(new byte[]{ACL_KEY_DELIMITER}, perm.getFamily()));
+ if (perm.getQualifier() != null && perm.getQualifier().length > 0) {
+ key = Bytes.add(key,
+ Bytes.add(new byte[]{ACL_KEY_DELIMITER}, perm.getQualifier()));
+ }
+ }
+
+ TablePermission.Action[] actions = perm.getActions();
+ if ((actions == null) || (actions.length == 0)) {
+ LOG.warn("No actions associated with user '"+username+"'");
+ return;
+ }
+
+ byte[] value = new byte[actions.length];
+ for (int i = 0; i < actions.length; i++) {
+ value[i] = actions[i].code();
+ }
+ p.add(ACL_LIST_FAMILY, key, value);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Writing permission for table "+
+ Bytes.toString(tableName)+" "+
+ Bytes.toString(key)+": "+Bytes.toStringBinary(value)
+ );
+ }
+ HTable acls = null;
+ try {
+ acls = new HTable(conf, ACL_TABLE_NAME);
+ acls.put(p);
+ } finally {
+ if (acls != null) acls.close();
+ }
+ }
+
+ /**
+ * Removes a previously granted permission from the stored access control
+ * lists. The {@link TablePermission} being removed must exactly match what
+   * is stored -- no wildcard matching is attempted. That is, if user "bob" has
+ * been granted "READ" access to the "data" table, but only to column family
+ * plus qualifier "info:colA", then trying to call this method with only
+ * user "bob" and the table name "data" (but without specifying the
+ * column qualifier "info:colA") will have no effect.
+ *
+ * @param conf the configuration
+ * @param tableName the table of the current permission grant
+ * @param userName the user or group currently granted the permission
+ * @param perm the details of the permission to be revoked
+ * @throws IOException if there is an error accessing the metadata table
+ */
+ static void removeTablePermission(Configuration conf,
+ byte[] tableName, String userName, TablePermission perm)
+ throws IOException {
+
+ Delete d = new Delete(tableName);
+ byte[] key = null;
+    if (perm.getFamily() != null && perm.getFamily().length > 0) {
+      if (perm.getQualifier() != null && perm.getQualifier().length > 0) {
+        key = Bytes.toBytes(userName + ACL_KEY_DELIMITER +
+            Bytes.toString(perm.getFamily()) + ACL_KEY_DELIMITER +
+            Bytes.toString(perm.getQualifier()));
+      } else {
+        key = Bytes.toBytes(userName + ACL_KEY_DELIMITER +
+            Bytes.toString(perm.getFamily()));
+      }
+ } else {
+ key = Bytes.toBytes(userName);
+ }
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Removing permission for user '" + userName+ "': "+
+ perm.toString());
+ }
+ d.deleteColumns(ACL_LIST_FAMILY, key);
+ HTable acls = null;
+ try {
+ acls = new HTable(conf, ACL_TABLE_NAME);
+ acls.delete(d);
+ } finally {
+ if (acls != null) acls.close();
+ }
+ }
+
+ /**
+ * Returns {@code true} if the given region is part of the {@code _acl_}
+ * metadata table.
+ */
+ static boolean isAclRegion(HRegion region) {
+ return Bytes.equals(ACL_TABLE_NAME, region.getTableDesc().getName());
+ }
+
+ /**
+ * Loads all of the permission grants stored in a region of the {@code _acl_}
+ * table.
+ *
+   * @param aclRegion the region of the {@code _acl_} table to read from
+   * @return a map of table name to the permission grants stored for that table
+   * @throws IOException if the region scan fails
+ */
+ static Map<byte[],ListMultimap<String,TablePermission>> loadAll(
+ HRegion aclRegion)
+ throws IOException {
+
+ if (!isAclRegion(aclRegion)) {
+ throw new IOException("Can only load permissions from "+ACL_TABLE_NAME_STR);
+ }
+
+ Map<byte[],ListMultimap<String,TablePermission>> allPerms =
+ new TreeMap<byte[],ListMultimap<String,TablePermission>>(Bytes.BYTES_COMPARATOR);
+
+ // do a full scan of _acl_ table
+
+ Scan scan = new Scan();
+ scan.addFamily(ACL_LIST_FAMILY);
+
+ InternalScanner iScanner = null;
+ try {
+ iScanner = aclRegion.getScanner(scan);
+
+ while (true) {
+ List<KeyValue> row = new ArrayList<KeyValue>();
+
+ boolean hasNext = iScanner.next(row);
+ ListMultimap<String,TablePermission> perms = ArrayListMultimap.create();
+ byte[] table = null;
+ for (KeyValue kv : row) {
+ if (table == null) {
+ table = kv.getRow();
+ }
+ Pair<String,TablePermission> permissionsOfUserOnTable =
+ parseTablePermissionRecord(table, kv);
+ if (permissionsOfUserOnTable != null) {
+ String username = permissionsOfUserOnTable.getFirst();
+ TablePermission permissions = permissionsOfUserOnTable.getSecond();
+ perms.put(username, permissions);
+ }
+ }
+ if (table != null) {
+ allPerms.put(table, perms);
+ }
+ if (!hasNext) {
+ break;
+ }
+ }
+ } finally {
+ if (iScanner != null) {
+ iScanner.close();
+ }
+ }
+
+ return allPerms;
+ }
+
+ /**
+ * Load all permissions from the region server holding {@code _acl_},
+ * primarily intended for testing purposes.
+ */
+ static Map<byte[],ListMultimap<String,TablePermission>> loadAll(
+ Configuration conf) throws IOException {
+ Map<byte[],ListMultimap<String,TablePermission>> allPerms =
+ new TreeMap<byte[],ListMultimap<String,TablePermission>>(Bytes.BYTES_COMPARATOR);
+
+    // do a full scan of the _acl_ table to gather every table's grants
+
+ Scan scan = new Scan();
+ scan.addFamily(ACL_LIST_FAMILY);
+
+ HTable acls = null;
+ ResultScanner scanner = null;
+ try {
+ acls = new HTable(conf, ACL_TABLE_NAME);
+ scanner = acls.getScanner(scan);
+ for (Result row : scanner) {
+ ListMultimap<String,TablePermission> resultPerms =
+ parseTablePermissions(row.getRow(), row);
+ allPerms.put(row.getRow(), resultPerms);
+ }
+ } finally {
+ if (scanner != null) scanner.close();
+ if (acls != null) acls.close();
+ }
+
+ return allPerms;
+ }
+
+ /**
+ * Reads user permission assignments stored in the <code>l:</code> column
+   * family of the row keyed by the table name in <code>_acl_</code>.
+ *
+ * <p>
+ * See {@link AccessControlLists class documentation} for the key structure
+ * used for storage.
+ * </p>
+ */
+ static ListMultimap<String,TablePermission> getTablePermissions(
+ Configuration conf, byte[] tableName)
+ throws IOException {
+ /* TODO: -ROOT- and .META. cannot easily be handled because they must be
+ * online before _acl_ table. Can anything be done here?
+ */
+ if (Bytes.equals(tableName, HConstants.ROOT_TABLE_NAME) ||
+ Bytes.equals(tableName, HConstants.META_TABLE_NAME) ||
+ Bytes.equals(tableName, AccessControlLists.ACL_TABLE_NAME)) {
+ return ArrayListMultimap.create(0,0);
+ }
+
+ // for normal user tables, we just read the table row from _acl_
+ ListMultimap<String,TablePermission> perms = ArrayListMultimap.create();
+ HTable acls = null;
+ try {
+ acls = new HTable(conf, ACL_TABLE_NAME);
+ Get get = new Get(tableName);
+ get.addFamily(ACL_LIST_FAMILY);
+ Result row = acls.get(get);
+ if (!row.isEmpty()) {
+ perms = parseTablePermissions(tableName, row);
+ } else {
+ LOG.info("No permissions found in "+ACL_TABLE_NAME_STR+
+ " for table "+Bytes.toString(tableName));
+ }
+ } finally {
+ if (acls != null) acls.close();
+ }
+
+ return perms;
+ }
+
+ /**
+ * Returns the currently granted permissions for a given table as a list of
+ * user plus associated permissions.
+ */
+ static List<UserPermission> getUserPermissions(
+ Configuration conf, byte[] tableName)
+ throws IOException {
+ ListMultimap<String,TablePermission> allPerms = getTablePermissions(
+ conf, tableName);
+
+ List<UserPermission> perms = new ArrayList<UserPermission>();
+
+ for (Map.Entry<String, TablePermission> entry : allPerms.entries()) {
+ UserPermission up = new UserPermission(Bytes.toBytes(entry.getKey()),
+ entry.getValue().getTable(), entry.getValue().getFamily(),
+ entry.getValue().getQualifier(), entry.getValue().getActions());
+ perms.add(up);
+ }
+ return perms;
+ }
+
+ private static ListMultimap<String,TablePermission> parseTablePermissions(
+ byte[] table, Result result) {
+ ListMultimap<String,TablePermission> perms = ArrayListMultimap.create();
+ if (result != null && result.size() > 0) {
+ for (KeyValue kv : result.raw()) {
+
+ Pair<String,TablePermission> permissionsOfUserOnTable =
+ parseTablePermissionRecord(table, kv);
+
+ if (permissionsOfUserOnTable != null) {
+ String username = permissionsOfUserOnTable.getFirst();
+ TablePermission permissions = permissionsOfUserOnTable.getSecond();
+ perms.put(username, permissions);
+ }
+ }
+ }
+ return perms;
+ }
+
+ private static Pair<String,TablePermission> parseTablePermissionRecord(
+ byte[] table, KeyValue kv) {
+    // Decodes the (username, TablePermission) pair encoded in the permission record kv.
+ byte[] family = kv.getFamily();
+
+ if (!Bytes.equals(family, ACL_LIST_FAMILY)) {
+ return null;
+ }
+
+ byte[] key = kv.getQualifier();
+ byte[] value = kv.getValue();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Read acl: kv ["+
+ Bytes.toStringBinary(key)+": "+
+ Bytes.toStringBinary(value)+"]");
+ }
+
+ // check for a column family appended to the key
+ // TODO: avoid the string conversion to make this more efficient
+ String username = Bytes.toString(key);
+ int idx = username.indexOf(ACL_KEY_DELIMITER);
+ byte[] permFamily = null;
+ byte[] permQualifier = null;
+ if (idx > 0 && idx < username.length()-1) {
+ String remainder = username.substring(idx+1);
+ username = username.substring(0, idx);
+ idx = remainder.indexOf(ACL_KEY_DELIMITER);
+ if (idx > 0 && idx < remainder.length()-1) {
+ permFamily = Bytes.toBytes(remainder.substring(0, idx));
+ permQualifier = Bytes.toBytes(remainder.substring(idx+1));
+ } else {
+ permFamily = Bytes.toBytes(remainder);
+ }
+ }
+
+ return new Pair<String,TablePermission>(
+ username, new TablePermission(table, permFamily, permQualifier, value));
+ }
+
+ /**
+ * Writes a set of permissions as {@link org.apache.hadoop.io.Writable} instances
+ * to the given output stream.
+   * @param out the output stream to write to
+   * @param perms the user-to-permission mappings to serialize
+   * @param conf configuration used for Writable serialization
+   * @throws IOException if writing to the stream fails
+ */
+ public static void writePermissions(DataOutput out,
+ ListMultimap<String,? extends Permission> perms, Configuration conf)
+ throws IOException {
+ Set<String> keys = perms.keySet();
+ out.writeInt(keys.size());
+ for (String key : keys) {
+ Text.writeString(out, key);
+ HbaseObjectWritable.writeObject(out, perms.get(key), List.class, conf);
+ }
+ }
+
+ /**
+ * Writes a set of permissions as {@link org.apache.hadoop.io.Writable} instances
+ * and returns the resulting byte array.
+ */
+ public static byte[] writePermissionsAsBytes(
+ ListMultimap<String,? extends Permission> perms, Configuration conf) {
+ try {
+ ByteArrayOutputStream bos = new ByteArrayOutputStream();
+ writePermissions(new DataOutputStream(bos), perms, conf);
+ return bos.toByteArray();
+ } catch (IOException ioe) {
+ // shouldn't happen here
+ LOG.error("Error serializing permissions", ioe);
+ }
+ return null;
+ }
+
+ /**
+ * Reads a set of permissions as {@link org.apache.hadoop.io.Writable} instances
+ * from the input stream.
+ */
+ public static <T extends Permission> ListMultimap<String,T> readPermissions(
+ DataInput in, Configuration conf) throws IOException {
+ ListMultimap<String,T> perms = ArrayListMultimap.create();
+ int length = in.readInt();
+ for (int i=0; i<length; i++) {
+ String user = Text.readString(in);
+ List<T> userPerms =
+ (List)HbaseObjectWritable.readObject(in, conf);
+ perms.putAll(user, userPerms);
+ }
+
+ return perms;
+ }
+
+ /**
+ * Returns whether or not the given name should be interpreted as a group
+ * principal. Currently this simply checks if the name starts with the
+ * special group prefix character ("@").
+ */
+ public static boolean isGroupPrincipal(String name) {
+ return name != null && name.startsWith(GROUP_PREFIX);
+ }
+
+ /**
+ * Returns the actual name for a group principal (stripped of the
+ * group prefix).
+ */
+ public static String getGroupName(String aclKey) {
+ if (!isGroupPrincipal(aclKey)) {
+ return aclKey;
+ }
+
+ return aclKey.substring(GROUP_PREFIX.length());
+ }
+}
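To make the key layout above concrete: granting user "bob" READ access on table
"data", family "info", qualifier "colA" yields an _acl_ KeyValue at row "data",
column "l:bob,info,colA", with the READ action code as the value. A sketch from
within the org.apache.hadoop.hbase.security.access package (addTablePermission
is package-private; the TablePermission constructor shape follows its use in
parseTablePermissionRecord above):

    Configuration conf = HBaseConfiguration.create(); // assumed client config
    byte[] table = Bytes.toBytes("data");
    TablePermission perm = new TablePermission(table, Bytes.toBytes("info"),
        Bytes.toBytes("colA"),
        new byte[] { TablePermission.Action.READ.code() });
    AccessControlLists.addTablePermission(conf, table, "bob", perm);
    // resulting _acl_ KeyValue: row="data", column="l:bob,info,colA"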