You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ga...@apache.org on 2012/12/19 22:22:36 UTC

svn commit: r1424122 [3/3] - in /hbase/trunk: hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ hbase-protocol/src/main/protobuf/ hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ hbase-server/src/main/java/org/apache/hadoo...

Added: hbase/trunk/hbase-protocol/src/main/protobuf/Authentication.proto
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-protocol/src/main/protobuf/Authentication.proto?rev=1424122&view=auto
==============================================================================
--- hbase/trunk/hbase-protocol/src/main/protobuf/Authentication.proto (added)
+++ hbase/trunk/hbase-protocol/src/main/protobuf/Authentication.proto Wed Dec 19 21:22:36 2012
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+option java_package = "org.apache.hadoop.hbase.protobuf.generated";
+option java_outer_classname = "AuthenticationProtos";
+option java_generic_services = true;
+option java_generate_equals_and_hash = true;
+option optimize_for = SPEED;
+
+message AuthenticationKey {
+    required int32 id = 1;              // presumably the id that TokenIdentifier.keyId refers to -- confirm
+    required int64 expirationDate = 2;  // NOTE(review): units are not shown in this file -- confirm
+    required bytes key = 3;             // opaque key bytes; their encoding is not visible here
+}
+
+
+message TokenIdentifier {  // wire form written by AuthenticationTokenIdentifier.toBytes()
+    enum Kind {
+        HBASE_AUTH_TOKEN = 0;  // the only kind AuthenticationTokenIdentifier.readFields() accepts
+    }
+    required Kind kind = 1;
+    required bytes username = 2;        // UTF-8 bytes of the user name
+    required int32 keyId = 3;
+    optional int64 issueDate = 4;
+    optional int64 expirationDate = 5;
+    optional int64 sequenceNumber = 6;
+}
+
+
+// Serialization of the org.apache.hadoop.security.token.Token class
+// Note that this is a Hadoop class, so fields may change!
+message Token {  // the token kind is not carried; ProtobufUtil.toToken() restores AUTH_TOKEN_TYPE
+    // the TokenIdentifier in serialized form
+    // Note: we can't use the protobuf directly because the Hadoop Token class
+    // only stores the serialized bytes
+    optional bytes identifier = 1;
+    optional bytes password = 2;   // raw bytes returned by Token.getPassword()
+    optional bytes service = 3;    // UTF-8 bytes of the Token service text, when one is set
+}
+
+
+// RPC request & response messages
+message TokenRequest {  // empty: TokenProvider issues the token for the user making the request
+}
+
+message TokenResponse {
+    optional Token token = 1;  // left unset when TokenProvider reports a failure on the controller
+}
+
+message WhoAmIRequest {  // empty: the answer is derived from the request's own user
+}
+
+message WhoAmIResponse {
+    optional string username = 1;    // short name of the request user; unset when there is none
+    optional string authMethod = 2;  // name() of the user's AuthenticationMethod, when non-null
+}
+
+
+// RPC service
+service AuthenticationService {
+    rpc getAuthenticationToken(TokenRequest)  // issues an authentication token for the caller
+        returns (TokenResponse);
+
+    rpc whoami(WhoAmIRequest)                 // reports the caller's user name and auth method
+        returns (WhoAmIResponse);
+}
\ No newline at end of file

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerRpcController.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerRpcController.java?rev=1424122&r1=1424121&r2=1424122&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerRpcController.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerRpcController.java Wed Dec 19 21:22:36 2012
@@ -121,4 +121,13 @@ public class ServerRpcController impleme
   public boolean failedOnException() {
     return serviceException != null;
   }
+
+  /**
+   * Rethrows the failure recorded on this controller, if any; otherwise does nothing.
+   */
+  public void checkFailed() throws IOException {
+    if (failedOnException()) {
+      throw getFailedOn();  // reached only when failedOnException() reports a stored exception
+    }
+  }
 }

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java?rev=1424122&r1=1424121&r2=1424122&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java Wed Dec 19 21:22:36 2012
@@ -94,6 +94,7 @@ import org.apache.hadoop.hbase.protobuf.
 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WALEntry;
 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WALEntry.WALEdit.FamilyScope;
 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WALEntry.WALKey;
+import org.apache.hadoop.hbase.protobuf.generated.AuthenticationProtos;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileRequest;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileResponse;
@@ -128,6 +129,7 @@ import org.apache.hadoop.hbase.regionser
 import org.apache.hadoop.hbase.security.access.Permission;
 import org.apache.hadoop.hbase.security.access.TablePermission;
 import org.apache.hadoop.hbase.security.access.UserPermission;
+import org.apache.hadoop.hbase.security.token.AuthenticationTokenIdentifier;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Methods;
 import org.apache.hadoop.hbase.util.Pair;
@@ -141,6 +143,8 @@ import com.google.protobuf.Message;
 import com.google.protobuf.RpcChannel;
 import com.google.protobuf.Service;
 import com.google.protobuf.ServiceException;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.security.token.Token;
 
 /**
  * Protobufs utility.
@@ -1902,6 +1906,36 @@ public final class ProtobufUtil {
   }
 
   /**
+   * Copies a Hadoop Token (identifier, password and, if present, service) to protobuf form.
+   *
+   * @param token the Token instance to copy
+   * @return the protobuf Token message
+   */
+  public static AuthenticationProtos.Token toToken(Token<AuthenticationTokenIdentifier> token) {
+    AuthenticationProtos.Token.Builder pb = AuthenticationProtos.Token.newBuilder()
+        .setIdentifier(ByteString.copyFrom(token.getIdentifier()))
+        .setPassword(ByteString.copyFrom(token.getPassword()));
+    Text service = token.getService();
+    if (service != null) { pb.setService(ByteString.copyFromUtf8(service.toString())); }
+    // the token kind is not serialized; only the three byte fields above are carried
+    return pb.build();
+  }
+
+  /**
+   * Converts a protobuf Token message back into a Token instance.
+   *
+   * @param proto the protobuf Token message
+   * @return the Token instance
+   */
+  public static Token<AuthenticationTokenIdentifier> toToken(AuthenticationProtos.Token proto) {
+    return new Token<AuthenticationTokenIdentifier>(
+        proto.hasIdentifier() ? proto.getIdentifier().toByteArray() : null,
+        proto.hasPassword() ? proto.getPassword().toByteArray() : null,
+        AuthenticationTokenIdentifier.AUTH_TOKEN_TYPE,
+        proto.hasService() ? new Text(proto.getService().toStringUtf8()) : null);
+  }
+
+  /**
    * Find the HRegion encoded name based on a region specifier
    *
    * @param regionSpecifier the region specifier

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/security/token/AuthenticationTokenIdentifier.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/security/token/AuthenticationTokenIdentifier.java?rev=1424122&r1=1424121&r2=1424122&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/security/token/AuthenticationTokenIdentifier.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/security/token/AuthenticationTokenIdentifier.java Wed Dec 19 21:22:36 2012
@@ -22,6 +22,8 @@ import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
 
+import com.google.protobuf.ByteString;
+import org.apache.hadoop.hbase.protobuf.generated.AuthenticationProtos;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.WritableUtils;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -31,7 +33,6 @@ import org.apache.hadoop.security.token.
  * Represents the identity information stored in an HBase authentication token.
  */
 public class AuthenticationTokenIdentifier extends TokenIdentifier {
-  public static final byte VERSION = 1;
   public static final Text AUTH_TOKEN_TYPE = new Text("HBASE_AUTH_TOKEN");
 
   protected String username;
@@ -108,28 +109,56 @@ public class AuthenticationTokenIdentifi
     this.sequenceNumber = seq;
   }
 
+  /** Serializes this identifier as an AuthenticationProtos.TokenIdentifier protobuf message. */
+  public byte[] toBytes() {
+    AuthenticationProtos.TokenIdentifier.Builder pb = AuthenticationProtos.TokenIdentifier
+        .newBuilder().setKind(AuthenticationProtos.TokenIdentifier.Kind.HBASE_AUTH_TOKEN);
+    if (username != null) {
+      pb.setUsername(ByteString.copyFromUtf8(username));
+    }
+    // NOTE(review): username is a required proto field; if it is null here, build() is
+    // expected to fail with an uninitialized-message error -- confirm callers always set it.
+    pb.setKeyId(keyId).setIssueDate(issueDate);
+    pb.setExpirationDate(expirationDate).setSequenceNumber(sequenceNumber);
+    return pb.build().toByteArray();
+  }
+
   @Override
   public void write(DataOutput out) throws IOException {
-    out.writeByte(VERSION);
-    WritableUtils.writeString(out, username);
-    WritableUtils.writeVInt(out, keyId);
-    WritableUtils.writeVLong(out, issueDate);
-    WritableUtils.writeVLong(out, expirationDate);
-    WritableUtils.writeVLong(out, sequenceNumber);
+    byte[] pbBytes = toBytes();    // protobuf-encoded TokenIdentifier message
+    out.writeInt(pbBytes.length);  // 4-byte length prefix that readFields() reads back first
+    out.write(pbBytes);
   }
 
   @Override
   public void readFields(DataInput in) throws IOException {
-    byte version = in.readByte();
-    if (version != VERSION) {
-      throw new IOException("Version mismatch in deserialization: " +
-          "expected="+VERSION+", got="+version);
-    }
-    username = WritableUtils.readString(in);
-    keyId = WritableUtils.readVInt(in);
-    issueDate = WritableUtils.readVLong(in);
-    expirationDate = WritableUtils.readVLong(in);
-    sequenceNumber = WritableUtils.readVLong(in);
+    int len = in.readInt();  // length prefix written by write()
+    byte[] inBytes = new byte[len];  // NOTE(review): len is not range-checked before allocating
+    in.readFully(inBytes);
+    AuthenticationProtos.TokenIdentifier identifier =
+        AuthenticationProtos.TokenIdentifier.newBuilder().mergeFrom(inBytes).build();
+    // sanity check on type: only HBASE_AUTH_TOKEN identifiers are accepted
+    if (!identifier.hasKind() ||
+        identifier.getKind() != AuthenticationProtos.TokenIdentifier.Kind.HBASE_AUTH_TOKEN) {
+      throw new IOException("Invalid TokenIdentifier kind from input "+identifier.getKind());
+    }
+
+    // copy the field values that are present; absent fields keep their current values
+    if (identifier.hasUsername()) {
+      username = identifier.getUsername().toStringUtf8();
+    }
+    if (identifier.hasKeyId()) {
+      keyId = identifier.getKeyId();
+    }
+    if (identifier.hasIssueDate()) {
+      issueDate = identifier.getIssueDate();
+    }
+    if (identifier.hasExpirationDate()) {
+      expirationDate = identifier.getExpirationDate();
+    }
+    if (identifier.hasSequenceNumber()) {
+      sequenceNumber = identifier.getSequenceNumber();
+    }
   }
 
   @Override

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/security/token/AuthenticationTokenSelector.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/security/token/AuthenticationTokenSelector.java?rev=1424122&r1=1424121&r2=1424122&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/security/token/AuthenticationTokenSelector.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/security/token/AuthenticationTokenSelector.java Wed Dec 19 21:22:36 2012
@@ -20,6 +20,8 @@ package org.apache.hadoop.hbase.security
 
 import java.util.Collection;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenIdentifier;
@@ -27,6 +29,7 @@ import org.apache.hadoop.security.token.
 
 public class AuthenticationTokenSelector
     implements TokenSelector<AuthenticationTokenIdentifier> {
+  private static Log LOG = LogFactory.getLog(AuthenticationTokenSelector.class);
 
   public AuthenticationTokenSelector() {
   }
@@ -38,10 +41,14 @@ public class AuthenticationTokenSelector
       for (Token ident : tokens) {
         if (serviceName.equals(ident.getService()) &&
             AuthenticationTokenIdentifier.AUTH_TOKEN_TYPE.equals(ident.getKind())) {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Returning token "+ident);
+          }
           return (Token<AuthenticationTokenIdentifier>)ident;
         }
       }
     }
+    LOG.debug("No matching token found");
     return null;
   }
 }

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/security/token/TokenProvider.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/security/token/TokenProvider.java?rev=1424122&r1=1424121&r2=1424122&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/security/token/TokenProvider.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/security/token/TokenProvider.java Wed Dec 19 21:22:36 2012
@@ -20,14 +20,21 @@ package org.apache.hadoop.hbase.security
 
 import java.io.IOException;
 
+import com.google.protobuf.RpcCallback;
+import com.google.protobuf.RpcController;
+import com.google.protobuf.Service;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.Coprocessor;
 import org.apache.hadoop.hbase.CoprocessorEnvironment;
-import org.apache.hadoop.hbase.coprocessor.BaseEndpointCoprocessor;
+import org.apache.hadoop.hbase.coprocessor.CoprocessorService;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
 import org.apache.hadoop.hbase.ipc.HBaseServer;
 import org.apache.hadoop.hbase.ipc.RequestContext;
 import org.apache.hadoop.hbase.ipc.RpcServer;
+import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.protobuf.ResponseConverter;
+import org.apache.hadoop.hbase.protobuf.generated.AuthenticationProtos;
 import org.apache.hadoop.hbase.security.AccessDeniedException;
 import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -37,12 +44,11 @@ import org.apache.hadoop.security.token.
 
 /**
  * Provides a service for obtaining authentication tokens via the
- * {@link AuthenticationProtocol} coprocessor protocol.
+ * {@link AuthenticationProtos.AuthenticationService} coprocessor service.
  */
-public class TokenProvider extends BaseEndpointCoprocessor
-    implements AuthenticationProtocol {
+public class TokenProvider implements AuthenticationProtos.AuthenticationService.Interface,
+    Coprocessor, CoprocessorService {
 
-  public static final long VERSION = 0L;
   private static Log LOG = LogFactory.getLog(TokenProvider.class);
 
   private AuthenticationTokenSecretManager secretManager;
@@ -50,8 +56,6 @@ public class TokenProvider extends BaseE
 
   @Override
   public void start(CoprocessorEnvironment env) {
-    super.start(env);
-
     // if running at region
     if (env instanceof RegionCoprocessorEnvironment) {
       RegionCoprocessorEnvironment regionEnv =
@@ -65,28 +69,7 @@ public class TokenProvider extends BaseE
   }
 
   @Override
-  public Token<AuthenticationTokenIdentifier> getAuthenticationToken()
-      throws IOException {
-    if (secretManager == null) {
-      throw new IOException(
-          "No secret manager configured for token authentication");
-    }
-
-    User currentUser = RequestContext.getRequestUser();
-    UserGroupInformation ugi = null;
-    if (currentUser != null) {
-      ugi = currentUser.getUGI();
-    }
-    if (currentUser == null) {
-      throw new AccessDeniedException("No authenticated user for request!");
-    } else if (!isAllowedDelegationTokenOp(ugi)) {
-      LOG.warn("Token generation denied for user="+currentUser.getName()
-          +", authMethod="+ugi.getAuthenticationMethod());
-      throw new AccessDeniedException(
-          "Token generation only allowed for Kerberos authenticated clients");
-    }
-
-    return secretManager.generateToken(currentUser.getName());
+  public void stop(CoprocessorEnvironment env) throws IOException {
   }
 
   /**
@@ -106,18 +89,62 @@ public class TokenProvider extends BaseE
     return true;
   }
 
+  // AuthenticationService implementation
+
   @Override
-  public String whoami() {
-    return RequestContext.getRequestUserName();
+  public Service getService() {  // CoprocessorService hook: hands back this endpoint as a Service
+    return AuthenticationProtos.AuthenticationService.newReflectiveService(this);
   }
 
   @Override
-  public long getProtocolVersion(String protocol, long clientVersion)
-      throws IOException {
-    if (AuthenticationProtocol.class.getName().equals(protocol)) {
-      return TokenProvider.VERSION;
+  public void getAuthenticationToken(RpcController controller,
+                                     AuthenticationProtos.TokenRequest request,
+                                     RpcCallback<AuthenticationProtos.TokenResponse> done) {
+    // Failures are reported through the controller; the callback always runs, with an
+    // empty response when no token could be issued.
+    AuthenticationProtos.TokenResponse.Builder response =
+        AuthenticationProtos.TokenResponse.newBuilder();
+
+    try {
+      if (secretManager == null) {
+        throw new IOException("No secret manager configured for token authentication");
+      }
+
+      User currentUser = RequestContext.getRequestUser();
+      if (currentUser == null) {
+        throw new AccessDeniedException("No authenticated user for request!");
+      }
+      UserGroupInformation ugi = currentUser.getUGI();
+      if (!isAllowedDelegationTokenOp(ugi)) {
+        LOG.warn("Token generation denied for user="+currentUser.getName()
+            +", authMethod="+ugi.getAuthenticationMethod());
+        throw new AccessDeniedException(
+            "Token generation only allowed for Kerberos authenticated clients");
+      }
+
+      Token<AuthenticationTokenIdentifier> token =
+          secretManager.generateToken(currentUser.getName());
+      // The response is built exactly once, below; no intermediate build() is needed here.
+      response.setToken(ProtobufUtil.toToken(token));
+    } catch (IOException ioe) {
+      ResponseConverter.setControllerException(controller, ioe);
+    }
+    done.run(response.build());
+  }
+
+  @Override
+  public void whoami(RpcController controller, AuthenticationProtos.WhoAmIRequest request,
+                     RpcCallback<AuthenticationProtos.WhoAmIResponse> done) {
+    User requestUser = RequestContext.getRequestUser();
+    AuthenticationProtos.WhoAmIResponse.Builder response =
+        AuthenticationProtos.WhoAmIResponse.newBuilder();
+    if (requestUser != null) {  // with no request user, both response fields stay unset
+      response.setUsername(requestUser.getShortName());
+      AuthenticationMethod method = requestUser.getUGI().getAuthenticationMethod();
+      if (method != null) {
+        response.setAuthMethod(method.name());
+      }
     }
-    LOG.warn("Unknown protocol requested: "+protocol);
-    return -1;
+    done.run(response.build());  // the callback is always invoked; controller is never touched
   }
 }

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/security/token/TokenUtil.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/security/token/TokenUtil.java?rev=1424122&r1=1424121&r2=1424122&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/security/token/TokenUtil.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/security/token/TokenUtil.java Wed Dec 19 21:22:36 2012
@@ -22,11 +22,15 @@ import java.io.IOException;
 import java.lang.reflect.UndeclaredThrowableException;
 import java.security.PrivilegedExceptionAction;
 
+import com.google.protobuf.ServiceException;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
+import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.protobuf.generated.AuthenticationProtos;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapreduce.Job;
@@ -49,14 +53,22 @@ public class TokenUtil {
     HTable meta = null;
     try {
       meta = new HTable(conf, ".META.");
-      AuthenticationProtocol prot = meta.coprocessorProxy(
-          AuthenticationProtocol.class, HConstants.EMPTY_START_ROW);
-      return prot.getAuthenticationToken();
+      CoprocessorRpcChannel rpcChannel = meta.coprocessorService(HConstants.EMPTY_START_ROW);
+      AuthenticationProtos.AuthenticationService.BlockingInterface service =
+          AuthenticationProtos.AuthenticationService.newBlockingStub(rpcChannel);
+      AuthenticationProtos.TokenResponse response = service.getAuthenticationToken(null,
+          AuthenticationProtos.TokenRequest.getDefaultInstance());
+
+      return ProtobufUtil.toToken(response.getToken());
+    } catch (ServiceException se) {
+      ProtobufUtil.toIOException(se);
     } finally {
       if (meta != null) {
         meta.close();
       }
     }
+    // dummy return for ServiceException catch block
+    return null;
   }
 
   private static Text getClusterId(Token<AuthenticationTokenIdentifier> token)

Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/security/token/TestTokenAuthentication.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/security/token/TestTokenAuthentication.java?rev=1424122&r1=1424121&r2=1424122&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/security/token/TestTokenAuthentication.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/security/token/TestTokenAuthentication.java Wed Dec 19 21:22:36 2012
@@ -21,27 +21,54 @@ package org.apache.hadoop.hbase.security
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
+import java.io.IOException;
+import java.net.InetSocketAddress;
 import java.security.PrivilegedExceptionAction;
-import java.util.UUID;
+import java.util.concurrent.ConcurrentMap;
 
+import com.google.protobuf.RpcController;
+import com.google.protobuf.ServiceException;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.ClusterId;
+import org.apache.hadoop.hbase.Coprocessor;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.LargeTests;
-import org.apache.hadoop.hbase.client.HTable;
-import org.apache.hadoop.hbase.coprocessor.BaseEndpointCoprocessor;
-import org.apache.hadoop.hbase.ipc.CoprocessorProtocol;
+import org.apache.hadoop.hbase.MediumTests;
+import org.apache.hadoop.hbase.Server;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.catalog.CatalogTracker;
+import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.ipc.BlockingRpcCallback;
 import org.apache.hadoop.hbase.ipc.HBaseRPC;
 import org.apache.hadoop.hbase.ipc.HBaseServer;
+import org.apache.hadoop.hbase.ipc.ProtocolSignature;
 import org.apache.hadoop.hbase.ipc.RequestContext;
 import org.apache.hadoop.hbase.ipc.RpcServer;
-import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.ipc.ServerRpcController;
+import org.apache.hadoop.hbase.ipc.VersionedProtocol;
+import org.apache.hadoop.hbase.protobuf.generated.AuthenticationProtos;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.RegionServerServices;
+import org.apache.hadoop.hbase.security.KerberosInfo;
+import org.apache.hadoop.hbase.security.TokenInfo;
 import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.MockRegionServerServices;
+import org.apache.hadoop.hbase.util.Sleeper;
+import org.apache.hadoop.hbase.util.Strings;
+import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.hbase.util.Writables;
+import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.apache.hadoop.net.DNS;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.SecretManager;
 import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -50,48 +77,259 @@ import org.junit.experimental.categories
 /**
  * Tests for authentication token creation and usage
  */
-@Category(LargeTests.class)
+@Category(MediumTests.class)
 public class TestTokenAuthentication {
-  public static interface IdentityProtocol extends CoprocessorProtocol {
-    public String whoami();
-    public String getAuthMethod();
-  }
-
-  public static class IdentityCoprocessor extends BaseEndpointCoprocessor
-      implements IdentityProtocol {
-    public String whoami() {
-      return RequestContext.getRequestUserName();
-    }
-
-    public String getAuthMethod() {
-      UserGroupInformation ugi = null;
-      User user = RequestContext.getRequestUser();
-      if (user != null) {
-        ugi = user.getUGI();
-      }
-      if (ugi != null) {
-        return ugi.getAuthenticationMethod().toString();
+  private static Log LOG = LogFactory.getLog(TestTokenAuthentication.class);
+
+  @KerberosInfo(
+      serverPrincipal = "hbase.test.kerberos.principal")
+  @TokenInfo("HBASE_AUTH_TOKEN")
+  private static interface BlockingAuthenticationService
+      extends AuthenticationProtos.AuthenticationService.BlockingInterface, VersionedProtocol {
+    long VERSION = 1L;  // reported by TokenServer.getProtocolVersion()/getProtocolSignature()
+  }
+
+  /**
+   * Basic server process for RPC authentication testing
+   */
+  private static class TokenServer extends TokenProvider
+      implements BlockingAuthenticationService, Runnable, Server {
+
+    private static Log LOG = LogFactory.getLog(TokenServer.class);
+
+    private Configuration conf;
+    private RpcServer rpcServer;
+    private InetSocketAddress isa;
+    private ZooKeeperWatcher zookeeper;
+    private Sleeper sleeper;
+    private volatile boolean started = false;  // set in initialize(); polled from the test thread
+    private volatile boolean aborted = false;  // set in abort(); volatile for cross-thread reads
+    private volatile boolean stopped = false;  // set in stop()/abort(); polled by the run() loop
+    private long startcode;
+    private AuthenticationProtos.AuthenticationService.BlockingInterface blockingService;
+
+    public TokenServer(Configuration conf) throws IOException {
+      this.conf = conf;
+      this.startcode = EnvironmentEdgeManager.currentTimeMillis();
+
+      // Server to handle client requests.
+      String hostname = Strings.domainNamePointerToHostName(
+          DNS.getDefaultHost("default", "default"));
+      int port = 0;
+      // Creation of an ISA will force a resolve.
+      InetSocketAddress initialIsa = new InetSocketAddress(hostname, port);
+      if (initialIsa.getAddress() == null) {
+        throw new IllegalArgumentException("Failed resolve of " + initialIsa);
       }
+
+      this.rpcServer = HBaseRPC.getServer(TokenServer.class, this,
+          new Class<?>[]{AuthenticationProtos.AuthenticationService.Interface.class},
+          initialIsa.getHostName(), // BindAddress is IP we got for this server.
+          initialIsa.getPort(),
+          3, // handlers
+          1, // meta handlers (not used)
+          true,
+          this.conf, HConstants.QOS_THRESHOLD);
+      // Set our address.
+      this.isa = this.rpcServer.getListenerAddress();
+      this.sleeper = new Sleeper(1000, this);
+    }
+
+    @Override
+    public Configuration getConfiguration() {
+      return conf;
+    }
+
+    @Override
+    public CatalogTracker getCatalogTracker() {
       return null;
     }
+
+    @Override
+    public ZooKeeperWatcher getZooKeeper() {
+      return zookeeper;
+    }
+
+    @Override
+    public boolean isAborted() {
+      return aborted;
+    }
+
+    @Override
+    public ServerName getServerName() {
+      return new ServerName(isa.getHostName(), isa.getPort(), startcode);
+    }
+
+    @Override
+    public void abort(String reason, Throwable error) {
+      LOG.fatal("Aborting on: "+reason, error);
+      this.aborted = true;
+      this.stopped = true;
+      sleeper.skipSleepCycle();
+    }
+
+    private void initialize() throws IOException {
+      // ZK configuration must _not_ have hbase.security.authentication or it will require SASL auth
+      Configuration zkConf = new Configuration(conf);
+      zkConf.set(User.HBASE_SECURITY_CONF_KEY, "simple");
+      this.zookeeper = new ZooKeeperWatcher(zkConf, TokenServer.class.getSimpleName(),
+          this, true);
+      this.rpcServer.start();
+
+      // mock RegionServerServices to provide to coprocessor environment
+      final RegionServerServices mockServices = new MockRegionServerServices() {
+        @Override
+        public RpcServer getRpcServer() { return rpcServer; }
+      };
+
+      // mock up coprocessor environment
+      super.start(new RegionCoprocessorEnvironment() {
+        @Override
+        public HRegion getRegion() { return null; }
+
+        @Override
+        public RegionServerServices getRegionServerServices() {
+          return mockServices;
+        }
+
+        @Override
+        public ConcurrentMap<String, Object> getSharedData() { return null; }
+
+        @Override
+        public int getVersion() { return 0; }
+
+        @Override
+        public String getHBaseVersion() { return null; }
+
+        @Override
+        public Coprocessor getInstance() { return null; }
+
+        @Override
+        public int getPriority() { return 0; }
+
+        @Override
+        public int getLoadSequence() { return 0; }
+
+        @Override
+        public Configuration getConfiguration() { return conf; }
+
+        @Override
+        public HTableInterface getTable(byte[] tableName) throws IOException { return null; }
+      });
+
+      started = true;
+    }
+
+    public void run() {  // server thread body: initialize, idle until stopped, stop RPC server
+      try {
+        initialize();
+        while (!stopped) {  // stop() and abort() both set the flag and skip the sleep cycle
+          this.sleeper.sleep();
+        }
+      } catch (Exception e) {
+        abort(e.getMessage(), e);  // any failure, including during initialize(), aborts
+      }
+      this.rpcServer.stop();  // reached on normal stop and after an abort alike
+    }
+
+    public boolean isStarted() {
+      return started;
+    }
+
+    @Override
+    public void stop(String reason) {
+      LOG.info("Stopping due to: "+reason);
+      this.stopped = true;
+      sleeper.skipSleepCycle();
+    }
+
+    @Override
+    public boolean isStopped() {
+      return stopped;
+    }
+
+    public InetSocketAddress getAddress() {
+      return isa;
+    }
+
+    public SecretManager<? extends TokenIdentifier> getSecretManager() {
+      return ((HBaseServer)rpcServer).getSecretManager();
+    }
+
+    @Override
+    public AuthenticationProtos.TokenResponse getAuthenticationToken(
+        RpcController controller, AuthenticationProtos.TokenRequest request)
+      throws ServiceException {
+      LOG.debug("Authentication token request from "+RequestContext.getRequestUserName());
+      // ignore passed in controller -- it's always null; use a fresh one to capture failures
+      ServerRpcController serverController = new ServerRpcController();
+      BlockingRpcCallback<AuthenticationProtos.TokenResponse> callback =
+          new BlockingRpcCallback<AuthenticationProtos.TokenResponse>();
+      getAuthenticationToken(serverController, request, callback);  // callback-style overload
+      try {
+        serverController.checkFailed();  // rethrows a failure recorded by the call above
+        return callback.get();
+      } catch (IOException ioe) {
+        throw new ServiceException(ioe);  // blocking interface only allows ServiceException
+      }
+    }
+
+    @Override
+    public AuthenticationProtos.WhoAmIResponse whoami(
+        RpcController controller, AuthenticationProtos.WhoAmIRequest request)
+      throws ServiceException {
+      LOG.debug("whoami() request from "+RequestContext.getRequestUserName());
+      // ignore passed in controller -- it's always null; use a fresh one to capture failures
+      ServerRpcController serverController = new ServerRpcController();
+      BlockingRpcCallback<AuthenticationProtos.WhoAmIResponse> callback =
+          new BlockingRpcCallback<AuthenticationProtos.WhoAmIResponse>();
+      whoami(serverController, request, callback);  // delegates to the callback-style overload
+      try {
+        serverController.checkFailed();  // rethrows a failure recorded by the call above
+        return callback.get();
+      } catch (IOException ioe) {
+        throw new ServiceException(ioe);  // blocking interface only allows ServiceException
+      }
+    }
+
+    /* VersionedProtocol implementation */
+    @Override
+    public long getProtocolVersion(String protocol, long clientVersion) throws IOException {
+      return BlockingAuthenticationService.VERSION;
+    }
+
+    @Override
+    public ProtocolSignature getProtocolSignature(String protocol, long clientVersion, int clientMethodsHash) throws IOException {
+      return new ProtocolSignature(BlockingAuthenticationService.VERSION, null);
+    }
   }
 
+
   private static HBaseTestingUtility TEST_UTIL;
+  private static TokenServer server;
+  private static Thread serverThread;
   private static AuthenticationTokenSecretManager secretManager;
+  private static ClusterId clusterId = new ClusterId();
 
   @BeforeClass
   public static void setupBeforeClass() throws Exception {
     TEST_UTIL = new HBaseTestingUtility();
+    TEST_UTIL.startMiniZKCluster();
+    // security settings only added after startup so that ZK does not require SASL
     Configuration conf = TEST_UTIL.getConfiguration();
-    conf.set("hbase.coprocessor.region.classes",
-        IdentityCoprocessor.class.getName());
-    TEST_UTIL.startMiniCluster();
-    HRegionServer rs = TEST_UTIL.getMiniHBaseCluster().getRegionServer(0);
-    secretManager = new AuthenticationTokenSecretManager(conf, rs.getZooKeeper(),
-        rs.getServerName().toString(), 
-        conf.getLong("hbase.auth.key.update.interval", 24*60*60*1000), 
-        conf.getLong("hbase.auth.token.max.lifetime", 7*24*60*60*1000));
-    secretManager.start(); 
+    conf.set("hadoop.security.authentication", "kerberos");
+    conf.set("hbase.security.authentication", "kerberos");
+    server = new TokenServer(conf);
+    serverThread = new Thread(server);
+    Threads.setDaemonThreadRunning(serverThread,
+        "TokenServer:"+server.getServerName().toString());
+    // wait for startup
+    while (!server.isStarted() && !server.isStopped()) {
+      Thread.sleep(10);
+    }
+
+    ZKClusterId.setClusterId(server.getZooKeeper(), clusterId);
+    secretManager = (AuthenticationTokenSecretManager)server.getSecretManager();
     while(secretManager.getCurrentKey() == null) {
       Thread.sleep(1);
     }
@@ -99,7 +337,9 @@ public class TestTokenAuthentication {
 
   @AfterClass
   public static void tearDownAfterClass() throws Exception {
-    TEST_UTIL.shutdownMiniCluster();
+    // Stop the token server and shut down the thread running it first,
+    // then shut down the mini ZooKeeper cluster that was started in setup.
+    server.stop("Test complete");
+    Threads.shutdown(serverThread);
+    TEST_UTIL.shutdownMiniZKCluster();
   }
 
   @Test
@@ -116,7 +356,7 @@ public class TestTokenAuthentication {
         Bytes.equals(token.getPassword(), passwd));
   }
 
-  // @Test - Disable due to kerberos requirement
+  @Test
   public void testTokenAuthentication() throws Exception {
     UserGroupInformation testuser =
         UserGroupInformation.createUserForTesting("testuser", new String[]{"testgroup"});
@@ -124,22 +364,31 @@ public class TestTokenAuthentication {
     testuser.setAuthenticationMethod(
         UserGroupInformation.AuthenticationMethod.TOKEN);
     final Configuration conf = TEST_UTIL.getConfiguration();
-    conf.set("hadoop.security.authentication", "kerberos");
-    conf.set("randomkey", UUID.randomUUID().toString());
     testuser.setConfiguration(conf);
     Token<AuthenticationTokenIdentifier> token =
         secretManager.generateToken("testuser");
+    LOG.debug("Got token: " + token.toString());
     testuser.addToken(token);
 
     // verify the server authenticates us as this token user
     testuser.doAs(new PrivilegedExceptionAction<Object>() {
       public Object run() throws Exception {
-        HTable table = new HTable(conf, ".META.");
-        IdentityProtocol prot = table.coprocessorProxy(
-            IdentityProtocol.class, HConstants.EMPTY_START_ROW);
-        String myname = prot.whoami();
+        Configuration c = server.getConfiguration();
+        c.set(HConstants.CLUSTER_ID, clusterId.toString());
+        AuthenticationProtos.AuthenticationService.BlockingInterface proxy =
+            (AuthenticationProtos.AuthenticationService.BlockingInterface)
+            HBaseRPC.waitForProxy(BlockingAuthenticationService.class,
+                BlockingAuthenticationService.VERSION,
+                server.getAddress(), c,
+                HConstants.DEFAULT_HBASE_CLIENT_RPC_MAXATTEMPTS,
+                HConstants.DEFAULT_HBASE_RPC_TIMEOUT,
+                HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
+
+        AuthenticationProtos.WhoAmIResponse response =
+            proxy.whoami(null, AuthenticationProtos.WhoAmIRequest.getDefaultInstance());
+        String myname = response.getUsername();
         assertEquals("testuser", myname);
-        String authMethod = prot.getAuthMethod();
+        String authMethod = response.getAuthMethod();
         assertEquals("TOKEN", authMethod);
         return null;
       }