Posted to commits@hbase.apache.org by st...@apache.org on 2012/05/12 00:06:59 UTC

svn commit: r1337396 [1/5] - in /hbase/trunk: ./ security/src/main/java/org/apache/hadoop/hbase/security/ security/src/main/java/org/apache/hadoop/hbase/security/access/ security/src/main/java/org/apache/hadoop/hbase/security/token/ security/src/test/ ...

Author: stack
Date: Fri May 11 22:06:57 2012
New Revision: 1337396

URL: http://svn.apache.org/viewvc?rev=1337396&view=rev
Log:
HBASE-5732 Remove the SecureRPCEngine and merge the security-related logic in the core engine
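
In outline, the merged engine now decides per connection how to authenticate,
inside HBaseClient itself rather than in a separate SecureRPCEngine. A
condensed sketch of that choice, using the names from the patch (the wrapper
class here is illustrative):

    import org.apache.hadoop.hbase.security.HBaseSaslRpcServer.AuthMethod;
    import org.apache.hadoop.security.UserGroupInformation;
    import org.apache.hadoop.security.token.Token;
    import org.apache.hadoop.security.token.TokenIdentifier;

    final class AuthMethodChoice {
      // SIMPLE when security is off, DIGEST when a delegation token was
      // selected for this cluster, KERBEROS otherwise (see the constructor
      // and setupIOstreams changes in the HBaseClient diff below).
      static AuthMethod choose(Token<? extends TokenIdentifier> token) {
        if (!UserGroupInformation.isSecurityEnabled()) {
          return AuthMethod.SIMPLE;
        }
        return token != null ? AuthMethod.DIGEST : AuthMethod.KERBEROS;
      }
    }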

Added:
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/security/AccessDeniedException.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/security/HBasePolicyProvider.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/security/HBaseSaslRpcClient.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/security/HBaseSaslRpcServer.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/security/access/
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/security/access/AccessControlFilter.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/security/access/AccessControlLists.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/security/access/AccessControllerProtocol.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/security/access/Permission.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/security/access/TableAuthManager.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/security/access/TablePermission.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/security/access/UserPermission.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/security/access/ZKPermissionWatcher.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/security/token/
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/security/token/AuthenticationKey.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/security/token/AuthenticationProtocol.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/security/token/AuthenticationTokenIdentifier.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/security/token/AuthenticationTokenSecretManager.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/security/token/AuthenticationTokenSelector.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/security/token/TokenProvider.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/security/token/TokenUtil.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/security/token/ZKSecretWatcher.java
    hbase/trunk/src/test/java/org/apache/hadoop/hbase/security/access/
    hbase/trunk/src/test/java/org/apache/hadoop/hbase/security/access/SecureTestUtil.java
    hbase/trunk/src/test/java/org/apache/hadoop/hbase/security/access/TestAccessControlFilter.java
    hbase/trunk/src/test/java/org/apache/hadoop/hbase/security/access/TestAccessController.java
    hbase/trunk/src/test/java/org/apache/hadoop/hbase/security/access/TestTablePermissions.java
    hbase/trunk/src/test/java/org/apache/hadoop/hbase/security/access/TestZKPermissionsWatcher.java
    hbase/trunk/src/test/java/org/apache/hadoop/hbase/security/token/
    hbase/trunk/src/test/java/org/apache/hadoop/hbase/security/token/TestTokenAuthentication.java
    hbase/trunk/src/test/java/org/apache/hadoop/hbase/security/token/TestZKSecretWatcher.java
Removed:
    hbase/trunk/security/src/main/java/org/apache/hadoop/hbase/security/HBaseSaslRpcServer.java
    hbase/trunk/security/src/main/java/org/apache/hadoop/hbase/security/access/AccessControlFilter.java
    hbase/trunk/security/src/main/java/org/apache/hadoop/hbase/security/access/AccessControlLists.java
    hbase/trunk/security/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java
    hbase/trunk/security/src/main/java/org/apache/hadoop/hbase/security/access/AccessControllerProtocol.java
    hbase/trunk/security/src/main/java/org/apache/hadoop/hbase/security/access/Permission.java
    hbase/trunk/security/src/main/java/org/apache/hadoop/hbase/security/access/TableAuthManager.java
    hbase/trunk/security/src/main/java/org/apache/hadoop/hbase/security/access/TablePermission.java
    hbase/trunk/security/src/main/java/org/apache/hadoop/hbase/security/access/UserPermission.java
    hbase/trunk/security/src/main/java/org/apache/hadoop/hbase/security/token/AuthenticationKey.java
    hbase/trunk/security/src/main/java/org/apache/hadoop/hbase/security/token/AuthenticationProtocol.java
    hbase/trunk/security/src/main/java/org/apache/hadoop/hbase/security/token/AuthenticationTokenIdentifier.java
    hbase/trunk/security/src/main/java/org/apache/hadoop/hbase/security/token/AuthenticationTokenSecretManager.java
    hbase/trunk/security/src/main/java/org/apache/hadoop/hbase/security/token/AuthenticationTokenSelector.java
    hbase/trunk/security/src/main/java/org/apache/hadoop/hbase/security/token/TokenProvider.java
    hbase/trunk/security/src/main/java/org/apache/hadoop/hbase/security/token/TokenUtil.java
    hbase/trunk/security/src/main/java/org/apache/hadoop/hbase/security/token/ZKSecretWatcher.java
    hbase/trunk/security/src/test/
Modified:
    hbase/trunk/pom.xml
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/AdminProtocol.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/ClientProtocol.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/ConnectionHeader.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/RegionServerStatusProtocol.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/WritableRpcEngine.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/protobuf/generated/RPCProtos.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/security/User.java
    hbase/trunk/src/main/protobuf/RPC.proto
    hbase/trunk/src/test/resources/hbase-site.xml

Modified: hbase/trunk/pom.xml
URL: http://svn.apache.org/viewvc/hbase/trunk/pom.xml?rev=1337396&r1=1337395&r2=1337396&view=diff
==============================================================================
--- hbase/trunk/pom.xml (original)
+++ hbase/trunk/pom.xml Fri May 11 22:06:57 2012
@@ -1624,64 +1624,6 @@
       </build>
     </profile>
 
-    <!-- profile for building against Hadoop 0.20+security-->
-    <profile>
-      <id>security</id>
-      <properties>
-        <hadoop.version>1.0.2</hadoop.version>
-      </properties>
-      <build>
-        <finalName>${project.artifactId}-${project.version}-security</finalName>
-        <plugins>
-          <plugin>
-            <groupId>org.codehaus.mojo</groupId>
-            <artifactId>build-helper-maven-plugin</artifactId>
-            <executions>
-              <execution>
-                <id>add-source</id>
-                <goals>
-                  <goal>add-source</goal>
-                </goals>
-                <configuration>
-                  <sources>
-                    <source>${project.basedir}/security/src/main/java</source>
-                  </sources>
-                </configuration>
-              </execution>
-              <execution>
-                <id>add-test-source</id>
-                <goals>
-                  <goal>add-test-source</goal>
-                </goals>
-                <configuration>
-                  <sources>
-                    <source>${project.basedir}/security/src/test/java</source>
-                  </sources>
-                </configuration>
-              </execution>
-              <execution>
-                <id>add-test-resource</id>
-                <goals>
-                  <goal>add-test-resource</goal>
-                </goals>
-                <configuration>
-                  <resources>
-                    <resource>
-                      <directory>${project.basedir}/security/src/test/resources</directory>
-                      <includes>
-                        <include>hbase-site.xml</include>
-                      </includes>
-                    </resource>
-                  </resources>
-                </configuration>
-              </execution>
-            </executions>
-          </plugin>
-        </plugins>
-      </build>
-    </profile>
-
-
     <!--
       profile for building against Hadoop 0.22.0. Activate using:
        mvn -Dhadoop.profile=22

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/AdminProtocol.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/AdminProtocol.java?rev=1337396&r1=1337395&r2=1337396&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/AdminProtocol.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/AdminProtocol.java Fri May 11 22:06:57 2012
@@ -22,7 +22,7 @@ import org.apache.hadoop.classification.
 import org.apache.hadoop.hbase.ipc.VersionedProtocol;
 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService;
 import org.apache.hadoop.hbase.security.TokenInfo;
-import org.apache.hadoop.security.KerberosInfo;
+import org.apache.hadoop.hbase.security.KerberosInfo;
 
 /**
  * Protocol that an HBase client uses to communicate with a region server.
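
Both this interface and ClientProtocol now take @KerberosInfo from HBase's own
security package instead of Hadoop's. A minimal sketch of how a protocol
interface carries these annotations, which HBaseClient reads reflectively (see
krbInfo.serverPrincipal() and tokenInfo.value() in the HBaseClient diff below);
the interface name is made up, and the config key and token type shown are
assumptions, not taken from this patch:

    import org.apache.hadoop.hbase.ipc.VersionedProtocol;
    import org.apache.hadoop.hbase.security.KerberosInfo;
    import org.apache.hadoop.hbase.security.TokenInfo;

    // Hypothetical protocol: serverPrincipal names a config key holding the
    // server's Kerberos principal, and the TokenInfo value names the token
    // type a client may use for DIGEST auth.
    @KerberosInfo(serverPrincipal = "hbase.regionserver.kerberos.principal")
    @TokenInfo("HBASE_AUTH_TOKEN")
    interface ExampleSecureProtocol extends VersionedProtocol {
    }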

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/ClientProtocol.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/ClientProtocol.java?rev=1337396&r1=1337395&r2=1337396&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/ClientProtocol.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/ClientProtocol.java Fri May 11 22:06:57 2012
@@ -23,7 +23,7 @@ import org.apache.hadoop.classification.
 import org.apache.hadoop.hbase.ipc.VersionedProtocol;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ClientService;
 import org.apache.hadoop.hbase.security.TokenInfo;
-import org.apache.hadoop.security.KerberosInfo;
+import org.apache.hadoop.hbase.security.KerberosInfo;
 
 /**
  * Protocol that an HBase client uses to communicate with a region server.

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/ConnectionHeader.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/ConnectionHeader.java?rev=1337396&r1=1337395&r2=1337396&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/ConnectionHeader.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/ConnectionHeader.java Fri May 11 22:06:57 2012
@@ -1,77 +0,0 @@
-/*
- * Copyright 2010 The Apache Software Foundation
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.ipc;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.security.User;
-
-/**
- * The IPC connection header sent by the client to the server
- * on connection establishment.
- */
-@InterfaceAudience.Private
-class ConnectionHeader implements Writable {
-  protected String protocol;
-
-  public ConnectionHeader() {}
-
-  /**
-   * Create a new {@link ConnectionHeader} with the given <code>protocol</code>
-   * and {@link User}.
-   * @param protocol protocol used for communication between the IPC client
-   *                 and the server
-   * @param user {@link User} of the client communicating with
-   *            the server
-   */
-  public ConnectionHeader(String protocol, User user) {
-    this.protocol = protocol;
-  }
-
-  @Override
-  public void readFields(DataInput in) throws IOException {
-    protocol = Text.readString(in);
-    if (protocol.isEmpty()) {
-      protocol = null;
-    }
-  }
-
-  @Override
-  public void write(DataOutput out) throws IOException {
-    Text.writeString(out, (protocol == null) ? "" : protocol);
-  }
-
-  public String getProtocol() {
-    return protocol;
-  }
-
-  public User getUser() {
-    return null;
-  }
-
-  public String toString() {
-    return protocol;
-  }
-}
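
The Writable ConnectionHeader deleted above is superseded by the
protobuf-generated RPCProtos.ConnectionHeader. A minimal sketch of how the
client now builds one, mirroring the builder calls in the HBaseClient diff
below (the protocol and user names here are illustrative stand-ins):

    import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.ConnectionHeader;
    import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.UserInformation;

    final class HeaderSketch {
      static ConnectionHeader buildHeader() {
        // User info is omitted entirely for DIGEST (token) auth.
        UserInformation userInfo = UserInformation.newBuilder()
            .setEffectiveUser("hbase").build();
        return ConnectionHeader.newBuilder()
            .setProtocol("org.apache.hadoop.hbase.client.ClientProtocol")
            .setUserInfo(userInfo)
            .build();
      }
    }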

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java?rev=1337396&r1=1337395&r2=1337396&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java Fri May 11 22:06:57 2012
@@ -28,13 +28,18 @@ import java.io.EOFException;
 import java.io.FilterInputStream;
 import java.io.IOException;
 import java.io.InputStream;
+import java.io.OutputStream;
 import java.net.ConnectException;
 import java.net.InetSocketAddress;
 import java.net.Socket;
 import java.net.SocketException;
 import java.net.SocketTimeoutException;
 import java.net.UnknownHostException;
+import java.security.PrivilegedExceptionAction;
+import java.util.HashMap;
 import java.util.Iterator;
+import java.util.Map;
+import java.util.Random;
 import java.util.Map.Entry;
 import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -48,18 +53,32 @@ import org.apache.hadoop.classification.
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.protobuf.generated.RPCProtos;
+import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcResponse.Status;
 import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.ConnectionHeader;
 import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcRequest;
 import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcResponse;
+import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.UserInformation;
+import org.apache.hadoop.hbase.security.HBaseSaslRpcClient;
+import org.apache.hadoop.hbase.security.KerberosInfo;
+import org.apache.hadoop.hbase.security.TokenInfo;
+import org.apache.hadoop.hbase.security.HBaseSaslRpcServer.AuthMethod;
 import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.security.token.AuthenticationTokenIdentifier;
+import org.apache.hadoop.hbase.security.token.AuthenticationTokenSelector;
 import org.apache.hadoop.hbase.util.PoolMap;
 import org.apache.hadoop.hbase.util.PoolMap.PoolType;
 import org.apache.hadoop.io.DataOutputBuffer;
 import org.apache.hadoop.hbase.io.DataOutputOutputStream;
 import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.SecurityUtil;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.security.token.TokenSelector;
 import org.apache.hadoop.util.ReflectionUtils;
 
 import com.google.protobuf.ByteString;
@@ -213,7 +232,12 @@ public class HBaseClient {
       return this.startTime;
     }
   }
-
+  protected static Map<String,TokenSelector<? extends TokenIdentifier>> tokenHandlers =
+      new HashMap<String,TokenSelector<? extends TokenIdentifier>>();
+  static {
+    tokenHandlers.put(AuthenticationTokenIdentifier.AUTH_TOKEN_TYPE.toString(),
+        new AuthenticationTokenSelector());
+  }
   /** Thread that reads responses and notifies callers.  Each connection owns a
    * socket connected to a remote address.  Calls are multiplexed through this
    * socket: responses may be delivered out of order. */
@@ -223,6 +247,13 @@ public class HBaseClient {
     protected Socket socket = null;                 // connected socket
     protected DataInputStream in;
     protected DataOutputStream out;
+    private InetSocketAddress server;             // server ip:port
+    private String serverPrincipal;  // server's krb5 principal name
+    private AuthMethod authMethod; // authentication method
+    private boolean useSasl;
+    private Token<? extends TokenIdentifier> token;
+    private HBaseSaslRpcClient saslRpcClient;
+    private int reloginMaxBackoff; // max pause before relogin on sasl failure
 
     // currently active calls
     protected final ConcurrentSkipListMap<Integer, Call> calls = new ConcurrentSkipListMap<Integer, Call>();
@@ -235,20 +266,90 @@ public class HBaseClient {
         throw new UnknownHostException("unknown host: " +
                                        remoteId.getAddress().getHostName());
       }
+      this.server = remoteId.getAddress();
+
+      UserGroupInformation ticket = remoteId.getTicket().getUGI();
+      Class<?> protocol = remoteId.getProtocol();
+      this.useSasl = UserGroupInformation.isSecurityEnabled();
+      if (useSasl && protocol != null) {
+        TokenInfo tokenInfo = protocol.getAnnotation(TokenInfo.class);
+        if (tokenInfo != null) {
+          TokenSelector<? extends TokenIdentifier> tokenSelector =
+              tokenHandlers.get(tokenInfo.value());
+          if (tokenSelector != null) {
+            token = tokenSelector.selectToken(new Text(clusterId),
+                ticket.getTokens());
+          } else if (LOG.isDebugEnabled()) {
+            LOG.debug("No token selector found for type "+tokenInfo.value());
+          }
+        }
+        KerberosInfo krbInfo = protocol.getAnnotation(KerberosInfo.class);
+        if (krbInfo != null) {
+          String serverKey = krbInfo.serverPrincipal();
+          if (serverKey == null) {
+            throw new IOException(
+                "Can't obtain server Kerberos config key from KerberosInfo");
+          }
+          serverPrincipal = SecurityUtil.getServerPrincipal(
+              conf.get(serverKey), server.getAddress().getCanonicalHostName().toLowerCase());
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("RPC Server Kerberos principal name for protocol="
+                + protocol.getCanonicalName() + " is " + serverPrincipal);
+          }
+        }
+      }
+
+      if (!useSasl) {
+        authMethod = AuthMethod.SIMPLE;
+      } else if (token != null) {
+        authMethod = AuthMethod.DIGEST;
+      } else {
+        authMethod = AuthMethod.KERBEROS;
+      }
+
+      if (LOG.isDebugEnabled())
+        LOG.debug("Use " + authMethod + " authentication for protocol "
+            + protocol.getSimpleName());
+
+      reloginMaxBackoff = conf.getInt("hbase.security.relogin.maxbackoff", 5000);
       this.remoteId = remoteId;
-      User ticket = remoteId.getTicket();
-      Class<? extends VersionedProtocol> protocol = remoteId.getProtocol();
 
       ConnectionHeader.Builder builder = ConnectionHeader.newBuilder();
       builder.setProtocol(protocol == null ? "" : protocol.getName());
+      UserInformation userInfoPB;
+      if ((userInfoPB = getUserInfoPB(ticket)) != null) {
+        builder.setUserInfo(userInfoPB);
+      }
       this.header = builder.build();
 
       this.setName("IPC Client (" + socketFactory.hashCode() +") connection to " +
         remoteId.getAddress().toString() +
-        ((ticket==null)?" from an unknown user": (" from " + ticket.getName())));
+        ((ticket==null)?" from an unknown user": (" from "
+            + ticket.getUserName())));
       this.setDaemon(true);
     }
 
+    private UserInformation getUserInfoPB(UserGroupInformation ugi) {
+      if (ugi == null || authMethod == AuthMethod.DIGEST) {
+        // Don't send user for token auth
+        return null;
+      }
+      UserInformation.Builder userInfoPB = UserInformation.newBuilder();
+      if (ugi != null) {
+        if (authMethod == AuthMethod.KERBEROS) {
+          // Send effective user for Kerberos auth
+          userInfoPB.setEffectiveUser(ugi.getUserName());
+        } else if (authMethod == AuthMethod.SIMPLE) {
+          //Send both effective user and real user for simple auth
+          userInfoPB.setEffectiveUser(ugi.getUserName());
+          if (ugi.getRealUser() != null) {
+            userInfoPB.setRealUser(ugi.getRealUser().getUserName());
+          }
+        }
+      }
+      return userInfoPB.build();
+    }
+
     /** Update lastActivity with the current time. */
     protected void touch() {
       lastActivity.set(System.currentTimeMillis());
@@ -352,42 +453,6 @@ public class HBaseClient {
       }
     }
 
-    /** Connect to the server and set up the I/O streams. It then sends
-     * a header to the server and starts
-     * the connection thread that waits for responses.
-     * @throws java.io.IOException e
-     */
-    protected synchronized void setupIOstreams()
-        throws IOException, InterruptedException {
-
-      if (socket != null || shouldCloseConnection.get()) {
-        return;
-      }
-
-      try {
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Connecting to "+remoteId);
-        }
-        setupConnection();
-        this.in = new DataInputStream(new BufferedInputStream
-            (new PingInputStream(NetUtils.getInputStream(socket))));
-        this.out = new DataOutputStream
-            (new BufferedOutputStream(NetUtils.getOutputStream(socket)));
-        writeHeader();
-
-        // update last activity time
-        touch();
-
-        // start the receiver thread after the socket connection has been set up
-        start();
-      } catch (IOException e) {
-        markClosed(e);
-        close();
-
-        throw e;
-      }
-    }
-
     protected void closeConnection() {
       // close the current connection
       if (socket != null) {
@@ -437,16 +502,6 @@ public class HBaseClient {
         " time(s).");
     }
 
-    /* Write the header for each connection
-     * Out is not synchronized because only the first thread does this.
-     */
-    private void writeHeader() throws IOException {
-      out.write(HBaseServer.HEADER.array());
-      out.write(HBaseServer.CURRENT_VERSION);
-      out.writeInt(header.getSerializedSize());
-      header.writeTo(out);
-    }
-
     /* wait till someone signals us to start reading RPC response or
      * it is idle too long, it is marked as to be closed,
      * or the client is marked as not running.
@@ -519,6 +574,230 @@ public class HBaseClient {
             + connections.size());
     }
 
+    private synchronized void disposeSasl() {
+      if (saslRpcClient != null) {
+        try {
+          saslRpcClient.dispose();
+          saslRpcClient = null;
+        } catch (IOException ioe) {
+          LOG.error("Error disposing of SASL client", ioe);
+        }
+      }
+    }
+
+    private synchronized boolean shouldAuthenticateOverKrb() throws IOException {
+      UserGroupInformation loginUser = UserGroupInformation.getLoginUser();
+      UserGroupInformation currentUser =
+        UserGroupInformation.getCurrentUser();
+      UserGroupInformation realUser = currentUser.getRealUser();
+      return authMethod == AuthMethod.KERBEROS &&
+          loginUser != null &&
+          //Make sure user logged in using Kerberos either keytab or TGT
+          loginUser.hasKerberosCredentials() &&
+          // relogin only in case it is the login user (e.g. JT)
+          // or superuser (like oozie).
+          (loginUser.equals(currentUser) || loginUser.equals(realUser));
+    }
+
+    private synchronized boolean setupSaslConnection(final InputStream in2,
+        final OutputStream out2) throws IOException {
+      saslRpcClient = new HBaseSaslRpcClient(authMethod, token, serverPrincipal);
+      return saslRpcClient.saslConnect(in2, out2);
+    }
+
+    /**
+     * If multiple clients with the same principal try to connect
+     * to the same server at the same time, the server assumes a
+     * replay attack is in progress; this is a feature of Kerberos.
+     * To work around it, the client backs off for a random
+     * interval and then retries the connection.
+     * The other problem is ticket expiry. To handle that, a
+     * relogin is attempted.
+     */
+    private synchronized void handleSaslConnectionFailure(
+        final int currRetries,
+        final int maxRetries, final Exception ex, final Random rand,
+        final UserGroupInformation user)
+    throws IOException, InterruptedException{
+      user.doAs(new PrivilegedExceptionAction<Object>() {
+        public Object run() throws IOException, InterruptedException {
+          closeConnection();
+          if (shouldAuthenticateOverKrb()) {
+            if (currRetries < maxRetries) {
+              LOG.debug("Exception encountered while connecting to " +
+                  "the server : " + ex);
+              //try re-login
+              if (UserGroupInformation.isLoginKeytabBased()) {
+                UserGroupInformation.getLoginUser().reloginFromKeytab();
+              } else {
+                UserGroupInformation.getLoginUser().reloginFromTicketCache();
+              }
+              disposeSasl();
+              //back off for a random number of milliseconds
+              //we sleep with the Connection lock held, but since this
+              //connection instance is the one used to connect to the
+              //server in question, that is okay
+              Thread.sleep((rand.nextInt(reloginMaxBackoff) + 1));
+              return null;
+            } else {
+              String msg = "Couldn't setup connection for " +
+              UserGroupInformation.getLoginUser().getUserName() +
+              " to " + serverPrincipal;
+              LOG.warn(msg);
+              throw (IOException) new IOException(msg).initCause(ex);
+            }
+          } else {
+            LOG.warn("Exception encountered while connecting to " +
+                "the server : " + ex);
+          }
+          if (ex instanceof RemoteException)
+            throw (RemoteException)ex;
+          throw new IOException(ex);
+        }
+      });
+    }
+
+    protected synchronized void setupIOstreams()
+        throws IOException, InterruptedException {
+      if (socket != null || shouldCloseConnection.get()) {
+        return;
+      }
+
+      try {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Connecting to "+server);
+        }
+        short numRetries = 0;
+        final short MAX_RETRIES = 5;
+        Random rand = null;
+        while (true) {
+          setupConnection();
+          InputStream inStream = NetUtils.getInputStream(socket);
+          OutputStream outStream = NetUtils.getOutputStream(socket);
+          writeRpcHeader(outStream);
+          if (useSasl) {
+            final InputStream in2 = inStream;
+            final OutputStream out2 = outStream;
+            UserGroupInformation ticket = remoteId.getTicket().getUGI();
+            if (authMethod == AuthMethod.KERBEROS) {
+              if (ticket != null && ticket.getRealUser() != null) {
+                ticket = ticket.getRealUser();
+              }
+            }
+            boolean continueSasl = false;
+            try {
+              continueSasl =
+                ticket.doAs(new PrivilegedExceptionAction<Boolean>() {
+                  @Override
+                  public Boolean run() throws IOException {
+                    return setupSaslConnection(in2, out2);
+                  }
+                });
+            } catch (Exception ex) {
+              if (rand == null) {
+                rand = new Random();
+              }
+              handleSaslConnectionFailure(numRetries++, MAX_RETRIES, ex, rand,
+                   ticket);
+              continue;
+            }
+            if (continueSasl) {
+              // Sasl connect is successful. Let's set up Sasl i/o streams.
+              inStream = saslRpcClient.getInputStream(inStream);
+              outStream = saslRpcClient.getOutputStream(outStream);
+            } else {
+              // fall back to simple auth because server told us so.
+              authMethod = AuthMethod.SIMPLE;
+              useSasl = false;
+            }
+          }
+          this.in = new DataInputStream(new BufferedInputStream
+              (new PingInputStream(inStream)));
+          this.out = new DataOutputStream
+              (new BufferedOutputStream(outStream));
+          writeHeader();
+
+          // update last activity time
+          touch();
+
+          // start the receiver thread after the socket connection has been set up
+          start();
+          return;
+        }
+      } catch (IOException e) {
+        markClosed(e);
+        close();
+
+        throw e;
+      }
+    }
+
+    /* Write the RPC header */
+    private void writeRpcHeader(OutputStream outStream) throws IOException {
+      DataOutputStream out = new DataOutputStream(new BufferedOutputStream(outStream));
+      // Write out the header, version and authentication method
+      out.write(HBaseServer.HEADER.array());
+      out.write(HBaseServer.CURRENT_VERSION);
+      authMethod.write(out);
+      out.flush();
+    }
+
+    /**
+     * Write the protocol header for each connection
+     * Out is not synchronized because only the first thread does this.
+     */
+    private void writeHeader() throws IOException {
+      // Write out the ConnectionHeader
+      out.writeInt(header.getSerializedSize());
+      header.writeTo(out);
+    }
+
+    /** Close the connection. */
+    protected synchronized void close() {
+      if (!shouldCloseConnection.get()) {
+        LOG.error("The connection is not in the closed state");
+        return;
+      }
+
+      // release the resources
+      // first thing to do: take the connection out of the connection list
+      synchronized (connections) {
+        if (connections.get(remoteId) == this) {
+          connections.remove(remoteId);
+        }
+      }
+
+      // close the streams and therefore the socket
+      IOUtils.closeStream(out);
+      IOUtils.closeStream(in);
+      disposeSasl();
+
+      // clean up all calls
+      if (closeException == null) {
+        if (!calls.isEmpty()) {
+          LOG.warn(
+              "A connection is closed for no cause and calls are not empty. " +
+              "#Calls: " + calls.size());
+
+          // clean up calls anyway
+          closeException = new IOException("Unexpected closed connection");
+          cleanupCalls();
+        }
+      } else {
+        // log the info
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("closing ipc connection to " + server + ": " +
+              closeException.getMessage(),closeException);
+        }
+
+        // cleanup calls
+        cleanupCalls();
+      }
+      if (LOG.isDebugEnabled())
+        LOG.debug(getName() + ": closed");
+    }
+
     /* Initiates a call by sending the parameter to the remote server.
      * Note: this is not called from the Connection thread, but by other
      * threads.
@@ -575,15 +854,8 @@ public class HBaseClient {
           LOG.debug(getName() + " got value #" + id);
         Call call = calls.remove(id);
 
-        boolean isError = response.getError();
-        if (isError) {
-          if (call != null) {
-            //noinspection ThrowableInstanceNeverThrown
-            call.setException(new RemoteException(
-                response.getException().getExceptionName(),
-                response.getException().getStackTrace()));
-          }
-        } else {
+        Status status = response.getStatus();
+        if (status == Status.SUCCESS) {
           ByteString responseObj = response.getResponse();
           DataInputStream dis =
               new DataInputStream(responseObj.newInput());
@@ -594,6 +866,18 @@ public class HBaseClient {
           if (call != null) {
             call.setValue(value);
           }
+        } else if (status == Status.ERROR) {
+          if (call != null) {
+            //noinspection ThrowableInstanceNeverThrown
+            call.setException(new RemoteException(
+                response.getException().getExceptionName(),
+                response.getException().getStackTrace()));
+          }
+        } else if (status == Status.FATAL) {
+          // Close the connection
+          markClosed(new RemoteException(
+              response.getException().getExceptionName(),
+              response.getException().getStackTrace()));
         }
       } catch (IOException e) {
         if (e instanceof SocketTimeoutException && remoteId.rpcTimeout > 0) {
@@ -620,47 +904,6 @@ public class HBaseClient {
       }
     }
 
-    /** Close the connection. */
-    protected synchronized void close() {
-      if (!shouldCloseConnection.get()) {
-        LOG.error("The connection is not in the closed state");
-        return;
-      }
-
-      // release the resources
-      // first thing to do;take the connection out of the connection list
-      synchronized (connections) {
-        connections.remove(remoteId, this);
-      }
-
-      // close the streams and therefore the socket
-      IOUtils.closeStream(out);
-      IOUtils.closeStream(in);
-
-      // clean up all calls
-      if (closeException == null) {
-        if (!calls.isEmpty()) {
-          LOG.warn(
-              "A connection is closed for no cause and calls are not empty");
-
-          // clean up calls anyway
-          closeException = new IOException("Unexpected closed connection");
-          cleanupCalls();
-        }
-      } else {
-        // log the info
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("closing ipc connection to " + remoteId.address + ": " +
-              closeException.getMessage(),closeException);
-        }
-
-        // cleanup calls
-        cleanupCalls();
-      }
-      if (LOG.isDebugEnabled())
-        LOG.debug(getName() + ": closed");
-    }
-
     /* Cleanup all calls and mark them as done */
     protected void cleanupCalls() {
       cleanupCalls(0);
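
Taken together, writeRpcHeader() and writeHeader() above define the new
connection preamble: the four magic bytes, a version byte, a one-byte
authentication method, an optional SASL handshake, and then the
length-prefixed protobuf ConnectionHeader. A condensed sketch, with buffering
and the SASL exchange elided (the wrapper class is illustrative):

    import java.io.DataOutputStream;
    import java.io.IOException;
    import org.apache.hadoop.hbase.ipc.HBaseServer;
    import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.ConnectionHeader;
    import org.apache.hadoop.hbase.security.HBaseSaslRpcServer.AuthMethod;

    final class PreambleSketch {
      static void writePreamble(DataOutputStream out, AuthMethod authMethod,
          ConnectionHeader header) throws IOException {
        out.write(HBaseServer.HEADER.array());    // 4-byte magic
        out.write(HBaseServer.CURRENT_VERSION);   // version byte
        authMethod.write(out);                    // new: auth-method byte
        out.flush();
        // ... SASL negotiation happens here when authMethod != SIMPLE ...
        out.writeInt(header.getSerializedSize()); // length-prefixed pb header
        header.writeTo(out);
      }
    }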

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java?rev=1337396&r1=1337395&r2=1337396&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java Fri May 11 22:06:57 2012
@@ -20,6 +20,8 @@
 
 package org.apache.hadoop.hbase.ipc;
 
+import static org.apache.hadoop.fs.CommonConfigurationKeys.HADOOP_SECURITY_AUTHORIZATION;
+
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.DataInputStream;
@@ -34,6 +36,7 @@ import java.net.SocketException;
 import java.net.UnknownHostException;
 import java.nio.ByteBuffer;
 import java.nio.channels.CancelledKeyException;
+import java.nio.channels.Channels;
 import java.nio.channels.ClosedChannelException;
 import java.nio.channels.ReadableByteChannel;
 import java.nio.channels.SelectionKey;
@@ -41,6 +44,7 @@ import java.nio.channels.Selector;
 import java.nio.channels.ServerSocketChannel;
 import java.nio.channels.SocketChannel;
 import java.nio.channels.WritableByteChannel;
+import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -56,6 +60,10 @@ import java.util.concurrent.ExecutorServ
 import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingQueue;
 
+import javax.security.sasl.Sasl;
+import javax.security.sasl.SaslException;
+import javax.security.sasl.SaslServer;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
@@ -68,14 +76,34 @@ import org.apache.hadoop.hbase.protobuf.
 import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcRequest;
 import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcResponse;
 import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcException;
+import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcResponse.Status;
+import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.UserInformation;
 import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
 import org.apache.hadoop.hbase.monitoring.TaskMonitor;
 import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.security.HBaseSaslRpcServer;
+import org.apache.hadoop.hbase.security.HBaseSaslRpcServer.AuthMethod;
+import org.apache.hadoop.hbase.security.HBaseSaslRpcServer.SaslDigestCallbackHandler;
+import org.apache.hadoop.hbase.security.HBaseSaslRpcServer.SaslGssCallbackHandler;
+import org.apache.hadoop.hbase.security.HBaseSaslRpcServer.SaslStatus;
 import org.apache.hadoop.hbase.util.ByteBufferOutputStream;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.ipc.Server;
 import org.apache.hadoop.ipc.RPC.VersionMismatch;
+import org.apache.hadoop.security.AccessControlException;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
+import org.apache.hadoop.security.authorize.AuthorizationException;
+import org.apache.hadoop.security.authorize.ProxyUsers;
+import org.apache.hadoop.security.authorize.ServiceAuthorizationManager;
+import org.apache.hadoop.security.token.SecretManager;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.security.token.SecretManager.InvalidToken;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.StringUtils;
 
@@ -96,7 +124,8 @@ import org.cliffc.high_scale_lib.Counter
  */
 @InterfaceAudience.Private
 public abstract class HBaseServer implements RpcServer {
-
+  private final boolean authorize;
+  private boolean isSecurityEnabled;
   /**
    * The first four bytes of Hadoop RPC connections
    */
@@ -130,6 +159,13 @@ public abstract class HBaseServer implem
   protected static final Log TRACELOG =
       LogFactory.getLog("org.apache.hadoop.ipc.HBaseServer.trace");
 
+  private static final String AUTH_FAILED_FOR = "Auth failed for ";
+  private static final String AUTH_SUCCESSFUL_FOR = "Auth successful for ";
+  private static final Log AUDITLOG =
+      LogFactory.getLog("SecurityLogger."+Server.class.getName());
+  protected SecretManager<TokenIdentifier> secretManager;
+  protected ServiceAuthorizationManager authManager;
+
   protected static final ThreadLocal<RpcServer> SERVER =
     new ThreadLocal<RpcServer>();
   private volatile boolean started = false;
@@ -303,11 +339,12 @@ public abstract class HBaseServer implem
       return param.toString() + " from " + connection.toString();
     }
 
+    protected synchronized void setSaslTokenResponse(ByteBuffer response) {
+      this.response = response;
+    }
+
     protected synchronized void setResponse(Object value, Status status,
         String errorClass, String error) {
-      // Avoid overwriting an error value in the response.  This can happen if
-      // endDelayThrowing is called by another thread before the actual call
-      // returning.
       if (this.isError)
         return;
       if (errorClass != null) {
@@ -328,8 +365,7 @@ public abstract class HBaseServer implem
       if (result instanceof WritableWithSize) {
         // get the size hint.
         WritableWithSize ohint = (WritableWithSize) result;
-        long hint = ohint.getWritableSize() + Bytes.SIZEOF_BYTE +
-          (2 * Bytes.SIZEOF_INT);
+        long hint = ohint.getWritableSize() + 2*Bytes.SIZEOF_INT;
         if (hint > Integer.MAX_VALUE) {
           // oops, new problem.
           IOException ioe =
@@ -342,12 +378,11 @@ public abstract class HBaseServer implem
       }
 
       ByteBufferOutputStream buf = new ByteBufferOutputStream(size);
-      DataOutputStream out = new DataOutputStream(buf);
       try {
         RpcResponse.Builder builder = RpcResponse.newBuilder();
         // Call id.
         builder.setCallId(this.id);
-        builder.setError(error != null);
+        builder.setStatus(status);
         if (error != null) {
           RpcException.Builder b = RpcException.newBuilder();
           b.setExceptionName(errorClass);
@@ -359,8 +394,10 @@ public abstract class HBaseServer implem
           byte[] response = d.getData();
           builder.setResponse(ByteString.copyFrom(response));
         }
-        builder.build().writeDelimitedTo(
-            DataOutputOutputStream.constructOutputStream(out));
+        builder.build().writeDelimitedTo(buf);
+        if (connection.useWrap) {
+          wrapWithSasl(buf);
+        }
       } catch (IOException e) {
         LOG.warn("Exception while creating response " + e);
       }
@@ -369,6 +406,28 @@ public abstract class HBaseServer implem
       this.response = bb;
     }
 
+    private void wrapWithSasl(ByteBufferOutputStream response)
+        throws IOException {
+      if (connection.useSasl) {
+        // getByteBuffer calls flip()
+        ByteBuffer buf = response.getByteBuffer();
+        byte[] token;
+        // synchronize: multiple Handler threads can be using saslServer
+        // to wrap responses concurrently.
+        synchronized (connection.saslServer) {
+          token = connection.saslServer.wrap(buf.array(),
+              buf.arrayOffset(), buf.remaining());
+        }
+        if (LOG.isDebugEnabled())
+          LOG.debug("Adding saslServer wrapped token of size " + token.length
+              + " as call response.");
+        buf.clear();
+        DataOutputStream saslOut = new DataOutputStream(response);
+        saslOut.writeInt(token.length);
+        saslOut.write(token, 0, token.length);
+      }
+    }
+
     @Override
     public synchronized void endDelay(Object result) throws IOException {
       assert this.delayResponse;
@@ -1056,8 +1115,8 @@ public abstract class HBaseServer implem
   }
 
   /** Reads calls from a connection and queues them for handling. */
-  protected class Connection {
-    private boolean versionRead = false; //if initial signature and
+  public class Connection {
+    private boolean rpcHeaderRead = false; //if initial signature and
                                          //version are read
     private boolean headerRead = false;  //if the connection header that
                                          //follows version is read.
@@ -1068,6 +1127,7 @@ public abstract class HBaseServer implem
     private volatile int rpcCount = 0; // number of outstanding rpcs
     private long lastContact;
     private int dataLength;
+    private InetAddress addr;
     protected Socket socket;
     // Cache the remote host & port info so that even if the socket is
     // disconnected, we can say where it used to connect to.
@@ -1075,8 +1135,27 @@ public abstract class HBaseServer implem
     protected int remotePort;
     ConnectionHeader header;
     Class<? extends VersionedProtocol> protocol;
-    protected User user = null;
+    protected UserGroupInformation user = null;
+    private AuthMethod authMethod;
+    private boolean saslContextEstablished;
+    private boolean skipInitialSaslHandshake;
+    private ByteBuffer rpcHeaderBuffer;
+    private ByteBuffer unwrappedData;
+    private ByteBuffer unwrappedDataLengthBuffer;
+    boolean useSasl;
+    SaslServer saslServer;
+    private boolean useWrap = false;
+    // Fake 'call' for failed authorization response
+    private final int AUTHORIZATION_FAILED_CALLID = -1;
+    private final Call authFailedCall =
+      new Call(AUTHORIZATION_FAILED_CALLID, null, this, null, 0);
+    private ByteArrayOutputStream authFailedResponse =
+        new ByteArrayOutputStream();
+    // Fake 'call' for SASL context setup
+    private static final int SASL_CALLID = -33;
+    private final Call saslCall = new Call(SASL_CALLID, null, this, null, 0);
 
+    public UserGroupInformation attemptingUser = null; // user name before auth
     public Connection(SocketChannel channel, long lastContact) {
       this.channel = channel;
       this.lastContact = lastContact;
@@ -1110,6 +1189,10 @@ public abstract class HBaseServer implem
       return hostAddress;
     }
 
+    public InetAddress getHostInetAddress() {
+      return addr;
+    }
+
     public int getRemotePort() {
       return remotePort;
     }
@@ -1141,39 +1224,218 @@ public abstract class HBaseServer implem
       return isIdle() && currentTime - lastContact > maxIdleTime;
     }
 
+    private UserGroupInformation getAuthorizedUgi(String authorizedId)
+        throws IOException {
+      if (authMethod == AuthMethod.DIGEST) {
+        TokenIdentifier tokenId = HBaseSaslRpcServer.getIdentifier(authorizedId,
+            secretManager);
+        UserGroupInformation ugi = tokenId.getUser();
+        if (ugi == null) {
+          throw new AccessControlException(
+              "Can't retrieve username from tokenIdentifier.");
+        }
+        ugi.addTokenIdentifier(tokenId);
+        return ugi;
+      } else {
+        return UserGroupInformation.createRemoteUser(authorizedId);
+      }
+    }
+
+    private void saslReadAndProcess(byte[] saslToken) throws IOException,
+        InterruptedException {
+      if (saslContextEstablished) {
+        if (LOG.isDebugEnabled())
+          LOG.debug("Have read input token of size " + saslToken.length
+              + " for processing by saslServer.unwrap()");
+
+        if (!useWrap) {
+          processOneRpc(saslToken);
+        } else {
+          byte[] plaintextData = saslServer.unwrap(saslToken, 0,
+              saslToken.length);
+          processUnwrappedData(plaintextData);
+        }
+      } else {
+        byte[] replyToken = null;
+        try {
+          if (saslServer == null) {
+            switch (authMethod) {
+            case DIGEST:
+              if (secretManager == null) {
+                throw new AccessControlException(
+                    "Server is not configured to do DIGEST authentication.");
+              }
+              saslServer = Sasl.createSaslServer(AuthMethod.DIGEST
+                  .getMechanismName(), null, HBaseSaslRpcServer.SASL_DEFAULT_REALM,
+                  HBaseSaslRpcServer.SASL_PROPS, new SaslDigestCallbackHandler(
+                      secretManager, this));
+              break;
+            default:
+              UserGroupInformation current = UserGroupInformation
+              .getCurrentUser();
+              String fullName = current.getUserName();
+              if (LOG.isDebugEnabled())
+                LOG.debug("Kerberos principal name is " + fullName);
+              final String names[] = HBaseSaslRpcServer.splitKerberosName(fullName);
+              if (names.length != 3) {
+                throw new AccessControlException(
+                    "Kerberos principal name does NOT have the expected "
+                        + "hostname part: " + fullName);
+              }
+              current.doAs(new PrivilegedExceptionAction<Object>() {
+                @Override
+                public Object run() throws SaslException {
+                  saslServer = Sasl.createSaslServer(AuthMethod.KERBEROS
+                      .getMechanismName(), names[0], names[1],
+                      HBaseSaslRpcServer.SASL_PROPS, new SaslGssCallbackHandler());
+                  return null;
+                }
+              });
+            }
+            if (saslServer == null)
+              throw new AccessControlException(
+                  "Unable to find SASL server implementation for "
+                      + authMethod.getMechanismName());
+            if (LOG.isDebugEnabled())
+              LOG.debug("Created SASL server with mechanism = "
+                  + authMethod.getMechanismName());
+          }
+          if (LOG.isDebugEnabled())
+            LOG.debug("Have read input token of size " + saslToken.length
+                + " for processing by saslServer.evaluateResponse()");
+          replyToken = saslServer.evaluateResponse(saslToken);
+        } catch (IOException e) {
+          IOException sendToClient = e;
+          Throwable cause = e;
+          while (cause != null) {
+            if (cause instanceof InvalidToken) {
+              sendToClient = (InvalidToken) cause;
+              break;
+            }
+            cause = cause.getCause();
+          }
+          doRawSaslReply(SaslStatus.ERROR, null, sendToClient.getClass().getName(),
+              sendToClient.getLocalizedMessage());
+          rpcMetrics.authenticationFailures.inc();
+          String clientIP = this.toString();
+          // attempting user could be null
+          AUDITLOG.warn(AUTH_FAILED_FOR + clientIP + ":" + attemptingUser);
+          throw e;
+        }
+        if (replyToken != null) {
+          if (LOG.isDebugEnabled())
+            LOG.debug("Will send token of size " + replyToken.length
+                + " from saslServer.");
+          doRawSaslReply(SaslStatus.SUCCESS, new BytesWritable(replyToken), null,
+              null);
+        }
+        if (saslServer.isComplete()) {
+          String qop = (String) saslServer.getNegotiatedProperty(Sasl.QOP);
+          useWrap = qop != null && !"auth".equalsIgnoreCase(qop);
+          user = getAuthorizedUgi(saslServer.getAuthorizationID());
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("SASL server context established. Authenticated client: "
+              + user + ". Negotiated QoP is "
+              + saslServer.getNegotiatedProperty(Sasl.QOP));
+          }
+          rpcMetrics.authenticationSuccesses.inc();
+          AUDITLOG.trace(AUTH_SUCCESSFUL_FOR + user);
+          saslContextEstablished = true;
+        }
+      }
+    }
+    /**
+     * No protobuf encoding of raw sasl messages
+     */
+    private void doRawSaslReply(SaslStatus status, Writable rv,
+        String errorClass, String error) throws IOException {
+      //In testing, SASL messages are usually in the ballpark of
+      //100-200 bytes; hence the initial capacity of 256.
+      ByteBufferOutputStream saslResponse = new ByteBufferOutputStream(256);
+      DataOutputStream out = new DataOutputStream(saslResponse);
+      out.writeInt(status.state); // write status
+      if (status == SaslStatus.SUCCESS) {
+        rv.write(out);
+      } else {
+        WritableUtils.writeString(out, errorClass);
+        WritableUtils.writeString(out, error);
+      }
+      saslCall.setSaslTokenResponse(saslResponse.getByteBuffer());
+      saslCall.responder = responder;
+      saslCall.sendResponseIfReady();
+    }
+
+    private void disposeSasl() {
+      if (saslServer != null) {
+        try {
+          saslServer.dispose();
+          saslServer = null;
+        } catch (SaslException ignored) {
+        }
+      }
+    }
+
     public int readAndProcess() throws IOException, InterruptedException {
       while (true) {
         /* Read at most one RPC. If the header is not read completely yet
          * then iterate until we read first RPC or until there is no data left.
          */
-        int count;
+        int count = -1;
         if (dataLengthBuffer.remaining() > 0) {
           count = channelRead(channel, dataLengthBuffer);
           if (count < 0 || dataLengthBuffer.remaining() > 0)
             return count;
         }
 
-        if (!versionRead) {
+        if (!rpcHeaderRead) {
           //Every connection is expected to send the header.
-          ByteBuffer versionBuffer = ByteBuffer.allocate(1);
-          count = channelRead(channel, versionBuffer);
-          if (count <= 0) {
+          if (rpcHeaderBuffer == null) {
+            rpcHeaderBuffer = ByteBuffer.allocate(2);
+          }
+          count = channelRead(channel, rpcHeaderBuffer);
+          if (count < 0 || rpcHeaderBuffer.remaining() > 0) {
             return count;
           }
-          int version = versionBuffer.get(0);
-
+          int version = rpcHeaderBuffer.get(0);
+          byte[] method = new byte[] {rpcHeaderBuffer.get(1)};
+          authMethod = AuthMethod.read(new DataInputStream(
+              new ByteArrayInputStream(method)));
           dataLengthBuffer.flip();
           if (!HEADER.equals(dataLengthBuffer) || version != CURRENT_VERSION) {
-            //Warning is ok since this is not supposed to happen.
-            LOG.warn("Incorrect header or version mismatch from " +
-                     hostAddress + ":" + remotePort +
-                     " got version " + version +
-                     " expected version " + CURRENT_VERSION);
+            LOG.warn("Incorrect header or version mismatch from " +
+                hostAddress + ":" + remotePort +
+                " got version " + version +
+                " expected version " + CURRENT_VERSION);
             setupBadVersionResponse(version);
             return -1;
           }
           dataLengthBuffer.clear();
-          versionRead = true;
+          if (authMethod == null) {
+            throw new IOException("Unable to read authentication method");
+          }
+          if (isSecurityEnabled && authMethod == AuthMethod.SIMPLE) {
+            AccessControlException ae = new AccessControlException(
+                "Authentication is required");
+            setupResponse(authFailedResponse, authFailedCall, Status.FATAL,
+                null, ae.getClass().getName(), ae.getMessage());
+            responder.doRespond(authFailedCall);
+            throw ae;
+          }
+          if (!isSecurityEnabled && authMethod != AuthMethod.SIMPLE) {
+            doRawSaslReply(SaslStatus.SUCCESS, new IntWritable(
+                HBaseSaslRpcServer.SWITCH_TO_SIMPLE_AUTH), null, null);
+            authMethod = AuthMethod.SIMPLE;
+            // client has already sent the initial Sasl message and we
+            // should ignore it. Both client and server should fall back
+            // to simple auth from now on.
+            skipInitialSaslHandshake = true;
+          }
+          if (authMethod != AuthMethod.SIMPLE) {
+            useSasl = true;
+          }
+
+          rpcHeaderBuffer = null;
+          rpcHeaderRead = true;
           continue;
         }
 
@@ -1182,8 +1444,14 @@ public abstract class HBaseServer implem
           dataLength = dataLengthBuffer.getInt();
 
           if (dataLength == HBaseClient.PING_CALL_ID) {
-            dataLengthBuffer.clear();
-            return 0;  //ping message
+            if (!useWrap) { //covers the !useSasl too
+              dataLengthBuffer.clear();
+              return 0;  //ping message
+            }
+          }
+          if (dataLength < 0) {
+            throw new IllegalArgumentException("Unexpected data length " 
+                + dataLength + "!! from " + getHostAddress());
           }
           data = ByteBuffer.allocate(dataLength);
           incRpcCount();  // Increment the rpc count
@@ -1194,15 +1462,21 @@ public abstract class HBaseServer implem
         if (data.remaining() == 0) {
           dataLengthBuffer.clear();
           data.flip();
-          if (headerRead) {
-            processData(data.array());
+          if (skipInitialSaslHandshake) {
             data = null;
-            return count;
+            skipInitialSaslHandshake = false;
+            continue;
+          }
+          boolean isHeaderRead = headerRead;
+          if (useSasl) {
+            saslReadAndProcess(data.array());
+          } else {
+            processOneRpc(data.array());
           }
-          processHeader();
-          headerRead = true;
           data = null;
-          continue;
+          if (!isHeaderRead) {
+            continue;
+          }
         }
         return count;
       }
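
[Editorial sketch] Once useSasl is set, every length-prefixed frame is routed
through saslReadAndProcess() rather than processOneRpc(). When the negotiated
SASL QoP is auth-int or auth-conf (the useWrap case), each frame is a wrapped
token that must be unwrapped before the embedded RPCs can be parsed. A rough
sketch of just that data-phase unwrap step, assuming a per-connection
javax.security.sasl.SaslServer saved from the handshake:

    import javax.security.sasl.SaslException;
    import javax.security.sasl.SaslServer;

    public class SaslUnwrapSketch {
      private final SaslServer saslServer;  // negotiated during handshake

      public SaslUnwrapSketch(SaslServer saslServer) {
        this.saslServer = saslServer;
      }

      // Unwraps one wrapped frame into plaintext holding one or more
      // length-prefixed RPCs, which processUnwrappedData() then walks.
      public byte[] unwrapFrame(byte[] wrapped) throws SaslException {
        return saslServer.unwrap(wrapped, 0, wrapped.length);
      }
    }
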
@@ -1238,16 +1512,104 @@ public abstract class HBaseServer implem
     }
 
     // Reads the connection header following the version/auth-method preamble
-    private void processHeader() throws IOException {
-      header = ConnectionHeader.parseFrom(new ByteArrayInputStream(data.array()));
+    private void processHeader(byte[] buf) throws IOException {
+      DataInputStream in =
+        new DataInputStream(new ByteArrayInputStream(buf));
+      header = ConnectionHeader.parseFrom(in);
       try {
         String protocolClassName = header.getProtocol();
-        protocol = getProtocolClass(protocolClassName, conf);
+        if (protocolClassName != null) {
+          protocol = getProtocolClass(header.getProtocol(), conf);
+        }
       } catch (ClassNotFoundException cnfe) {
         throw new IOException("Unknown protocol: " + header.getProtocol());
       }
 
-      user = User.createUser(header);
+      UserGroupInformation protocolUser = createUser(header);
+      if (!useSasl) {
+        user = protocolUser;
+        if (user != null) {
+          user.setAuthenticationMethod(AuthMethod.SIMPLE.authenticationMethod);
+        }
+      } else {
+        // user is authenticated
+        user.setAuthenticationMethod(authMethod.authenticationMethod);
+        // Now we check if this is a proxy-user case: if the protocol user
+        // differs from the authenticated 'user', it is a proxy-user
+        // scenario. However, this is not allowed if the user authenticated
+        // with DIGEST.
+        if ((protocolUser != null)
+            && (!protocolUser.getUserName().equals(user.getUserName()))) {
+          if (authMethod == AuthMethod.DIGEST) {
+            // Not allowed to doAs if token authentication is used
+            throw new AccessControlException("Authenticated user (" + user
+                + ") doesn't match what the client claims to be ("
+                + protocolUser + ")");
+          } else {
+            // Effective user can be different from authenticated user
+            // for simple auth or kerberos auth
+            // The user is the real user. Now we create a proxy user
+            UserGroupInformation realUser = user;
+            user = UserGroupInformation.createProxyUser(protocolUser
+                .getUserName(), realUser);
+            // Now the user is a proxy user, set Authentication method Proxy.
+            user.setAuthenticationMethod(AuthenticationMethod.PROXY);
+          }
+        }
+      }
+    }
+
+    private void processUnwrappedData(byte[] inBuf)
+        throws IOException, InterruptedException {
+      ReadableByteChannel ch = Channels.newChannel(new ByteArrayInputStream(
+          inBuf));
+      // Read all complete RPCs contained in inBuf; a trailing partial RPC
+      // is buffered and resumed on the next call.
+      while (true) {
+        int count = -1;
+        if (unwrappedDataLengthBuffer.remaining() > 0) {
+          count = channelRead(ch, unwrappedDataLengthBuffer);
+          if (count <= 0 || unwrappedDataLengthBuffer.remaining() > 0)
+            return;
+        }
+
+        if (unwrappedData == null) {
+          unwrappedDataLengthBuffer.flip();
+          int unwrappedDataLength = unwrappedDataLengthBuffer.getInt();
+
+          if (unwrappedDataLength == HBaseClient.PING_CALL_ID) {
+            if (LOG.isDebugEnabled())
+              LOG.debug("Received ping message");
+            unwrappedDataLengthBuffer.clear();
+            continue; // ping message
+          }
+          unwrappedData = ByteBuffer.allocate(unwrappedDataLength);
+        }
+
+        count = channelRead(ch, unwrappedData);
+        if (count <= 0 || unwrappedData.remaining() > 0)
+          return;
+
+        if (unwrappedData.remaining() == 0) {
+          unwrappedDataLengthBuffer.clear();
+          unwrappedData.flip();
+          processOneRpc(unwrappedData.array());
+          unwrappedData = null;
+        }
+      }
+    }
+
+    private void processOneRpc(byte[] buf)
+        throws IOException, InterruptedException {
+      if (headerRead) {
+        processData(buf);
+      } else {
+        processHeader(buf);
+        headerRead = true;
+        if (!authorizeConnection()) {
+          throw new AccessControlException("Connection from " + this
+              + " for protocol " + header.getProtocol()
+              + " is unauthorized for user " + user);
+        }
+      }
     }
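
[Editorial sketch] The proxy-user branch in processHeader() mirrors Hadoop's
standard impersonation model: the authenticated principal becomes the "real"
user, the claimed effective user is wrapped around it, and the combination is
later vetted by the ProxyUsers check in authorizeConnection(). A
self-contained illustration using the stock UserGroupInformation API; the
user names are illustrative only:

    import java.security.PrivilegedExceptionAction;
    import org.apache.hadoop.security.UserGroupInformation;

    public class ProxyUserSketch {
      public static void main(String[] args) throws Exception {
        // Authenticated principal, e.g. a trusted gateway service.
        UserGroupInformation realUser =
            UserGroupInformation.createRemoteUser("gateway");
        // Effective user the client claims to act for.
        UserGroupInformation proxy =
            UserGroupInformation.createProxyUser("alice", realUser);
        // Work inside doAs() is attributed to "alice", while the server
        // can still see (and authorize) the real "gateway" user.
        proxy.doAs(new PrivilegedExceptionAction<Void>() {
          public Void run() throws Exception {
            System.out.println("running as " +
                UserGroupInformation.getCurrentUser().getUserName());
            return null;
          }
        });
      }
    }
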
 
     protected void processData(byte[] buf) throws  IOException, InterruptedException {
@@ -1303,7 +1665,34 @@ public abstract class HBaseServer implem
       }
     }
 
+    private boolean authorizeConnection() throws IOException {
+      try {
+        // If the auth method is DIGEST, the token was obtained by the real
+        // user on behalf of the effective user, so there is no need to
+        // authorize the real user here. doAs is allowed only for simple or
+        // kerberos authentication.
+        if (user != null && user.getRealUser() != null
+            && (authMethod != AuthMethod.DIGEST)) {
+          ProxyUsers.authorize(user, this.getHostAddress(), conf);
+        }
+        authorize(user, header, getHostInetAddress());
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Successfully authorized " + header);
+        }
+        rpcMetrics.authorizationSuccesses.inc();
+      } catch (AuthorizationException ae) {
+        LOG.debug("Connection authorization failed: " + ae.getMessage(), ae);
+        rpcMetrics.authorizationFailures.inc();
+        setupResponse(authFailedResponse, authFailedCall, Status.FATAL, null,
+            ae.getClass().getName(), ae.getMessage());
+        responder.doRespond(authFailedCall);
+        return false;
+      }
+      return true;
+    }
+
     protected synchronized void close() {
+      disposeSasl();
       data = null;
       dataLengthBuffer = null;
       if (!channel.isOpen())
@@ -1314,6 +1703,33 @@ public abstract class HBaseServer implem
       }
       try {socket.close();} catch(Exception ignored) {}
     }
+
+    private UserGroupInformation createUser(ConnectionHeader head) {
+      UserGroupInformation ugi = null;
+
+      if (!head.hasUserInfo()) {
+        return null;
+      }
+      UserInformation userInfoProto = head.getUserInfo();
+      String effectiveUser = null;
+      if (userInfoProto.hasEffectiveUser()) {
+        effectiveUser = userInfoProto.getEffectiveUser();
+      }
+      String realUser = null;
+      if (userInfoProto.hasRealUser()) {
+        realUser = userInfoProto.getRealUser();
+      }
+      if (effectiveUser != null) {
+        if (realUser != null) {
+          UserGroupInformation realUserUgi =
+              UserGroupInformation.createRemoteUser(realUser);
+          ugi = UserGroupInformation.createProxyUser(effectiveUser, realUserUgi);
+        } else {
+          ugi = UserGroupInformation.createRemoteUser(effectiveUser);
+        }
+      }
+      return ugi;
+    }
   }
 
   /**
@@ -1377,15 +1793,16 @@ public abstract class HBaseServer implem
               throw new ServerNotRunningYetException("Server is not running yet");
 
             if (LOG.isDebugEnabled()) {
-              User remoteUser = call.connection.user;
+              UserGroupInformation remoteUser = call.connection.user;
               LOG.debug(getName() + ": call #" + call.id + " executing as "
-                  + (remoteUser == null ? "NULL principal" : remoteUser.getName()));
+                  + (remoteUser == null ? "NULL principal" :
+                    remoteUser.getUserName()));
             }
 
-            RequestContext.set(call.connection.user, getRemoteIp(),
+            RequestContext.set(User.create(call.connection.user), getRemoteIp(),
                 call.connection.protocol);
             // make the call
-            value = call(call.connection.protocol, call.param, call.timestamp, 
+            value = call(call.connection.protocol, call.param, call.timestamp,
                 status);
           } catch (Throwable e) {
             LOG.debug(getName()+", call "+call+": error: " + e, e);
@@ -1517,6 +1934,12 @@ public abstract class HBaseServer implem
 
     // Create the responder here
     responder = new Responder();
+    this.authorize =
+        conf.getBoolean(HADOOP_SECURITY_AUTHORIZATION, false);
+    this.isSecurityEnabled = UserGroupInformation.isSecurityEnabled();
+    if (isSecurityEnabled) {
+      HBaseSaslRpcServer.init(conf);
+    }
   }
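
[Editorial sketch] Note the two independent switches read here: authorization
(the authorize flag, from hadoop.security.authorization) and authentication
(UserGroupInformation.isSecurityEnabled(), driven by
hadoop.security.authentication). A sketch of turning both on
programmatically; the key names are the standard Hadoop ones, and "kerberos"
is the value that makes isSecurityEnabled() return true:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.security.UserGroupInformation;

    public class SecurityConfSketch {
      public static Configuration secureConf() {
        Configuration conf = HBaseConfiguration.create();
        // Makes UserGroupInformation.isSecurityEnabled() return true.
        conf.set("hadoop.security.authentication", "kerberos");
        // Enables the service-level authorize() path set up above.
        conf.setBoolean("hadoop.security.authorization", true);
        UserGroupInformation.setConfiguration(conf);
        return conf;
      }
    }
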
 
   /**
@@ -1572,6 +1995,10 @@ public abstract class HBaseServer implem
     rpcMetrics.numOpenConnections.set(numConnections);
   }
 
+  Configuration getConf() {
+    return conf;
+  }
+
   /** Sets the socket buffer size used for responding to RPCs.
    * @param size send size
    */
@@ -1617,6 +2044,14 @@ public abstract class HBaseServer implem
     }
   }
 
+  public SecretManager<? extends TokenIdentifier> getSecretManager() {
+    return this.secretManager;
+  }
+
+  public void setSecretManager(SecretManager<? extends TokenIdentifier> secretManager) {
+    this.secretManager = (SecretManager<TokenIdentifier>) secretManager;
+  }
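
[Editorial sketch] With a secret manager installed, DIGEST-MD5 authentication
can be driven by delegation tokens. A rough usage sketch against the
TokenUtil helper added elsewhere in this commit; the obtainToken(Configuration)
shape is assumed from its usage, not quoted from the new file:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.security.token.AuthenticationTokenIdentifier;
    import org.apache.hadoop.hbase.security.token.TokenUtil;
    import org.apache.hadoop.security.UserGroupInformation;
    import org.apache.hadoop.security.token.Token;

    public class TokenSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        // Ask the cluster for a token signed by the secret manager.
        Token<AuthenticationTokenIdentifier> token =
            TokenUtil.obtainToken(conf);
        // Attach it so subsequent RPCs authenticate via DIGEST.
        UserGroupInformation.getCurrentUser().addToken(token);
      }
    }
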
+
   /** Stops the service.  No new calls will be handled after this is called. */
   @Override
   public synchronized void stop() {
@@ -1683,6 +2118,31 @@ public abstract class HBaseServer implem
   }
 
   /**
+   * Authorize the incoming client connection.
+   *
+   * @param user client user
+   * @param connection header of the incoming connection
+   * @param addr InetAddress of incoming connection
+   * @throws org.apache.hadoop.security.authorize.AuthorizationException
+   *           if the client is not authorized to use the requested protocol
+   */
+  public void authorize(UserGroupInformation user,
+                        ConnectionHeader connection,
+                        InetAddress addr
+                        ) throws AuthorizationException {
+    if (authorize) {
+      Class<?> protocol = null;
+      try {
+        protocol = getProtocolClass(connection.getProtocol(), getConf());
+      } catch (ClassNotFoundException cfne) {
+        throw new AuthorizationException("Unknown protocol: " +
+                                         connection.getProtocol());
+      }
+      authManager.authorize(user, protocol, getConf(), addr);
+    }
+  }
+
+  /**
    * When the read or write buffer size is larger than this limit, i/o will be
    * done in chunks of this size. Most RPC requests and responses would be
    * be smaller.

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/RegionServerStatusProtocol.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/RegionServerStatusProtocol.java?rev=1337396&r1=1337395&r2=1337396&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/RegionServerStatusProtocol.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/RegionServerStatusProtocol.java Fri May 11 22:06:57 2012
@@ -23,7 +23,7 @@ import org.apache.hadoop.classification.
 import org.apache.hadoop.hbase.ipc.VersionedProtocol;
 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionServerStatusService;
 import org.apache.hadoop.hbase.security.TokenInfo;
-import org.apache.hadoop.security.KerberosInfo;
+import org.apache.hadoop.hbase.security.KerberosInfo;
 
 /**
  * Protocol that a RegionServer uses to communicate its status to the Master.

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/WritableRpcEngine.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/WritableRpcEngine.java?rev=1337396&r1=1337395&r2=1337396&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/WritableRpcEngine.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/WritableRpcEngine.java Fri May 11 22:06:57 2012
@@ -50,8 +50,11 @@ import org.apache.hadoop.hbase.util.Obje
 import org.apache.hadoop.io.*;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.ipc.RemoteException;
+import org.apache.hadoop.security.authorize.ServiceAuthorizationManager;
+import org.apache.hadoop.hbase.security.HBasePolicyProvider;
 import org.apache.hadoop.hbase.ipc.VersionedProtocol;
 import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.security.token.AuthenticationTokenSecretManager;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.*;
 
@@ -252,9 +255,6 @@ class WritableRpcEngine implements RpcEn
     private Class<?>[] ifaces;
     private boolean verbose;
 
-    // for JSON encoding
-    private static ObjectMapper mapper = new ObjectMapper();
-
     private static final String WARN_RESPONSE_TIME =
       "hbase.ipc.warn.response.time";
     private static final String WARN_RESPONSE_SIZE =
@@ -310,6 +310,36 @@ class WritableRpcEngine implements RpcEn
           DEFAULT_WARN_RESPONSE_SIZE);
     }
 
+    public AuthenticationTokenSecretManager createSecretManager() {
+      if (!User.isSecurityEnabled() ||
+          !(instance instanceof org.apache.hadoop.hbase.Server)) {
+        return null;
+      }
+      org.apache.hadoop.hbase.Server server =
+          (org.apache.hadoop.hbase.Server)instance;
+      Configuration conf = server.getConfiguration();
+      long keyUpdateInterval =
+          conf.getLong("hbase.auth.key.update.interval", 24*60*60*1000); // default: roll key daily
+      long maxAge =
+          conf.getLong("hbase.auth.token.max.lifetime", 7*24*60*60*1000); // default: 7-day tokens
+      return new AuthenticationTokenSecretManager(conf, server.getZooKeeper(),
+          server.getServerName().toString(), keyUpdateInterval, maxAge);
+    }
+
+    @Override
+    public void startThreads() {
+      AuthenticationTokenSecretManager mgr = createSecretManager();
+      if (mgr != null) {
+        setSecretManager(mgr);
+        mgr.start();
+      }
+      this.authManager = new ServiceAuthorizationManager();
+      HBasePolicyProvider.init(conf, authManager);
+
+      // continue with base startup
+      super.startThreads();
+    }
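
[Editorial sketch] The two configuration keys consulted in
createSecretManager() default to a one-day key roll and a seven-day maximum
token lifetime. An example of overriding them; the values here are
illustrative:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;

    public class SecretManagerConfSketch {
      public static Configuration tunedConf() {
        Configuration conf = HBaseConfiguration.create();
        // Roll the signing key every 12 hours instead of the 24h default.
        conf.setLong("hbase.auth.key.update.interval", 12 * 60 * 60 * 1000L);
        // Expire tokens after 3 days instead of the 7-day default.
        conf.setLong("hbase.auth.token.max.lifetime", 3 * 24 * 60 * 60 * 1000L);
        return conf;
      }
    }
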
+
     @Override
     public Writable call(Class<? extends VersionedProtocol> protocol,
         Writable param, long receivedTime, MonitoredRPCHandler status)

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/protobuf/generated/RPCProtos.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/protobuf/generated/RPCProtos.java?rev=1337396&r1=1337395&r2=1337396&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/protobuf/generated/RPCProtos.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/protobuf/generated/RPCProtos.java Fri May 11 22:06:57 2012
@@ -15,7 +15,7 @@ public final class RPCProtos {
     boolean hasEffectiveUser();
     String getEffectiveUser();
     
-    // required string realUser = 2;
+    // optional string realUser = 2;
     boolean hasRealUser();
     String getRealUser();
   }
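
[Editorial sketch] Making realUser optional means a plain (non-proxy) client
only sends its effective user. A client-side sketch against the regenerated
builder API; the setter names are assumed from the generated accessors shown
in this file:

    import org.apache.hadoop.hbase.protobuf.generated.RPCProtos;
    import org.apache.hadoop.security.UserGroupInformation;

    public class UserInfoSketch {
      public static RPCProtos.UserInformation toProto(UserGroupInformation ugi) {
        RPCProtos.UserInformation.Builder b =
            RPCProtos.UserInformation.newBuilder();
        b.setEffectiveUser(ugi.getUserName());
        // realUser is now optional: only present in the proxy-user case.
        if (ugi.getRealUser() != null) {
          b.setRealUser(ugi.getRealUser().getUserName());
        }
        return b.build();
      }
    }
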
@@ -80,7 +80,7 @@ public final class RPCProtos {
       }
     }
     
-    // required string realUser = 2;
+    // optional string realUser = 2;
     public static final int REALUSER_FIELD_NUMBER = 2;
     private java.lang.Object realUser_;
     public boolean hasRealUser() {
@@ -125,10 +125,6 @@ public final class RPCProtos {
         memoizedIsInitialized = 0;
         return false;
       }
-      if (!hasRealUser()) {
-        memoizedIsInitialized = 0;
-        return false;
-      }
       memoizedIsInitialized = 1;
       return true;
     }
@@ -406,10 +402,6 @@ public final class RPCProtos {
           
           return false;
         }
-        if (!hasRealUser()) {
-          
-          return false;
-        }
         return true;
       }
       
@@ -488,7 +480,7 @@ public final class RPCProtos {
         onChanged();
       }
       
-      // required string realUser = 2;
+      // optional string realUser = 2;
       private java.lang.Object realUser_ = "";
       public boolean hasRealUser() {
         return ((bitField0_ & 0x00000002) == 0x00000002);
@@ -2081,9 +2073,9 @@ public final class RPCProtos {
     boolean hasCallId();
     int getCallId();
     
-    // required bool error = 2;
-    boolean hasError();
-    boolean getError();
+    // required .RpcResponse.Status status = 2;
+    boolean hasStatus();
+    org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcResponse.Status getStatus();
     
     // optional bytes response = 3;
     boolean hasResponse();
@@ -2122,6 +2114,78 @@ public final class RPCProtos {
       return org.apache.hadoop.hbase.protobuf.generated.RPCProtos.internal_static_RpcResponse_fieldAccessorTable;
     }
     
+    public enum Status
+        implements com.google.protobuf.ProtocolMessageEnum {
+      SUCCESS(0, 0),
+      ERROR(1, 1),
+      FATAL(2, 2),
+      ;
+      
+      public static final int SUCCESS_VALUE = 0;
+      public static final int ERROR_VALUE = 1;
+      public static final int FATAL_VALUE = 2;
+      
+      
+      public final int getNumber() { return value; }
+      
+      public static Status valueOf(int value) {
+        switch (value) {
+          case 0: return SUCCESS;
+          case 1: return ERROR;
+          case 2: return FATAL;
+          default: return null;
+        }
+      }
+      
+      public static com.google.protobuf.Internal.EnumLiteMap<Status>
+          internalGetValueMap() {
+        return internalValueMap;
+      }
+      private static com.google.protobuf.Internal.EnumLiteMap<Status>
+          internalValueMap =
+            new com.google.protobuf.Internal.EnumLiteMap<Status>() {
+              public Status findValueByNumber(int number) {
+                return Status.valueOf(number);
+              }
+            };
+      
+      public final com.google.protobuf.Descriptors.EnumValueDescriptor
+          getValueDescriptor() {
+        return getDescriptor().getValues().get(index);
+      }
+      public final com.google.protobuf.Descriptors.EnumDescriptor
+          getDescriptorForType() {
+        return getDescriptor();
+      }
+      public static final com.google.protobuf.Descriptors.EnumDescriptor
+          getDescriptor() {
+        return org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcResponse.getDescriptor().getEnumTypes().get(0);
+      }
+      
+      private static final Status[] VALUES = {
+        SUCCESS, ERROR, FATAL, 
+      };
+      
+      public static Status valueOf(
+          com.google.protobuf.Descriptors.EnumValueDescriptor desc) {
+        if (desc.getType() != getDescriptor()) {
+          throw new java.lang.IllegalArgumentException(
+            "EnumValueDescriptor is not for this type.");
+        }
+        return VALUES[desc.getIndex()];
+      }
+      
+      private final int index;
+      private final int value;
+      
+      private Status(int index, int value) {
+        this.index = index;
+        this.value = value;
+      }
+      
+      // @@protoc_insertion_point(enum_scope:RpcResponse.Status)
+    }
+    
     private int bitField0_;
     // required int32 callId = 1;
     public static final int CALLID_FIELD_NUMBER = 1;
@@ -2133,14 +2197,14 @@ public final class RPCProtos {
       return callId_;
     }
     
-    // required bool error = 2;
-    public static final int ERROR_FIELD_NUMBER = 2;
-    private boolean error_;
-    public boolean hasError() {
+    // required .RpcResponse.Status status = 2;
+    public static final int STATUS_FIELD_NUMBER = 2;
+    private org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcResponse.Status status_;
+    public boolean hasStatus() {
       return ((bitField0_ & 0x00000002) == 0x00000002);
     }
-    public boolean getError() {
-      return error_;
+    public org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcResponse.Status getStatus() {
+      return status_;
     }
     
     // optional bytes response = 3;
@@ -2168,7 +2232,7 @@ public final class RPCProtos {
     
     private void initFields() {
       callId_ = 0;
-      error_ = false;
+      status_ = org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcResponse.Status.SUCCESS;
       response_ = com.google.protobuf.ByteString.EMPTY;
       exception_ = org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcException.getDefaultInstance();
     }
@@ -2181,7 +2245,7 @@ public final class RPCProtos {
         memoizedIsInitialized = 0;
         return false;
       }
-      if (!hasError()) {
+      if (!hasStatus()) {
         memoizedIsInitialized = 0;
         return false;
       }
@@ -2202,7 +2266,7 @@ public final class RPCProtos {
         output.writeInt32(1, callId_);
       }
       if (((bitField0_ & 0x00000002) == 0x00000002)) {
-        output.writeBool(2, error_);
+        output.writeEnum(2, status_.getNumber());
       }
       if (((bitField0_ & 0x00000004) == 0x00000004)) {
         output.writeBytes(3, response_);
@@ -2225,7 +2289,7 @@ public final class RPCProtos {
       }
       if (((bitField0_ & 0x00000002) == 0x00000002)) {
         size += com.google.protobuf.CodedOutputStream
-          .computeBoolSize(2, error_);
+          .computeEnumSize(2, status_.getNumber());
       }
       if (((bitField0_ & 0x00000004) == 0x00000004)) {
         size += com.google.protobuf.CodedOutputStream
@@ -2263,10 +2327,10 @@ public final class RPCProtos {
         result = result && (getCallId()
             == other.getCallId());
       }
-      result = result && (hasError() == other.hasError());
-      if (hasError()) {
-        result = result && (getError()
-            == other.getError());
+      result = result && (hasStatus() == other.hasStatus());
+      if (hasStatus()) {
+        result = result &&
+            (getStatus() == other.getStatus());
       }
       result = result && (hasResponse() == other.hasResponse());
       if (hasResponse()) {
@@ -2291,9 +2355,9 @@ public final class RPCProtos {
         hash = (37 * hash) + CALLID_FIELD_NUMBER;
         hash = (53 * hash) + getCallId();
       }
-      if (hasError()) {
-        hash = (37 * hash) + ERROR_FIELD_NUMBER;
-        hash = (53 * hash) + hashBoolean(getError());
+      if (hasStatus()) {
+        hash = (37 * hash) + STATUS_FIELD_NUMBER;
+        hash = (53 * hash) + hashEnum(getStatus());
       }
       if (hasResponse()) {
         hash = (37 * hash) + RESPONSE_FIELD_NUMBER;
@@ -2422,7 +2486,7 @@ public final class RPCProtos {
         super.clear();
         callId_ = 0;
         bitField0_ = (bitField0_ & ~0x00000001);
-        error_ = false;
+        status_ = org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcResponse.Status.SUCCESS;
         bitField0_ = (bitField0_ & ~0x00000002);
         response_ = com.google.protobuf.ByteString.EMPTY;
         bitField0_ = (bitField0_ & ~0x00000004);
@@ -2477,7 +2541,7 @@ public final class RPCProtos {
         if (((from_bitField0_ & 0x00000002) == 0x00000002)) {
           to_bitField0_ |= 0x00000002;
         }
-        result.error_ = error_;
+        result.status_ = status_;
         if (((from_bitField0_ & 0x00000004) == 0x00000004)) {
           to_bitField0_ |= 0x00000004;
         }
@@ -2509,8 +2573,8 @@ public final class RPCProtos {
         if (other.hasCallId()) {
           setCallId(other.getCallId());
         }
-        if (other.hasError()) {
-          setError(other.getError());
+        if (other.hasStatus()) {
+          setStatus(other.getStatus());
         }
         if (other.hasResponse()) {
           setResponse(other.getResponse());
@@ -2527,7 +2591,7 @@ public final class RPCProtos {
           
           return false;
         }
-        if (!hasError()) {
+        if (!hasStatus()) {
           
           return false;
         }
@@ -2569,8 +2633,14 @@ public final class RPCProtos {
               break;
             }
             case 16: {
-              bitField0_ |= 0x00000002;
-              error_ = input.readBool();
+              int rawValue = input.readEnum();
+              org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcResponse.Status value = org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcResponse.Status.valueOf(rawValue);
+              if (value == null) {
+                unknownFields.mergeVarintField(2, rawValue);
+              } else {
+                bitField0_ |= 0x00000002;
+                status_ = value;
+              }
               break;
             }
             case 26: {
@@ -2614,23 +2684,26 @@ public final class RPCProtos {
         return this;
       }
       
-      // required bool error = 2;
-      private boolean error_ ;
-      public boolean hasError() {
+      // required .RpcResponse.Status status = 2;
+      private org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcResponse.Status status_ = org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcResponse.Status.SUCCESS;
+      public boolean hasStatus() {
         return ((bitField0_ & 0x00000002) == 0x00000002);
       }
-      public boolean getError() {
-        return error_;
+      public org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcResponse.Status getStatus() {
+        return status_;
       }
-      public Builder setError(boolean value) {
+      public Builder setStatus(org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcResponse.Status value) {
+        if (value == null) {
+          throw new NullPointerException();
+        }
         bitField0_ |= 0x00000002;
-        error_ = value;
+        status_ = value;
         onChanged();
         return this;
       }
-      public Builder clearError() {
+      public Builder clearStatus() {
         bitField0_ = (bitField0_ & ~0x00000002);
-        error_ = false;
+        status_ = org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcResponse.Status.SUCCESS;
         onChanged();
         return this;
       }
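
[Editorial sketch] Replacing the boolean error flag with a tri-state Status
lets the server distinguish recoverable per-call errors (ERROR) from
connection-fatal conditions such as authentication failures (FATAL). A sketch
of composing a FATAL response with the regenerated builders; the RpcException
setter names are assumed from the descriptor shown below:

    import org.apache.hadoop.hbase.protobuf.generated.RPCProtos;
    import org.apache.hadoop.util.StringUtils;

    public class FatalResponseSketch {
      public static byte[] fatalResponse(int callId, Throwable t) {
        return RPCProtos.RpcResponse.newBuilder()
            .setCallId(callId)
            .setStatus(RPCProtos.RpcResponse.Status.FATAL)
            .setException(RPCProtos.RpcException.newBuilder()
                .setExceptionName(t.getClass().getName())
                .setStackTrace(StringUtils.stringifyException(t)))
            .build().toByteArray();
      }
    }
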
@@ -2795,17 +2868,19 @@ public final class RPCProtos {
   static {
     java.lang.String[] descriptorData = {
       "\n\tRPC.proto\":\n\017UserInformation\022\025\n\reffect" +
-      "iveUser\030\001 \002(\t\022\020\n\010realUser\030\002 \002(\t\"w\n\020Conne" +
+      "iveUser\030\001 \002(\t\022\020\n\010realUser\030\002 \001(\t\"w\n\020Conne" +
       "ctionHeader\022\"\n\010userInfo\030\001 \001(\0132\020.UserInfo" +
       "rmation\022?\n\010protocol\030\002 \001(\t:-org.apache.ha" +
       "doop.hbase.client.ClientProtocol\"-\n\nRpcR" +
       "equest\022\016\n\006callId\030\001 \002(\005\022\017\n\007request\030\002 \001(\014\"" +
       "9\n\014RpcException\022\025\n\rexceptionName\030\001 \002(\t\022\022" +
-      "\n\nstackTrace\030\002 \001(\t\"`\n\013RpcResponse\022\016\n\006cal" +
-      "lId\030\001 \002(\005\022\r\n\005error\030\002 \002(\010\022\020\n\010response\030\003 \001" +
-      "(\014\022 \n\texception\030\004 \001(\0132\r.RpcExceptionB<\n*",
-      "org.apache.hadoop.hbase.protobuf.generat" +
-      "edB\tRPCProtosH\001\240\001\001"
+      "\n\nstackTrace\030\002 \001(\t\"\243\001\n\013RpcResponse\022\016\n\006ca" +
+      "llId\030\001 \002(\005\022#\n\006status\030\002 \002(\0162\023.RpcResponse" +
+      ".Status\022\020\n\010response\030\003 \001(\014\022 \n\texception\030\004",
+      " \001(\0132\r.RpcException\"+\n\006Status\022\013\n\007SUCCESS" +
+      "\020\000\022\t\n\005ERROR\020\001\022\t\n\005FATAL\020\002B<\n*org.apache.h" +
+      "adoop.hbase.protobuf.generatedB\tRPCProto" +
+      "sH\001\240\001\001"
     };
     com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
       new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
@@ -2849,7 +2924,7 @@ public final class RPCProtos {
           internal_static_RpcResponse_fieldAccessorTable = new
             com.google.protobuf.GeneratedMessage.FieldAccessorTable(
               internal_static_RpcResponse_descriptor,
-              new java.lang.String[] { "CallId", "Error", "Response", "Exception", },
+              new java.lang.String[] { "CallId", "Status", "Response", "Exception", },
               org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcResponse.class,
               org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcResponse.Builder.class);
           return null;

Added: hbase/trunk/src/main/java/org/apache/hadoop/hbase/security/AccessDeniedException.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/security/AccessDeniedException.java?rev=1337396&view=auto
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/security/AccessDeniedException.java (added)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/security/AccessDeniedException.java Fri May 11 22:06:57 2012
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.security;
+
+import org.apache.hadoop.hbase.DoNotRetryIOException;
+
+/**
+ * Exception thrown by access-related methods.
+ */
+public class AccessDeniedException extends DoNotRetryIOException {
+  private static final long serialVersionUID = 1913879564363001780L;
+
+  public AccessDeniedException() {
+    super();
+  }
+
+  public AccessDeniedException(Class<?> clazz, String s) {
+    super("AccessDenied [" + clazz.getName() + "]: " + s);
+  }
+
+  public AccessDeniedException(String s) {
+    super(s);
+  }
+}
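
[Editorial sketch] Because it extends DoNotRetryIOException, throwing this
from an RPC handler or coprocessor makes clients fail fast instead of
retrying. A minimal, hypothetical usage sketch (the permission check itself
is a stand-in):

    import java.io.IOException;
    import org.apache.hadoop.hbase.security.AccessDeniedException;

    public class AccessCheckSketch {
      // 'hasPermission' stands in for whatever real check applies.
      public static void requirePermission(String user, boolean hasPermission)
          throws IOException {
        if (!hasPermission) {
          throw new AccessDeniedException(AccessCheckSketch.class,
              "Insufficient permissions for user '" + user + "'");
        }
      }
    }
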

Added: hbase/trunk/src/main/java/org/apache/hadoop/hbase/security/HBasePolicyProvider.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/security/HBasePolicyProvider.java?rev=1337396&view=auto
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/security/HBasePolicyProvider.java (added)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/security/HBasePolicyProvider.java Fri May 11 22:06:57 2012
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.security;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.AdminProtocol;
+import org.apache.hadoop.hbase.client.ClientProtocol;
+import org.apache.hadoop.hbase.ipc.HMasterInterface;
+import org.apache.hadoop.hbase.ipc.RegionServerStatusProtocol;
+import org.apache.hadoop.security.authorize.PolicyProvider;
+import org.apache.hadoop.security.authorize.Service;
+import org.apache.hadoop.security.authorize.ServiceAuthorizationManager;
+
+/**
+ * Implementation of secure Hadoop policy provider for mapping
+ * protocol interfaces to hbase-policy.xml entries.
+ */
+public class HBasePolicyProvider extends PolicyProvider {
+  protected static Service[] services = {
+      new Service("security.client.protocol.acl", ClientProtocol.class),
+      new Service("security.client.protocol.acl", AdminProtocol.class),
+      new Service("security.admin.protocol.acl", HMasterInterface.class),
+      new Service("security.masterregion.protocol.acl", RegionServerStatusProtocol.class)
+  };
+
+  @Override
+  public Service[] getServices() {
+    return services;
+  }
+
+  public static void init(Configuration conf,
+      ServiceAuthorizationManager authManager) {
+    // set service-level authorization security policy
+    conf.set("hadoop.policy.file", "hbase-policy.xml");
+    if (conf.getBoolean(
+          ServiceAuthorizationManager.SERVICE_AUTHORIZATION_CONFIG, false)) {
+      authManager.refresh(conf, new HBasePolicyProvider());
+    }
+  }
+}
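
[Editorial sketch] For completeness, a sketch of wiring this provider up on
the server side. The ACL keys mirror the Service entries above and follow
Hadoop's service-level ACL syntax ("user1,user2 group1,group2", or "*" for
everyone); in practice they live in hbase-policy.xml rather than being set
programmatically, and the user names here are illustrative:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.security.HBasePolicyProvider;
    import org.apache.hadoop.security.authorize.ServiceAuthorizationManager;

    public class PolicySketch {
      public static void main(String[] args) {
        Configuration conf = HBaseConfiguration.create();
        // Same flag tested by HBasePolicyProvider.init() above.
        conf.setBoolean(
            ServiceAuthorizationManager.SERVICE_AUTHORIZATION_CONFIG, true);
        conf.set("security.client.protocol.acl", "*");         // all users
        conf.set("security.admin.protocol.acl", "hbaseadmin"); // admins only
        ServiceAuthorizationManager authManager =
            new ServiceAuthorizationManager();
        HBasePolicyProvider.init(conf, authManager);
      }
    }
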