You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by st...@apache.org on 2012/05/12 00:06:59 UTC
svn commit: r1337396 [1/5] - in /hbase/trunk: ./
security/src/main/java/org/apache/hadoop/hbase/security/
security/src/main/java/org/apache/hadoop/hbase/security/access/
security/src/main/java/org/apache/hadoop/hbase/security/token/
security/src/test/ ...
Author: stack
Date: Fri May 11 22:06:57 2012
New Revision: 1337396
URL: http://svn.apache.org/viewvc?rev=1337396&view=rev
Log:
HBASE-5732 Remove the SecureRPCEngine and merge the security-related logic in the core engine
Added:
hbase/trunk/src/main/java/org/apache/hadoop/hbase/security/AccessDeniedException.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/security/HBasePolicyProvider.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/security/HBaseSaslRpcClient.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/security/HBaseSaslRpcServer.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/security/access/
hbase/trunk/src/main/java/org/apache/hadoop/hbase/security/access/AccessControlFilter.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/security/access/AccessControlLists.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/security/access/AccessControllerProtocol.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/security/access/Permission.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/security/access/TableAuthManager.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/security/access/TablePermission.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/security/access/UserPermission.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/security/access/ZKPermissionWatcher.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/security/token/
hbase/trunk/src/main/java/org/apache/hadoop/hbase/security/token/AuthenticationKey.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/security/token/AuthenticationProtocol.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/security/token/AuthenticationTokenIdentifier.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/security/token/AuthenticationTokenSecretManager.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/security/token/AuthenticationTokenSelector.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/security/token/TokenProvider.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/security/token/TokenUtil.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/security/token/ZKSecretWatcher.java
hbase/trunk/src/test/java/org/apache/hadoop/hbase/security/access/
hbase/trunk/src/test/java/org/apache/hadoop/hbase/security/access/SecureTestUtil.java
hbase/trunk/src/test/java/org/apache/hadoop/hbase/security/access/TestAccessControlFilter.java
hbase/trunk/src/test/java/org/apache/hadoop/hbase/security/access/TestAccessController.java
hbase/trunk/src/test/java/org/apache/hadoop/hbase/security/access/TestTablePermissions.java
hbase/trunk/src/test/java/org/apache/hadoop/hbase/security/access/TestZKPermissionsWatcher.java
hbase/trunk/src/test/java/org/apache/hadoop/hbase/security/token/
hbase/trunk/src/test/java/org/apache/hadoop/hbase/security/token/TestTokenAuthentication.java
hbase/trunk/src/test/java/org/apache/hadoop/hbase/security/token/TestZKSecretWatcher.java
Removed:
hbase/trunk/security/src/main/java/org/apache/hadoop/hbase/security/HBaseSaslRpcServer.java
hbase/trunk/security/src/main/java/org/apache/hadoop/hbase/security/access/AccessControlFilter.java
hbase/trunk/security/src/main/java/org/apache/hadoop/hbase/security/access/AccessControlLists.java
hbase/trunk/security/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java
hbase/trunk/security/src/main/java/org/apache/hadoop/hbase/security/access/AccessControllerProtocol.java
hbase/trunk/security/src/main/java/org/apache/hadoop/hbase/security/access/Permission.java
hbase/trunk/security/src/main/java/org/apache/hadoop/hbase/security/access/TableAuthManager.java
hbase/trunk/security/src/main/java/org/apache/hadoop/hbase/security/access/TablePermission.java
hbase/trunk/security/src/main/java/org/apache/hadoop/hbase/security/access/UserPermission.java
hbase/trunk/security/src/main/java/org/apache/hadoop/hbase/security/token/AuthenticationKey.java
hbase/trunk/security/src/main/java/org/apache/hadoop/hbase/security/token/AuthenticationProtocol.java
hbase/trunk/security/src/main/java/org/apache/hadoop/hbase/security/token/AuthenticationTokenIdentifier.java
hbase/trunk/security/src/main/java/org/apache/hadoop/hbase/security/token/AuthenticationTokenSecretManager.java
hbase/trunk/security/src/main/java/org/apache/hadoop/hbase/security/token/AuthenticationTokenSelector.java
hbase/trunk/security/src/main/java/org/apache/hadoop/hbase/security/token/TokenProvider.java
hbase/trunk/security/src/main/java/org/apache/hadoop/hbase/security/token/TokenUtil.java
hbase/trunk/security/src/main/java/org/apache/hadoop/hbase/security/token/ZKSecretWatcher.java
hbase/trunk/security/src/test/
Modified:
hbase/trunk/pom.xml
hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/AdminProtocol.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/ClientProtocol.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/ConnectionHeader.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/RegionServerStatusProtocol.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/WritableRpcEngine.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/protobuf/generated/RPCProtos.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/security/User.java
hbase/trunk/src/main/protobuf/RPC.proto
hbase/trunk/src/test/resources/hbase-site.xml
Modified: hbase/trunk/pom.xml
URL: http://svn.apache.org/viewvc/hbase/trunk/pom.xml?rev=1337396&r1=1337395&r2=1337396&view=diff
==============================================================================
--- hbase/trunk/pom.xml (original)
+++ hbase/trunk/pom.xml Fri May 11 22:06:57 2012
@@ -1624,64 +1624,6 @@
</build>
</profile>
- <!-- profile for building against Hadoop 0.20+security-->
- <profile>
- <id>security</id>
- <properties>
- <hadoop.version>1.0.2</hadoop.version>
- </properties>
- <build>
- <finalName>${project.artifactId}-${project.version}-security</finalName>
- <plugins>
- <plugin>
- <groupId>org.codehaus.mojo</groupId>
- <artifactId>build-helper-maven-plugin</artifactId>
- <executions>
- <execution>
- <id>add-source</id>
- <goals>
- <goal>add-source</goal>
- </goals>
- <configuration>
- <sources>
- <source>${project.basedir}/security/src/main/java</source>
- </sources>
- </configuration>
- </execution>
- <execution>
- <id>add-test-source</id>
- <goals>
- <goal>add-test-source</goal>
- </goals>
- <configuration>
- <sources>
- <source>${project.basedir}/security/src/test/java</source>
- </sources>
- </configuration>
- </execution>
- <execution>
- <id>add-test-resource</id>
- <goals>
- <goal>add-test-resource</goal>
- </goals>
- <configuration>
- <resources>
- <resource>
- <directory>${project.basedir}/security/src/test/resources</directory>
- <includes>
- <include>hbase-site.xml</include>
- </includes>
- </resource>
- </resources>
- </configuration>
- </execution>
- </executions>
- </plugin>
- </plugins>
- </build>
- </profile>
-
-
<!--
profile for building against Hadoop 0.22.0. Activate using:
mvn -Dhadoop.profile=22
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/AdminProtocol.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/AdminProtocol.java?rev=1337396&r1=1337395&r2=1337396&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/AdminProtocol.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/AdminProtocol.java Fri May 11 22:06:57 2012
@@ -22,7 +22,7 @@ import org.apache.hadoop.classification.
import org.apache.hadoop.hbase.ipc.VersionedProtocol;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService;
import org.apache.hadoop.hbase.security.TokenInfo;
-import org.apache.hadoop.security.KerberosInfo;
+import org.apache.hadoop.hbase.security.KerberosInfo;
/**
* Protocol that a HBase client uses to communicate with a region server.
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/ClientProtocol.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/ClientProtocol.java?rev=1337396&r1=1337395&r2=1337396&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/ClientProtocol.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/ClientProtocol.java Fri May 11 22:06:57 2012
@@ -23,7 +23,7 @@ import org.apache.hadoop.classification.
import org.apache.hadoop.hbase.ipc.VersionedProtocol;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ClientService;
import org.apache.hadoop.hbase.security.TokenInfo;
-import org.apache.hadoop.security.KerberosInfo;
+import org.apache.hadoop.hbase.security.KerberosInfo;
/**
* Protocol that a HBase client uses to communicate with a region server.
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/ConnectionHeader.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/ConnectionHeader.java?rev=1337396&r1=1337395&r2=1337396&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/ConnectionHeader.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/ConnectionHeader.java Fri May 11 22:06:57 2012
@@ -1,77 +0,0 @@
-/*
- * Copyright 2010 The Apache Software Foundation
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.ipc;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.security.User;
-
-/**
- * The IPC connection header sent by the client to the server
- * on connection establishment.
- */
-@InterfaceAudience.Private
-class ConnectionHeader implements Writable {
- protected String protocol;
-
- public ConnectionHeader() {}
-
- /**
- * Create a new {@link ConnectionHeader} with the given <code>protocol</code>
- * and {@link User}.
- * @param protocol protocol used for communication between the IPC client
- * and the server
- * @param user {@link User} of the client communicating with
- * the server
- */
- public ConnectionHeader(String protocol, User user) {
- this.protocol = protocol;
- }
-
- @Override
- public void readFields(DataInput in) throws IOException {
- protocol = Text.readString(in);
- if (protocol.isEmpty()) {
- protocol = null;
- }
- }
-
- @Override
- public void write(DataOutput out) throws IOException {
- Text.writeString(out, (protocol == null) ? "" : protocol);
- }
-
- public String getProtocol() {
- return protocol;
- }
-
- public User getUser() {
- return null;
- }
-
- public String toString() {
- return protocol;
- }
-}
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java?rev=1337396&r1=1337395&r2=1337396&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java Fri May 11 22:06:57 2012
@@ -28,13 +28,18 @@ import java.io.EOFException;
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
+import java.io.OutputStream;
import java.net.ConnectException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketException;
import java.net.SocketTimeoutException;
import java.net.UnknownHostException;
+import java.security.PrivilegedExceptionAction;
+import java.util.HashMap;
import java.util.Iterator;
+import java.util.Map;
+import java.util.Random;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -48,18 +53,32 @@ import org.apache.hadoop.classification.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.protobuf.generated.RPCProtos;
+import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcResponse.Status;
import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.ConnectionHeader;
import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcRequest;
import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcResponse;
+import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.UserInformation;
+import org.apache.hadoop.hbase.security.HBaseSaslRpcClient;
+import org.apache.hadoop.hbase.security.KerberosInfo;
+import org.apache.hadoop.hbase.security.TokenInfo;
+import org.apache.hadoop.hbase.security.HBaseSaslRpcServer.AuthMethod;
import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.security.token.AuthenticationTokenIdentifier;
+import org.apache.hadoop.hbase.security.token.AuthenticationTokenSelector;
import org.apache.hadoop.hbase.util.PoolMap;
import org.apache.hadoop.hbase.util.PoolMap.PoolType;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.hbase.io.DataOutputOutputStream;
import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.SecurityUtil;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.security.token.TokenSelector;
import org.apache.hadoop.util.ReflectionUtils;
import com.google.protobuf.ByteString;
@@ -213,7 +232,12 @@ public class HBaseClient {
return this.startTime;
}
}
-
+ protected static Map<String,TokenSelector<? extends TokenIdentifier>> tokenHandlers =
+ new HashMap<String,TokenSelector<? extends TokenIdentifier>>();
+ static {
+ tokenHandlers.put(AuthenticationTokenIdentifier.AUTH_TOKEN_TYPE.toString(),
+ new AuthenticationTokenSelector());
+ }
/** Thread that reads responses and notifies callers. Each connection owns a
* socket connected to a remote address. Calls are multiplexed through this
* socket: responses may be delivered out of order. */
@@ -223,6 +247,13 @@ public class HBaseClient {
protected Socket socket = null; // connected socket
protected DataInputStream in;
protected DataOutputStream out;
+ private InetSocketAddress server; // server ip:port
+ private String serverPrincipal; // server's krb5 principal name
+ private AuthMethod authMethod; // authentication method
+ private boolean useSasl;
+ private Token<? extends TokenIdentifier> token;
+ private HBaseSaslRpcClient saslRpcClient;
+ private int reloginMaxBackoff; // max pause before relogin on sasl failure
// currently active calls
protected final ConcurrentSkipListMap<Integer, Call> calls = new ConcurrentSkipListMap<Integer, Call>();
@@ -235,20 +266,90 @@ public class HBaseClient {
throw new UnknownHostException("unknown host: " +
remoteId.getAddress().getHostName());
}
+ this.server = remoteId.getAddress();
+
+ UserGroupInformation ticket = remoteId.getTicket().getUGI();
+ Class<?> protocol = remoteId.getProtocol();
+ this.useSasl = UserGroupInformation.isSecurityEnabled();
+ if (useSasl && protocol != null) {
+ TokenInfo tokenInfo = protocol.getAnnotation(TokenInfo.class);
+ if (tokenInfo != null) {
+ TokenSelector<? extends TokenIdentifier> tokenSelector =
+ tokenHandlers.get(tokenInfo.value());
+ if (tokenSelector != null) {
+ token = tokenSelector.selectToken(new Text(clusterId),
+ ticket.getTokens());
+ } else if (LOG.isDebugEnabled()) {
+ LOG.debug("No token selector found for type "+tokenInfo.value());
+ }
+ }
+ KerberosInfo krbInfo = protocol.getAnnotation(KerberosInfo.class);
+ if (krbInfo != null) {
+ String serverKey = krbInfo.serverPrincipal();
+ if (serverKey == null) {
+ throw new IOException(
+ "Can't obtain server Kerberos config key from KerberosInfo");
+ }
+ serverPrincipal = SecurityUtil.getServerPrincipal(
+ conf.get(serverKey), server.getAddress().getCanonicalHostName().toLowerCase());
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("RPC Server Kerberos principal name for protocol="
+ + protocol.getCanonicalName() + " is " + serverPrincipal);
+ }
+ }
+ }
+
+ if (!useSasl) {
+ authMethod = AuthMethod.SIMPLE;
+ } else if (token != null) {
+ authMethod = AuthMethod.DIGEST;
+ } else {
+ authMethod = AuthMethod.KERBEROS;
+ }
+
+ if (LOG.isDebugEnabled())
+ LOG.debug("Use " + authMethod + " authentication for protocol "
+ + protocol.getSimpleName());
+
+ reloginMaxBackoff = conf.getInt("hbase.security.relogin.maxbackoff", 5000);
this.remoteId = remoteId;
- User ticket = remoteId.getTicket();
- Class<? extends VersionedProtocol> protocol = remoteId.getProtocol();
ConnectionHeader.Builder builder = ConnectionHeader.newBuilder();
builder.setProtocol(protocol == null ? "" : protocol.getName());
+ UserInformation userInfoPB;
+ if ((userInfoPB = getUserInfoPB(ticket)) != null) {
+ builder.setUserInfo(userInfoPB);
+ }
this.header = builder.build();
this.setName("IPC Client (" + socketFactory.hashCode() +") connection to " +
remoteId.getAddress().toString() +
- ((ticket==null)?" from an unknown user": (" from " + ticket.getName())));
+ ((ticket==null)?" from an unknown user": (" from "
+ + ticket.getUserName())));
this.setDaemon(true);
}
+ private UserInformation getUserInfoPB(UserGroupInformation ugi) {
+ if (ugi == null || authMethod == AuthMethod.DIGEST) {
+ // Don't send user for token auth
+ return null;
+ }
+ UserInformation.Builder userInfoPB = UserInformation.newBuilder();
+ if (ugi != null) {
+ if (authMethod == AuthMethod.KERBEROS) {
+ // Send effective user for Kerberos auth
+ userInfoPB.setEffectiveUser(ugi.getUserName());
+ } else if (authMethod == AuthMethod.SIMPLE) {
+ //Send both effective user and real user for simple auth
+ userInfoPB.setEffectiveUser(ugi.getUserName());
+ if (ugi.getRealUser() != null) {
+ userInfoPB.setRealUser(ugi.getRealUser().getUserName());
+ }
+ }
+ }
+ return userInfoPB.build();
+ }
+
/** Update lastActivity with the current time. */
protected void touch() {
lastActivity.set(System.currentTimeMillis());
@@ -352,42 +453,6 @@ public class HBaseClient {
}
}
- /** Connect to the server and set up the I/O streams. It then sends
- * a header to the server and starts
- * the connection thread that waits for responses.
- * @throws java.io.IOException e
- */
- protected synchronized void setupIOstreams()
- throws IOException, InterruptedException {
-
- if (socket != null || shouldCloseConnection.get()) {
- return;
- }
-
- try {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Connecting to "+remoteId);
- }
- setupConnection();
- this.in = new DataInputStream(new BufferedInputStream
- (new PingInputStream(NetUtils.getInputStream(socket))));
- this.out = new DataOutputStream
- (new BufferedOutputStream(NetUtils.getOutputStream(socket)));
- writeHeader();
-
- // update last activity time
- touch();
-
- // start the receiver thread after the socket connection has been set up
- start();
- } catch (IOException e) {
- markClosed(e);
- close();
-
- throw e;
- }
- }
-
protected void closeConnection() {
// close the current connection
if (socket != null) {
@@ -437,16 +502,6 @@ public class HBaseClient {
" time(s).");
}
- /* Write the header for each connection
- * Out is not synchronized because only the first thread does this.
- */
- private void writeHeader() throws IOException {
- out.write(HBaseServer.HEADER.array());
- out.write(HBaseServer.CURRENT_VERSION);
- out.writeInt(header.getSerializedSize());
- header.writeTo(out);
- }
-
/* wait till someone signals us to start reading RPC response or
* it is idle too long, it is marked as to be closed,
* or the client is marked as not running.
@@ -519,6 +574,230 @@ public class HBaseClient {
+ connections.size());
}
+ private synchronized void disposeSasl() {
+ if (saslRpcClient != null) {
+ try {
+ saslRpcClient.dispose();
+ saslRpcClient = null;
+ } catch (IOException ioe) {
+ LOG.error("Error disposing of SASL client", ioe);
+ }
+ }
+ }
+
+ private synchronized boolean shouldAuthenticateOverKrb() throws IOException {
+ UserGroupInformation loginUser = UserGroupInformation.getLoginUser();
+ UserGroupInformation currentUser =
+ UserGroupInformation.getCurrentUser();
+ UserGroupInformation realUser = currentUser.getRealUser();
+ return authMethod == AuthMethod.KERBEROS &&
+ loginUser != null &&
+ //Make sure user logged in using Kerberos either keytab or TGT
+ loginUser.hasKerberosCredentials() &&
+ // relogin only in case it is the login user (e.g. JT)
+ // or superuser (like oozie).
+ (loginUser.equals(currentUser) || loginUser.equals(realUser));
+ }
+
+ private synchronized boolean setupSaslConnection(final InputStream in2,
+ final OutputStream out2) throws IOException {
+ saslRpcClient = new HBaseSaslRpcClient(authMethod, token, serverPrincipal);
+ return saslRpcClient.saslConnect(in2, out2);
+ }
+
+ /**
+ * If multiple clients with the same principal try to connect
+ * to the same server at the same time, the server assumes a
+ * replay attack is in progress. This is a feature of kerberos.
+ * To work around this, the client backs off for a random
+ * interval and then tries to initiate the connection
+ * again.
+ * The other problem is ticket expiry. To handle that,
+ * a relogin is attempted.
+ */
+ private synchronized void handleSaslConnectionFailure(
+ final int currRetries,
+ final int maxRetries, final Exception ex, final Random rand,
+ final UserGroupInformation user)
+ throws IOException, InterruptedException{
+ user.doAs(new PrivilegedExceptionAction<Object>() {
+ public Object run() throws IOException, InterruptedException {
+ closeConnection();
+ if (shouldAuthenticateOverKrb()) {
+ if (currRetries < maxRetries) {
+ LOG.debug("Exception encountered while connecting to " +
+ "the server : " + ex);
+ //try re-login
+ if (UserGroupInformation.isLoginKeytabBased()) {
+ UserGroupInformation.getLoginUser().reloginFromKeytab();
+ } else {
+ UserGroupInformation.getLoginUser().reloginFromTicketCache();
+ }
+ disposeSasl();
+ //have granularity of milliseconds
+ //we are sleeping with the Connection lock held but since this
+ //connection instance is being used for connecting to the server
+ //in question, it is okay
+ Thread.sleep((rand.nextInt(reloginMaxBackoff) + 1));
+ return null;
+ } else {
+ String msg = "Couldn't setup connection for " +
+ UserGroupInformation.getLoginUser().getUserName() +
+ " to " + serverPrincipal;
+ LOG.warn(msg);
+ throw (IOException) new IOException(msg).initCause(ex);
+ }
+ } else {
+ LOG.warn("Exception encountered while connecting to " +
+ "the server : " + ex);
+ }
+ if (ex instanceof RemoteException)
+ throw (RemoteException)ex;
+ throw new IOException(ex);
+ }
+ });
+ }
+
+ protected synchronized void setupIOstreams()
+ throws IOException, InterruptedException {
+ if (socket != null || shouldCloseConnection.get()) {
+ return;
+ }
+
+ try {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Connecting to "+server);
+ }
+ short numRetries = 0;
+ final short MAX_RETRIES = 5;
+ Random rand = null;
+ while (true) {
+ setupConnection();
+ InputStream inStream = NetUtils.getInputStream(socket);
+ OutputStream outStream = NetUtils.getOutputStream(socket);
+ writeRpcHeader(outStream);
+ if (useSasl) {
+ final InputStream in2 = inStream;
+ final OutputStream out2 = outStream;
+ UserGroupInformation ticket = remoteId.getTicket().getUGI();
+ if (authMethod == AuthMethod.KERBEROS) {;
+ if (ticket != null && ticket.getRealUser() != null) {
+ ticket = ticket.getRealUser();
+ }
+ }
+ boolean continueSasl = false;
+ try {
+ continueSasl =
+ ticket.doAs(new PrivilegedExceptionAction<Boolean>() {
+ @Override
+ public Boolean run() throws IOException {
+ return setupSaslConnection(in2, out2);
+ }
+ });
+ } catch (Exception ex) {
+ if (rand == null) {
+ rand = new Random();
+ }
+ handleSaslConnectionFailure(numRetries++, MAX_RETRIES, ex, rand,
+ ticket);
+ continue;
+ }
+ if (continueSasl) {
+ // Sasl connect is successful. Let's set up Sasl i/o streams.
+ inStream = saslRpcClient.getInputStream(inStream);
+ outStream = saslRpcClient.getOutputStream(outStream);
+ } else {
+ // fall back to simple auth because server told us so.
+ authMethod = AuthMethod.SIMPLE;
+ useSasl = false;
+ }
+ }
+ this.in = new DataInputStream(new BufferedInputStream
+ (new PingInputStream(inStream)));
+ this.out = new DataOutputStream
+ (new BufferedOutputStream(outStream));
+ writeHeader();
+
+ // update last activity time
+ touch();
+
+ // start the receiver thread after the socket connection has been set up
+ start();
+ return;
+ }
+ } catch (IOException e) {
+ markClosed(e);
+ close();
+
+ throw e;
+ }
+ }
+
+ /* Write the RPC header */
+ private void writeRpcHeader(OutputStream outStream) throws IOException {
+ DataOutputStream out = new DataOutputStream(new BufferedOutputStream(outStream));
+ // Write out the header, version and authentication method
+ out.write(HBaseServer.HEADER.array());
+ out.write(HBaseServer.CURRENT_VERSION);
+ authMethod.write(out);
+ out.flush();
+ }
+
+ /**
+ * Write the protocol header for each connection
+ * Out is not synchronized because only the first thread does this.
+ */
+ private void writeHeader() throws IOException {
+ // Write out the ConnectionHeader
+ out.writeInt(header.getSerializedSize());
+ header.writeTo(out);
+ }
+
+ /** Close the connection. */
+ protected synchronized void close() {
+ if (!shouldCloseConnection.get()) {
+ LOG.error("The connection is not in the closed state");
+ return;
+ }
+
+ // release the resources
+ // first thing to do;take the connection out of the connection list
+ synchronized (connections) {
+ if (connections.get(remoteId) == this) {
+ connections.remove(remoteId);
+ }
+ }
+
+ // close the streams and therefore the socket
+ IOUtils.closeStream(out);
+ IOUtils.closeStream(in);
+ disposeSasl();
+
+ // clean up all calls
+ if (closeException == null) {
+ if (!calls.isEmpty()) {
+ LOG.warn(
+ "A connection is closed for no cause and calls are not empty. " +
+ "#Calls: " + calls.size());
+
+ // clean up calls anyway
+ closeException = new IOException("Unexpected closed connection");
+ cleanupCalls();
+ }
+ } else {
+ // log the info
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("closing ipc connection to " + server + ": " +
+ closeException.getMessage(),closeException);
+ }
+
+ // cleanup calls
+ cleanupCalls();
+ }
+ if (LOG.isDebugEnabled())
+ LOG.debug(getName() + ": closed");
+ }
+
/* Initiates a call by sending the parameter to the remote server.
* Note: this is not called from the Connection thread, but by other
* threads.
@@ -575,15 +854,8 @@ public class HBaseClient {
LOG.debug(getName() + " got value #" + id);
Call call = calls.remove(id);
- boolean isError = response.getError();
- if (isError) {
- if (call != null) {
- //noinspection ThrowableInstanceNeverThrown
- call.setException(new RemoteException(
- response.getException().getExceptionName(),
- response.getException().getStackTrace()));
- }
- } else {
+ Status status = response.getStatus();
+ if (status == Status.SUCCESS) {
ByteString responseObj = response.getResponse();
DataInputStream dis =
new DataInputStream(responseObj.newInput());
@@ -594,6 +866,18 @@ public class HBaseClient {
if (call != null) {
call.setValue(value);
}
+ } else if (status == Status.ERROR) {
+ if (call != null) {
+ //noinspection ThrowableInstanceNeverThrown
+ call.setException(new RemoteException(
+ response.getException().getExceptionName(),
+ response.getException().getStackTrace()));
+ }
+ } else if (status == Status.FATAL) {
+ // Close the connection
+ markClosed(new RemoteException(
+ response.getException().getExceptionName(),
+ response.getException().getStackTrace()));
}
} catch (IOException e) {
if (e instanceof SocketTimeoutException && remoteId.rpcTimeout > 0) {
@@ -620,47 +904,6 @@ public class HBaseClient {
}
}
- /** Close the connection. */
- protected synchronized void close() {
- if (!shouldCloseConnection.get()) {
- LOG.error("The connection is not in the closed state");
- return;
- }
-
- // release the resources
- // first thing to do;take the connection out of the connection list
- synchronized (connections) {
- connections.remove(remoteId, this);
- }
-
- // close the streams and therefore the socket
- IOUtils.closeStream(out);
- IOUtils.closeStream(in);
-
- // clean up all calls
- if (closeException == null) {
- if (!calls.isEmpty()) {
- LOG.warn(
- "A connection is closed for no cause and calls are not empty");
-
- // clean up calls anyway
- closeException = new IOException("Unexpected closed connection");
- cleanupCalls();
- }
- } else {
- // log the info
- if (LOG.isDebugEnabled()) {
- LOG.debug("closing ipc connection to " + remoteId.address + ": " +
- closeException.getMessage(),closeException);
- }
-
- // cleanup calls
- cleanupCalls();
- }
- if (LOG.isDebugEnabled())
- LOG.debug(getName() + ": closed");
- }
-
/* Cleanup all calls and mark them as done */
protected void cleanupCalls() {
cleanupCalls(0);
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java?rev=1337396&r1=1337395&r2=1337396&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java Fri May 11 22:06:57 2012
@@ -20,6 +20,8 @@
package org.apache.hadoop.hbase.ipc;
+import static org.apache.hadoop.fs.CommonConfigurationKeys.HADOOP_SECURITY_AUTHORIZATION;
+
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
@@ -34,6 +36,7 @@ import java.net.SocketException;
import java.net.UnknownHostException;
import java.nio.ByteBuffer;
import java.nio.channels.CancelledKeyException;
+import java.nio.channels.Channels;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.SelectionKey;
@@ -41,6 +44,7 @@ import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.channels.WritableByteChannel;
+import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@@ -56,6 +60,10 @@ import java.util.concurrent.ExecutorServ
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
+import javax.security.sasl.Sasl;
+import javax.security.sasl.SaslException;
+import javax.security.sasl.SaslServer;
+
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
@@ -68,14 +76,34 @@ import org.apache.hadoop.hbase.protobuf.
import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcRequest;
import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcResponse;
import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcException;
+import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcResponse.Status;
+import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.UserInformation;
import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
import org.apache.hadoop.hbase.monitoring.TaskMonitor;
import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.security.HBaseSaslRpcServer;
+import org.apache.hadoop.hbase.security.HBaseSaslRpcServer.AuthMethod;
+import org.apache.hadoop.hbase.security.HBaseSaslRpcServer.SaslDigestCallbackHandler;
+import org.apache.hadoop.hbase.security.HBaseSaslRpcServer.SaslGssCallbackHandler;
+import org.apache.hadoop.hbase.security.HBaseSaslRpcServer.SaslStatus;
import org.apache.hadoop.hbase.util.ByteBufferOutputStream;
import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.ipc.RPC.VersionMismatch;
+import org.apache.hadoop.security.AccessControlException;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
+import org.apache.hadoop.security.authorize.AuthorizationException;
+import org.apache.hadoop.security.authorize.ProxyUsers;
+import org.apache.hadoop.security.authorize.ServiceAuthorizationManager;
+import org.apache.hadoop.security.token.SecretManager;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.security.token.SecretManager.InvalidToken;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.StringUtils;
@@ -96,7 +124,8 @@ import org.cliffc.high_scale_lib.Counter
*/
@InterfaceAudience.Private
public abstract class HBaseServer implements RpcServer {
-
+ private final boolean authorize;
+ private boolean isSecurityEnabled;
/**
* The first four bytes of Hadoop RPC connections
*/
@@ -130,6 +159,13 @@ public abstract class HBaseServer implem
protected static final Log TRACELOG =
LogFactory.getLog("org.apache.hadoop.ipc.HBaseServer.trace");
+ private static final String AUTH_FAILED_FOR = "Auth failed for ";
+ private static final String AUTH_SUCCESSFUL_FOR = "Auth successful for ";
+ private static final Log AUDITLOG =
+ LogFactory.getLog("SecurityLogger."+Server.class.getName());
+ protected SecretManager<TokenIdentifier> secretManager;
+ protected ServiceAuthorizationManager authManager;
+
protected static final ThreadLocal<RpcServer> SERVER =
new ThreadLocal<RpcServer>();
private volatile boolean started = false;
@@ -303,11 +339,12 @@ public abstract class HBaseServer implem
return param.toString() + " from " + connection.toString();
}
+ /**
+ * Stores the given buffer as this call's response.
+ *
+ * @param response buffer assigned as-is to this call's response field
+ */
+ protected synchronized void setSaslTokenResponse(ByteBuffer response) {
+ this.response = response;
+ }
+
protected synchronized void setResponse(Object value, Status status,
String errorClass, String error) {
- // Avoid overwriting an error value in the response. This can happen if
- // endDelayThrowing is called by another thread before the actual call
- // returning.
if (this.isError)
return;
if (errorClass != null) {
@@ -328,8 +365,7 @@ public abstract class HBaseServer implem
if (result instanceof WritableWithSize) {
// get the size hint.
WritableWithSize ohint = (WritableWithSize) result;
- long hint = ohint.getWritableSize() + Bytes.SIZEOF_BYTE +
- (2 * Bytes.SIZEOF_INT);
+ long hint = ohint.getWritableSize() + 2*Bytes.SIZEOF_INT;
if (hint > Integer.MAX_VALUE) {
// oops, new problem.
IOException ioe =
@@ -342,12 +378,11 @@ public abstract class HBaseServer implem
}
ByteBufferOutputStream buf = new ByteBufferOutputStream(size);
- DataOutputStream out = new DataOutputStream(buf);
try {
RpcResponse.Builder builder = RpcResponse.newBuilder();
// Call id.
builder.setCallId(this.id);
- builder.setError(error != null);
+ builder.setStatus(status);
if (error != null) {
RpcException.Builder b = RpcException.newBuilder();
b.setExceptionName(errorClass);
@@ -359,8 +394,10 @@ public abstract class HBaseServer implem
byte[] response = d.getData();
builder.setResponse(ByteString.copyFrom(response));
}
- builder.build().writeDelimitedTo(
- DataOutputOutputStream.constructOutputStream(out));
+ builder.build().writeDelimitedTo(buf);
+ if (connection.useWrap) {
+ wrapWithSasl(buf);
+ }
} catch (IOException e) {
LOG.warn("Exception while creating response " + e);
}
@@ -369,6 +406,28 @@ public abstract class HBaseServer implem
this.response = bb;
}
+ /**
+ * Replaces the content of the given response stream with its SASL-wrapped
+ * form: a 4-byte token length followed by the wrapped token. Does nothing
+ * when the connection is not using SASL.
+ *
+ * @param response stream holding the serialized response to wrap
+ * @throws IOException if wrapping or rewriting the response fails
+ */
+ private void wrapWithSasl(ByteBufferOutputStream response)
+ throws IOException {
+ if (connection.useSasl) {
+ // getByteBuffer calls flip()
+ ByteBuffer buf = response.getByteBuffer();
+ byte[] token;
+ // synchronization may be needed since there can be multiple Handler
+ // threads using saslServer to wrap responses.
+ synchronized (connection.saslServer) {
+ token = connection.saslServer.wrap(buf.array(),
+ buf.arrayOffset(), buf.remaining());
+ }
+ if (LOG.isDebugEnabled())
+ LOG.debug("Adding saslServer wrapped token of size " + token.length
+ + " as call response.");
+ // Reset the buffer before writing the length-prefixed token.
+ // NOTE(review): presumably buf is the stream's own backing buffer, so the
+ // writes below overwrite the plaintext - confirm in ByteBufferOutputStream.
+ buf.clear();
+ DataOutputStream saslOut = new DataOutputStream(response);
+ saslOut.writeInt(token.length);
+ saslOut.write(token, 0, token.length);
+ }
+ }
+
@Override
public synchronized void endDelay(Object result) throws IOException {
assert this.delayResponse;
@@ -1056,8 +1115,8 @@ public abstract class HBaseServer implem
}
/** Reads calls from a connection and queues them for handling. */
- protected class Connection {
- private boolean versionRead = false; //if initial signature and
+ public class Connection {
+ private boolean rpcHeaderRead = false; //if initial signature and
//version are read
private boolean headerRead = false; //if the connection header that
//follows version is read.
@@ -1068,6 +1127,7 @@ public abstract class HBaseServer implem
private volatile int rpcCount = 0; // number of outstanding rpcs
private long lastContact;
private int dataLength;
+ private InetAddress addr;
protected Socket socket;
// Cache the remote host & port info so that even if the socket is
// disconnected, we can say where it used to connect to.
@@ -1075,8 +1135,27 @@ public abstract class HBaseServer implem
protected int remotePort;
ConnectionHeader header;
Class<? extends VersionedProtocol> protocol;
- protected User user = null;
+ protected UserGroupInformation user = null;
+ private AuthMethod authMethod;
+ private boolean saslContextEstablished;
+ private boolean skipInitialSaslHandshake;
+ private ByteBuffer rpcHeaderBuffer;
+ private ByteBuffer unwrappedData;
+ private ByteBuffer unwrappedDataLengthBuffer;
+ boolean useSasl;
+ SaslServer saslServer;
+ private boolean useWrap = false;
+ // Fake 'call' for failed authorization response
+ private final int AUTHROIZATION_FAILED_CALLID = -1;
+ private final Call authFailedCall =
+ new Call(AUTHROIZATION_FAILED_CALLID, null, this, null, 0);
+ private ByteArrayOutputStream authFailedResponse =
+ new ByteArrayOutputStream();
+ // Fake 'call' for SASL context setup
+ private static final int SASL_CALLID = -33;
+ private final Call saslCall = new Call(SASL_CALLID, null, this, null, 0);
+ public UserGroupInformation attemptingUser = null; // user name before auth
public Connection(SocketChannel channel, long lastContact) {
this.channel = channel;
this.lastContact = lastContact;
@@ -1110,6 +1189,10 @@ public abstract class HBaseServer implem
return hostAddress;
}
+ /**
+ * Returns the remote peer's address as an InetAddress.
+ *
+ * @return the cached address, resolved from the connection's socket on
+ * first use; null if no socket is available
+ */
+ public InetAddress getHostInetAddress() {
+ if (addr == null && socket != null) {
+ // Nothing visible in this change assigns addr, so resolve it lazily
+ // from the socket and cache it.
+ addr = socket.getInetAddress();
+ }
+ return addr;
+ }
+
public int getRemotePort() {
return remotePort;
}
@@ -1141,39 +1224,218 @@ public abstract class HBaseServer implem
return isIdle() && currentTime - lastContact > maxIdleTime;
}
+ /**
+ * Builds the UserGroupInformation for an authorization id.
+ * For DIGEST authentication the id is resolved to a token identifier via
+ * HBaseSaslRpcServer.getIdentifier and the secret manager, and the
+ * identifier is attached to the returned user. For any other method a
+ * remote user is created directly from the id.
+ *
+ * @param authorizedId authorization id to resolve
+ * @return the user for the id
+ * @throws IOException AccessControlException if the token identifier
+ * carries no user; other failures propagate from the identifier lookup
+ */
+ private UserGroupInformation getAuthorizedUgi(String authorizedId)
+ throws IOException {
+ if (authMethod == AuthMethod.DIGEST) {
+ TokenIdentifier tokenId = HBaseSaslRpcServer.getIdentifier(authorizedId,
+ secretManager);
+ UserGroupInformation ugi = tokenId.getUser();
+ if (ugi == null) {
+ throw new AccessControlException(
+ "Can't retrieve username from tokenIdentifier.");
+ }
+ ugi.addTokenIdentifier(tokenId);
+ return ugi;
+ } else {
+ return UserGroupInformation.createRemoteUser(authorizedId);
+ }
+ }
+
+ /**
+ * Handles one SASL message read from the client.
+ * <p>
+ * Until the SASL context is established the token is a negotiation step:
+ * the SaslServer is created on first use (DIGEST when that is the auth
+ * method, the Kerberos mechanism otherwise), the token is evaluated, any
+ * reply token is sent back with doRawSaslReply, and once the server reports
+ * completion the negotiated QoP and the authenticated user are recorded.
+ * After that the token carries RPC data: it is passed to processOneRpc
+ * directly, or unwrapped first and passed to processUnwrappedData when
+ * wrapping is in use.
+ *
+ * @param saslToken raw bytes of the message just read from the connection
+ * @throws IOException if negotiation or RPC processing fails; failures
+ * raised while creating the server or evaluating the token are reported to
+ * the client, counted and audit-logged before being rethrown
+ * @throws InterruptedException propagated from RPC processing
+ */
+ private void saslReadAndProcess(byte[] saslToken) throws IOException,
+ InterruptedException {
+ if (saslContextEstablished) {
+ if (LOG.isDebugEnabled())
+ LOG.debug("Have read input token of size " + saslToken.length
+ + " for processing by saslServer.unwrap()");
+
+ // useWrap is false when the negotiated QoP is absent or "auth", in which
+ // case the payload is handled as-is.
+ if (!useWrap) {
+ processOneRpc(saslToken);
+ } else {
+ byte[] plaintextData = saslServer.unwrap(saslToken, 0,
+ saslToken.length);
+ processUnwrappedData(plaintextData);
+ }
+ } else {
+ byte[] replyToken = null;
+ try {
+ // Create the SASL server lazily, on the first negotiation message.
+ if (saslServer == null) {
+ switch (authMethod) {
+ case DIGEST:
+ if (secretManager == null) {
+ throw new AccessControlException(
+ "Server is not configured to do DIGEST authentication.");
+ }
+ saslServer = Sasl.createSaslServer(AuthMethod.DIGEST
+ .getMechanismName(), null, HBaseSaslRpcServer.SASL_DEFAULT_REALM,
+ HBaseSaslRpcServer.SASL_PROPS, new SaslDigestCallbackHandler(
+ secretManager, this));
+ break;
+ default:
+ UserGroupInformation current = UserGroupInformation
+ .getCurrentUser();
+ String fullName = current.getUserName();
+ if (LOG.isDebugEnabled())
+ LOG.debug("Kerberos principal name is " + fullName);
+ final String names[] = HBaseSaslRpcServer.splitKerberosName(fullName);
+ if (names.length != 3) {
+ throw new AccessControlException(
+ "Kerberos principal name does NOT have the expected "
+ + "hostname part: " + fullName);
+ }
+ // Create the Kerberos SASL server while running as the current user.
+ current.doAs(new PrivilegedExceptionAction<Object>() {
+ @Override
+ public Object run() throws SaslException {
+ saslServer = Sasl.createSaslServer(AuthMethod.KERBEROS
+ .getMechanismName(), names[0], names[1],
+ HBaseSaslRpcServer.SASL_PROPS, new SaslGssCallbackHandler());
+ return null;
+ }
+ });
+ }
+ if (saslServer == null)
+ throw new AccessControlException(
+ "Unable to find SASL server implementation for "
+ + authMethod.getMechanismName());
+ if (LOG.isDebugEnabled())
+ LOG.debug("Created SASL server with mechanism = "
+ + authMethod.getMechanismName());
+ }
+ if (LOG.isDebugEnabled())
+ LOG.debug("Have read input token of size " + saslToken.length
+ + " for processing by saslServer.evaluateResponse()");
+ replyToken = saslServer.evaluateResponse(saslToken);
+ } catch (IOException e) {
+ // Report an InvalidToken found in the cause chain in preference to the
+ // outer exception; the outer exception is still what gets rethrown.
+ IOException sendToClient = e;
+ Throwable cause = e;
+ while (cause != null) {
+ if (cause instanceof InvalidToken) {
+ sendToClient = (InvalidToken) cause;
+ break;
+ }
+ cause = cause.getCause();
+ }
+ doRawSaslReply(SaslStatus.ERROR, null, sendToClient.getClass().getName(),
+ sendToClient.getLocalizedMessage());
+ rpcMetrics.authenticationFailures.inc();
+ String clientIP = this.toString();
+ // attempting user could be null
+ AUDITLOG.warn(AUTH_FAILED_FOR + clientIP + ":" + attemptingUser);
+ throw e;
+ }
+ if (replyToken != null) {
+ if (LOG.isDebugEnabled())
+ LOG.debug("Will send token of size " + replyToken.length
+ + " from saslServer.");
+ doRawSaslReply(SaslStatus.SUCCESS, new BytesWritable(replyToken), null,
+ null);
+ }
+ // Negotiation finished: record whether wrapping is needed and who the
+ // authenticated user is.
+ if (saslServer.isComplete()) {
+ String qop = (String) saslServer.getNegotiatedProperty(Sasl.QOP);
+ useWrap = qop != null && !"auth".equalsIgnoreCase(qop);
+ user = getAuthorizedUgi(saslServer.getAuthorizationID());
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("SASL server context established. Authenticated client: "
+ + user + ". Negotiated QoP is "
+ + saslServer.getNegotiatedProperty(Sasl.QOP));
+ }
+ rpcMetrics.authenticationSuccesses.inc();
+ AUDITLOG.trace(AUTH_SUCCESSFUL_FOR + user);
+ saslContextEstablished = true;
+ }
+ }
+ }
+ /**
+ * Sends a SASL negotiation message to the client without protobuf
+ * encoding: the status code is written as an int, followed by the
+ * Writable payload on SUCCESS or by the error class and message otherwise.
+ * The bytes are handed to the connection's dedicated saslCall for sending.
+ *
+ * @param status outcome to report; its state value is written first
+ * @param rv payload written when status is SUCCESS (dereferenced without a
+ * null check in that case)
+ * @param errorClass class name written when status is not SUCCESS
+ * @param error message written when status is not SUCCESS
+ * @throws IOException propagated from writing or sending the reply
+ */
+ private void doRawSaslReply(SaslStatus status, Writable rv,
+ String errorClass, String error) throws IOException {
+ // In testing, SASL messages were usually in the ballpark of 100-200
+ // (presumably bytes), which is why the initial capacity is 256.
+ ByteBufferOutputStream saslResponse = new ByteBufferOutputStream(256);
+ DataOutputStream out = new DataOutputStream(saslResponse);
+ out.writeInt(status.state); // write status
+ if (status == SaslStatus.SUCCESS) {
+ rv.write(out);
+ } else {
+ WritableUtils.writeString(out, errorClass);
+ WritableUtils.writeString(out, error);
+ }
+ saslCall.setSaslTokenResponse(saslResponse.getByteBuffer());
+ saslCall.responder = responder;
+ saslCall.sendResponseIfReady();
+ }
+
+ /**
+ * Releases the SASL server held by this connection, if any.
+ * Disposal is best-effort: a SaslException is ignored, but the reference
+ * is always cleared so a failed dispose is not retried on a stale server.
+ */
+ private void disposeSasl() {
+ if (saslServer != null) {
+ try {
+ saslServer.dispose();
+ } catch (SaslException ignored) {
+ // best-effort cleanup; nothing useful can be done here
+ } finally {
+ saslServer = null;
+ }
+ }
+ }
+
public int readAndProcess() throws IOException, InterruptedException {
while (true) {
/* Read at most one RPC. If the header is not read completely yet
* then iterate until we read first RPC or until there is no data left.
*/
- int count;
+ int count = -1;
if (dataLengthBuffer.remaining() > 0) {
count = channelRead(channel, dataLengthBuffer);
if (count < 0 || dataLengthBuffer.remaining() > 0)
return count;
}
- if (!versionRead) {
+ if (!rpcHeaderRead) {
//Every connection is expected to send the header.
- ByteBuffer versionBuffer = ByteBuffer.allocate(1);
- count = channelRead(channel, versionBuffer);
- if (count <= 0) {
+ if (rpcHeaderBuffer == null) {
+ rpcHeaderBuffer = ByteBuffer.allocate(2);
+ }
+ count = channelRead(channel, rpcHeaderBuffer);
+ if (count < 0 || rpcHeaderBuffer.remaining() > 0) {
return count;
}
- int version = versionBuffer.get(0);
-
+ int version = rpcHeaderBuffer.get(0);
+ byte[] method = new byte[] {rpcHeaderBuffer.get(1)};
+ authMethod = AuthMethod.read(new DataInputStream(
+ new ByteArrayInputStream(method)));
dataLengthBuffer.flip();
if (!HEADER.equals(dataLengthBuffer) || version != CURRENT_VERSION) {
- //Warning is ok since this is not supposed to happen.
- LOG.warn("Incorrect header or version mismatch from " +
- hostAddress + ":" + remotePort +
- " got version " + version +
- " expected version " + CURRENT_VERSION);
+ LOG.warn("Incorrect header or version mismatch from " +
+ hostAddress + ":" + remotePort +
+ " got version " + version +
+ " expected version " + CURRENT_VERSION);
setupBadVersionResponse(version);
return -1;
}
dataLengthBuffer.clear();
- versionRead = true;
+ if (authMethod == null) {
+ throw new IOException("Unable to read authentication method");
+ }
+ if (isSecurityEnabled && authMethod == AuthMethod.SIMPLE) {
+ AccessControlException ae = new AccessControlException(
+ "Authentication is required");
+ setupResponse(authFailedResponse, authFailedCall, Status.FATAL,
+ null, ae.getClass().getName(), ae.getMessage());
+ responder.doRespond(authFailedCall);
+ throw ae;
+ }
+ if (!isSecurityEnabled && authMethod != AuthMethod.SIMPLE) {
+ doRawSaslReply(SaslStatus.SUCCESS, new IntWritable(
+ HBaseSaslRpcServer.SWITCH_TO_SIMPLE_AUTH), null, null);
+ authMethod = AuthMethod.SIMPLE;
+ // client has already sent the initial Sasl message and we
+ // should ignore it. Both client and server should fall back
+ // to simple auth from now on.
+ skipInitialSaslHandshake = true;
+ }
+ if (authMethod != AuthMethod.SIMPLE) {
+ useSasl = true;
+ }
+
+ rpcHeaderBuffer = null;
+ rpcHeaderRead = true;
continue;
}
@@ -1182,8 +1444,14 @@ public abstract class HBaseServer implem
dataLength = dataLengthBuffer.getInt();
if (dataLength == HBaseClient.PING_CALL_ID) {
- dataLengthBuffer.clear();
- return 0; //ping message
+ if(!useWrap) { //covers the !useSasl too
+ dataLengthBuffer.clear();
+ return 0; //ping message
+ }
+ }
+ if (dataLength < 0) {
+ throw new IllegalArgumentException("Unexpected data length "
+ + dataLength + "!! from " + getHostAddress());
}
data = ByteBuffer.allocate(dataLength);
incRpcCount(); // Increment the rpc count
@@ -1194,15 +1462,21 @@ public abstract class HBaseServer implem
if (data.remaining() == 0) {
dataLengthBuffer.clear();
data.flip();
- if (headerRead) {
- processData(data.array());
+ if (skipInitialSaslHandshake) {
data = null;
- return count;
+ skipInitialSaslHandshake = false;
+ continue;
+ }
+ boolean isHeaderRead = headerRead;
+ if (useSasl) {
+ saslReadAndProcess(data.array());
+ } else {
+ processOneRpc(data.array());
}
- processHeader();
- headerRead = true;
data = null;
- continue;
+ if (!isHeaderRead) {
+ continue;
+ }
}
return count;
}
@@ -1238,16 +1512,104 @@ public abstract class HBaseServer implem
}
/// Reads the connection header following version
- private void processHeader() throws IOException {
- header = ConnectionHeader.parseFrom(new ByteArrayInputStream(data.array()));
+ private void processHeader(byte[] buf) throws IOException {
+ DataInputStream in =
+ new DataInputStream(new ByteArrayInputStream(buf));
+ header = ConnectionHeader.parseFrom(in);
try {
String protocolClassName = header.getProtocol();
- protocol = getProtocolClass(protocolClassName, conf);
+ if (protocolClassName != null) {
+ protocol = getProtocolClass(header.getProtocol(), conf);
+ }
} catch (ClassNotFoundException cnfe) {
throw new IOException("Unknown protocol: " + header.getProtocol());
}
- user = User.createUser(header);
+ UserGroupInformation protocolUser = createUser(header);
+ if (!useSasl) {
+ user = protocolUser;
+ if (user != null) {
+ user.setAuthenticationMethod(AuthMethod.SIMPLE.authenticationMethod);
+ }
+ } else {
+ // user is authenticated
+ user.setAuthenticationMethod(authMethod.authenticationMethod);
+ //Now we check if this is a proxy user case. If the protocol user is
+ //different from the 'user', it is a proxy user scenario. However,
+ //this is not allowed if user authenticated with DIGEST.
+ if ((protocolUser != null)
+ && (!protocolUser.getUserName().equals(user.getUserName()))) {
+ if (authMethod == AuthMethod.DIGEST) {
+ // Not allowed to doAs if token authentication is used
+ throw new AccessControlException("Authenticated user (" + user
+ + ") doesn't match what the client claims to be ("
+ + protocolUser + ")");
+ } else {
+ // Effective user can be different from authenticated user
+ // for simple auth or kerberos auth
+ // The user is the real user. Now we create a proxy user
+ UserGroupInformation realUser = user;
+ user = UserGroupInformation.createProxyUser(protocolUser
+ .getUserName(), realUser);
+ // Now the user is a proxy user, set Authentication method Proxy.
+ user.setAuthenticationMethod(AuthenticationMethod.PROXY);
+ }
+ }
+ }
+ }
+
+ /**
+ * Splits SASL-unwrapped bytes into length-prefixed RPCs and passes each
+ * complete one to processOneRpc. A length prefix or payload that is only
+ * partly present is kept in unwrappedDataLengthBuffer / unwrappedData and
+ * completed by a later call. Ping markers are skipped.
+ *
+ * @param inBuf plaintext bytes produced by unwrapping one SASL token
+ * @throws IOException propagated from reading or from RPC processing
+ * @throws InterruptedException propagated from RPC processing
+ * @throws IllegalArgumentException if a length prefix is negative and is
+ * not the ping marker
+ */
+ private void processUnwrappedData(byte[] inBuf) throws IOException,
+ InterruptedException {
+ if (unwrappedDataLengthBuffer == null) {
+ // Nothing visible in this change allocates the buffer; create it on
+ // first use. It holds the 4-byte length prefix read with getInt() below.
+ unwrappedDataLengthBuffer = ByteBuffer.allocate(4);
+ }
+ ReadableByteChannel ch = Channels.newChannel(new ByteArrayInputStream(
+ inBuf));
+ // Read all RPCs contained in the inBuf, even partial ones
+ while (true) {
+ int count = -1;
+ if (unwrappedDataLengthBuffer.remaining() > 0) {
+ count = channelRead(ch, unwrappedDataLengthBuffer);
+ if (count <= 0 || unwrappedDataLengthBuffer.remaining() > 0)
+ return;
+ }
+
+ if (unwrappedData == null) {
+ unwrappedDataLengthBuffer.flip();
+ int unwrappedDataLength = unwrappedDataLengthBuffer.getInt();
+
+ if (unwrappedDataLength == HBaseClient.PING_CALL_ID) {
+ if (LOG.isDebugEnabled())
+ LOG.debug("Received ping message");
+ unwrappedDataLengthBuffer.clear();
+ continue; // ping message
+ }
+ // Same guard as readAndProcess: fail with a descriptive message
+ // instead of letting ByteBuffer.allocate reject the negative size.
+ if (unwrappedDataLength < 0) {
+ throw new IllegalArgumentException("Unexpected data length "
+ + unwrappedDataLength + "!! from " + getHostAddress());
+ }
+ unwrappedData = ByteBuffer.allocate(unwrappedDataLength);
+ }
+
+ count = channelRead(ch, unwrappedData);
+ if (count <= 0 || unwrappedData.remaining() > 0)
+ return;
+
+ if (unwrappedData.remaining() == 0) {
+ unwrappedDataLengthBuffer.clear();
+ unwrappedData.flip();
+ processOneRpc(unwrappedData.array());
+ unwrappedData = null;
+ }
+ }
+ }
+
+ /**
+ * Processes one complete message from the client. The first message on a
+ * connection is parsed as the connection header and the connection is then
+ * authorized; every later message is handled as RPC data.
+ *
+ * @param buf bytes of one complete message
+ * @throws IOException AccessControlException when the connection is not
+ * authorized; other failures propagate from header or data processing
+ * @throws InterruptedException propagated from processData
+ */
+ private void processOneRpc(byte[] buf) throws IOException,
+ InterruptedException {
+ if (headerRead) {
+ processData(buf);
+ } else {
+ processHeader(buf);
+ // Mark the header as read before authorizing, so the flag is set even
+ // when authorization fails below.
+ headerRead = true;
+ if (!authorizeConnection()) {
+ throw new AccessControlException("Connection from " + this
+ + " for protocol " + header.getProtocol()
+ + " is unauthorized for user " + user);
+ }
+ }
}
protected void processData(byte[] buf) throws IOException, InterruptedException {
@@ -1303,7 +1665,34 @@ public abstract class HBaseServer implem
}
}
+ /**
+ * Authorizes this connection's user for the protocol named in the
+ * connection header. On an AuthorizationException a FATAL response is
+ * prepared on the authFailedCall and handed to the responder.
+ *
+ * @return true if authorization succeeded, false if an
+ * AuthorizationException was raised and reported to the client
+ * @throws IOException for failures other than AuthorizationException
+ */
+ private boolean authorizeConnection() throws IOException {
+ try {
+ // If auth method is DIGEST, the token was obtained by the
+ // real user for the effective user, therefore not required to
+ // authorize real user. doAs is allowed only for simple or kerberos
+ // authentication
+ if (user != null && user.getRealUser() != null
+ && (authMethod != AuthMethod.DIGEST)) {
+ ProxyUsers.authorize(user, this.getHostAddress(), conf);
+ }
+ authorize(user, header, getHostInetAddress());
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Successfully authorized " + header);
+ }
+ rpcMetrics.authorizationSuccesses.inc();
+ } catch (AuthorizationException ae) {
+ LOG.debug("Connection authorization failed: "+ae.getMessage(), ae);
+ rpcMetrics.authorizationFailures.inc();
+ setupResponse(authFailedResponse, authFailedCall, Status.FATAL, null,
+ ae.getClass().getName(), ae.getMessage());
+ responder.doRespond(authFailedCall);
+ return false;
+ }
+ return true;
+ }
+
protected synchronized void close() {
+ disposeSasl();
data = null;
dataLengthBuffer = null;
if (!channel.isOpen())
@@ -1314,6 +1703,33 @@ public abstract class HBaseServer implem
}
try {socket.close();} catch(Exception ignored) {}
}
+
+ /**
+ * Builds the user described by the connection header.
+ *
+ * @param head connection header sent by the client
+ * @return null if the header has no user info or no effective user; a
+ * proxy user for the effective user when a real user is also given;
+ * otherwise a remote user for the effective user
+ */
+ private UserGroupInformation createUser(ConnectionHeader head) {
+ UserGroupInformation ugi = null;
+
+ if (!head.hasUserInfo()) {
+ return null;
+ }
+ UserInformation userInfoProto = head.getUserInfo();
+ String effectiveUser = null;
+ if (userInfoProto.hasEffectiveUser()) {
+ effectiveUser = userInfoProto.getEffectiveUser();
+ }
+ String realUser = null;
+ if (userInfoProto.hasRealUser()) {
+ realUser = userInfoProto.getRealUser();
+ }
+ if (effectiveUser != null) {
+ if (realUser != null) {
+ UserGroupInformation realUserUgi =
+ UserGroupInformation.createRemoteUser(realUser);
+ ugi = UserGroupInformation.createProxyUser(effectiveUser, realUserUgi);
+ } else {
+ ugi = UserGroupInformation.createRemoteUser(effectiveUser);
+ }
+ }
+ return ugi;
+ }
}
/**
@@ -1377,15 +1793,16 @@ public abstract class HBaseServer implem
throw new ServerNotRunningYetException("Server is not running yet");
if (LOG.isDebugEnabled()) {
- User remoteUser = call.connection.user;
+ UserGroupInformation remoteUser = call.connection.user;
LOG.debug(getName() + ": call #" + call.id + " executing as "
- + (remoteUser == null ? "NULL principal" : remoteUser.getName()));
+ + (remoteUser == null ? "NULL principal" :
+ remoteUser.getUserName()));
}
- RequestContext.set(call.connection.user, getRemoteIp(),
+ RequestContext.set(User.create(call.connection.user), getRemoteIp(),
call.connection.protocol);
// make the call
- value = call(call.connection.protocol, call.param, call.timestamp,
+ value = call(call.connection.protocol, call.param, call.timestamp,
status);
} catch (Throwable e) {
LOG.debug(getName()+", call "+call+": error: " + e, e);
@@ -1517,6 +1934,12 @@ public abstract class HBaseServer implem
// Create the responder here
responder = new Responder();
+ this.authorize =
+ conf.getBoolean(HADOOP_SECURITY_AUTHORIZATION, false);
+ this.isSecurityEnabled = UserGroupInformation.isSecurityEnabled();
+ if (isSecurityEnabled) {
+ HBaseSaslRpcServer.init(conf);
+ }
}
/**
@@ -1572,6 +1995,10 @@ public abstract class HBaseServer implem
rpcMetrics.numOpenConnections.set(numConnections);
}
+ /** @return the configuration held by this server */
+ Configuration getConf() {
+ return conf;
+ }
+
/** Sets the socket buffer size used for responding to RPCs.
* @param size send size
*/
@@ -1617,6 +2044,14 @@ public abstract class HBaseServer implem
}
}
+ /** @return the secret manager currently set on this server; may be null */
+ public SecretManager<? extends TokenIdentifier> getSecretManager() {
+ return this.secretManager;
+ }
+
+ /**
+ * Sets the secret manager. It is consulted for DIGEST authentication.
+ *
+ * @param secretManager manager to install
+ */
+ public void setSecretManager(SecretManager<? extends TokenIdentifier> secretManager) {
+ // NOTE(review): unchecked narrowing cast from the wildcard type to
+ // SecretManager<TokenIdentifier>; the compiler cannot verify it.
+ this.secretManager = (SecretManager<TokenIdentifier>) secretManager;
+ }
+
/** Stops the service. No new calls will be handled after this is called. */
@Override
public synchronized void stop() {
@@ -1683,6 +2118,31 @@ public abstract class HBaseServer implem
}
/**
+ * Authorize the incoming client connection. Does nothing unless
+ * authorization is enabled on this server.
+ *
+ * @param user client user
+ * @param connection header of the incoming connection; names the protocol
+ * @param addr InetAddress of incoming connection
+ * @throws org.apache.hadoop.security.authorize.AuthorizationException when the client isn't authorized to talk the protocol, or the protocol class is unknown
+ */
+ public void authorize(UserGroupInformation user,
+ ConnectionHeader connection,
+ InetAddress addr
+ ) throws AuthorizationException {
+ if (authorize) {
+ Class<?> protocol = null;
+ try {
+ protocol = getProtocolClass(connection.getProtocol(), getConf());
+ } catch (ClassNotFoundException cfne) {
+ throw new AuthorizationException("Unknown protocol: " +
+ connection.getProtocol());
+ }
+ // user is passed through unchanged, null included
+ authManager.authorize(user, protocol, getConf(), addr);
+ }
+ }
+
+ /**
* When the read or write buffer size is larger than this limit, i/o will be
* done in chunks of this size. Most RPC requests and responses would be
* be smaller.
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/RegionServerStatusProtocol.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/RegionServerStatusProtocol.java?rev=1337396&r1=1337395&r2=1337396&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/RegionServerStatusProtocol.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/RegionServerStatusProtocol.java Fri May 11 22:06:57 2012
@@ -23,7 +23,7 @@ import org.apache.hadoop.classification.
import org.apache.hadoop.hbase.ipc.VersionedProtocol;
import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionServerStatusService;
import org.apache.hadoop.hbase.security.TokenInfo;
-import org.apache.hadoop.security.KerberosInfo;
+import org.apache.hadoop.hbase.security.KerberosInfo;
/**
* Protocol that a RegionServer uses to communicate its status to the Master.
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/WritableRpcEngine.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/WritableRpcEngine.java?rev=1337396&r1=1337395&r2=1337396&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/WritableRpcEngine.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/WritableRpcEngine.java Fri May 11 22:06:57 2012
@@ -50,8 +50,11 @@ import org.apache.hadoop.hbase.util.Obje
import org.apache.hadoop.io.*;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RemoteException;
+import org.apache.hadoop.security.authorize.ServiceAuthorizationManager;
+import org.apache.hadoop.hbase.security.HBasePolicyProvider;
import org.apache.hadoop.hbase.ipc.VersionedProtocol;
import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.security.token.AuthenticationTokenSecretManager;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.*;
@@ -252,9 +255,6 @@ class WritableRpcEngine implements RpcEn
private Class<?>[] ifaces;
private boolean verbose;
- // for JSON encoding
- private static ObjectMapper mapper = new ObjectMapper();
-
private static final String WARN_RESPONSE_TIME =
"hbase.ipc.warn.response.time";
private static final String WARN_RESPONSE_SIZE =
@@ -310,6 +310,36 @@ class WritableRpcEngine implements RpcEn
DEFAULT_WARN_RESPONSE_SIZE);
}
+ /**
+ * Creates the secret manager for authentication tokens.
+ * The key update interval and maximum token lifetime are read from
+ * "hbase.auth.key.update.interval" and "hbase.auth.token.max.lifetime",
+ * with defaults of 24*60*60*1000 and 7*24*60*60*1000 (presumably
+ * milliseconds, i.e. one day and seven days - confirm against
+ * AuthenticationTokenSecretManager).
+ *
+ * @return a new secret manager, or null when security is not enabled or
+ * the wrapped instance is not an org.apache.hadoop.hbase.Server
+ */
+ public AuthenticationTokenSecretManager createSecretManager(){
+ if (!User.isSecurityEnabled() ||
+ !(instance instanceof org.apache.hadoop.hbase.Server)) {
+ return null;
+ }
+ org.apache.hadoop.hbase.Server server =
+ (org.apache.hadoop.hbase.Server)instance;
+ Configuration conf = server.getConfiguration();
+ long keyUpdateInterval =
+ conf.getLong("hbase.auth.key.update.interval", 24*60*60*1000);
+ long maxAge =
+ conf.getLong("hbase.auth.token.max.lifetime", 7*24*60*60*1000);
+ return new AuthenticationTokenSecretManager(conf, server.getZooKeeper(),
+ server.getServerName().toString(), keyUpdateInterval, maxAge);
+ }
+
+ /**
+ * Sets up security before delegating to the base startup: installs and
+ * starts a secret manager when createSecretManager returns one, then
+ * creates the service authorization manager and initializes it through
+ * HBasePolicyProvider.
+ */
+ @Override
+ public void startThreads() {
+ AuthenticationTokenSecretManager mgr = createSecretManager();
+ if (mgr != null) {
+ setSecretManager(mgr);
+ mgr.start();
+ }
+ this.authManager = new ServiceAuthorizationManager();
+ HBasePolicyProvider.init(conf, authManager);
+
+ // continue with base startup
+ super.startThreads();
+ }
+
@Override
public Writable call(Class<? extends VersionedProtocol> protocol,
Writable param, long receivedTime, MonitoredRPCHandler status)
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/protobuf/generated/RPCProtos.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/protobuf/generated/RPCProtos.java?rev=1337396&r1=1337395&r2=1337396&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/protobuf/generated/RPCProtos.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/protobuf/generated/RPCProtos.java Fri May 11 22:06:57 2012
@@ -15,7 +15,7 @@ public final class RPCProtos {
boolean hasEffectiveUser();
String getEffectiveUser();
- // required string realUser = 2;
+ // optional string realUser = 2;
boolean hasRealUser();
String getRealUser();
}
@@ -80,7 +80,7 @@ public final class RPCProtos {
}
}
- // required string realUser = 2;
+ // optional string realUser = 2;
public static final int REALUSER_FIELD_NUMBER = 2;
private java.lang.Object realUser_;
public boolean hasRealUser() {
@@ -125,10 +125,6 @@ public final class RPCProtos {
memoizedIsInitialized = 0;
return false;
}
- if (!hasRealUser()) {
- memoizedIsInitialized = 0;
- return false;
- }
memoizedIsInitialized = 1;
return true;
}
@@ -406,10 +402,6 @@ public final class RPCProtos {
return false;
}
- if (!hasRealUser()) {
-
- return false;
- }
return true;
}
@@ -488,7 +480,7 @@ public final class RPCProtos {
onChanged();
}
- // required string realUser = 2;
+ // optional string realUser = 2;
private java.lang.Object realUser_ = "";
public boolean hasRealUser() {
return ((bitField0_ & 0x00000002) == 0x00000002);
@@ -2081,9 +2073,9 @@ public final class RPCProtos {
boolean hasCallId();
int getCallId();
- // required bool error = 2;
- boolean hasError();
- boolean getError();
+ // required .RpcResponse.Status status = 2;
+ boolean hasStatus();
+ org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcResponse.Status getStatus();
// optional bytes response = 3;
boolean hasResponse();
@@ -2122,6 +2114,78 @@ public final class RPCProtos {
return org.apache.hadoop.hbase.protobuf.generated.RPCProtos.internal_static_RpcResponse_fieldAccessorTable;
}
+ public enum Status // value type of RpcResponse's required 'status' field (field number 2)
+ implements com.google.protobuf.ProtocolMessageEnum {
+ SUCCESS(0, 0), // constructor arguments are (index, value)
+ ERROR(1, 1),
+ FATAL(2, 2),
+ ;
+
+ public static final int SUCCESS_VALUE = 0; // numeric value of SUCCESS
+ public static final int ERROR_VALUE = 1; // numeric value of ERROR
+ public static final int FATAL_VALUE = 2; // numeric value of FATAL
+
+
+ public final int getNumber() { return value; } // the 'value' passed to the constructor
+
+ public static Status valueOf(int value) { // maps a numeric value to its constant
+ switch (value) {
+ case 0: return SUCCESS;
+ case 1: return ERROR;
+ case 2: return FATAL;
+ default: return null; // unrecognized number: callers must handle null
+ }
+ }
+
+ public static com.google.protobuf.Internal.EnumLiteMap<Status>
+ internalGetValueMap() { // exposes the number-to-constant map defined below
+ return internalValueMap;
+ }
+ private static com.google.protobuf.Internal.EnumLiteMap<Status>
+ internalValueMap =
+ new com.google.protobuf.Internal.EnumLiteMap<Status>() {
+ public Status findValueByNumber(int number) {
+ return Status.valueOf(number); // delegates to valueOf(int); null for unknown numbers
+ }
+ };
+
+ public final com.google.protobuf.Descriptors.EnumValueDescriptor
+ getValueDescriptor() {
+ return getDescriptor().getValues().get(index); // looked up by this constant's index
+ }
+ public final com.google.protobuf.Descriptors.EnumDescriptor
+ getDescriptorForType() {
+ return getDescriptor(); // same descriptor as the static accessor
+ }
+ public static final com.google.protobuf.Descriptors.EnumDescriptor
+ getDescriptor() { // first enum type listed in the RpcResponse descriptor
+ return org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcResponse.getDescriptor().getEnumTypes().get(0);
+ }
+
+ private static final Status[] VALUES = { // indexed by descriptor index in valueOf(EnumValueDescriptor)
+ SUCCESS, ERROR, FATAL,
+ };
+
+ public static Status valueOf(
+ com.google.protobuf.Descriptors.EnumValueDescriptor desc) {
+ if (desc.getType() != getDescriptor()) { // reject descriptors of any other enum type
+ throw new java.lang.IllegalArgumentException(
+ "EnumValueDescriptor is not for this type.");
+ }
+ return VALUES[desc.getIndex()];
+ }
+
+ private final int index; // position used by getValueDescriptor()
+ private final int value; // number returned by getNumber()
+
+ private Status(int index, int value) {
+ this.index = index;
+ this.value = value;
+ }
+
+ // @@protoc_insertion_point(enum_scope:RpcResponse.Status)
+ }
+
private int bitField0_;
// required int32 callId = 1;
public static final int CALLID_FIELD_NUMBER = 1;
@@ -2133,14 +2197,14 @@ public final class RPCProtos {
return callId_;
}
- // required bool error = 2;
- public static final int ERROR_FIELD_NUMBER = 2;
- private boolean error_;
- public boolean hasError() {
+ // required .RpcResponse.Status status = 2;
+ public static final int STATUS_FIELD_NUMBER = 2;
+ private org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcResponse.Status status_;
+ public boolean hasStatus() {
return ((bitField0_ & 0x00000002) == 0x00000002);
}
- public boolean getError() {
- return error_;
+ public org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcResponse.Status getStatus() {
+ return status_;
}
// optional bytes response = 3;
@@ -2168,7 +2232,7 @@ public final class RPCProtos {
private void initFields() {
callId_ = 0;
- error_ = false;
+ status_ = org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcResponse.Status.SUCCESS;
response_ = com.google.protobuf.ByteString.EMPTY;
exception_ = org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcException.getDefaultInstance();
}
@@ -2181,7 +2245,7 @@ public final class RPCProtos {
memoizedIsInitialized = 0;
return false;
}
- if (!hasError()) {
+ if (!hasStatus()) {
memoizedIsInitialized = 0;
return false;
}
@@ -2202,7 +2266,7 @@ public final class RPCProtos {
output.writeInt32(1, callId_);
}
if (((bitField0_ & 0x00000002) == 0x00000002)) {
- output.writeBool(2, error_);
+ output.writeEnum(2, status_.getNumber());
}
if (((bitField0_ & 0x00000004) == 0x00000004)) {
output.writeBytes(3, response_);
@@ -2225,7 +2289,7 @@ public final class RPCProtos {
}
if (((bitField0_ & 0x00000002) == 0x00000002)) {
size += com.google.protobuf.CodedOutputStream
- .computeBoolSize(2, error_);
+ .computeEnumSize(2, status_.getNumber());
}
if (((bitField0_ & 0x00000004) == 0x00000004)) {
size += com.google.protobuf.CodedOutputStream
@@ -2263,10 +2327,10 @@ public final class RPCProtos {
result = result && (getCallId()
== other.getCallId());
}
- result = result && (hasError() == other.hasError());
- if (hasError()) {
- result = result && (getError()
- == other.getError());
+ result = result && (hasStatus() == other.hasStatus());
+ if (hasStatus()) {
+ result = result &&
+ (getStatus() == other.getStatus());
}
result = result && (hasResponse() == other.hasResponse());
if (hasResponse()) {
@@ -2291,9 +2355,9 @@ public final class RPCProtos {
hash = (37 * hash) + CALLID_FIELD_NUMBER;
hash = (53 * hash) + getCallId();
}
- if (hasError()) {
- hash = (37 * hash) + ERROR_FIELD_NUMBER;
- hash = (53 * hash) + hashBoolean(getError());
+ if (hasStatus()) {
+ hash = (37 * hash) + STATUS_FIELD_NUMBER;
+ hash = (53 * hash) + hashEnum(getStatus());
}
if (hasResponse()) {
hash = (37 * hash) + RESPONSE_FIELD_NUMBER;
@@ -2422,7 +2486,7 @@ public final class RPCProtos {
super.clear();
callId_ = 0;
bitField0_ = (bitField0_ & ~0x00000001);
- error_ = false;
+ status_ = org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcResponse.Status.SUCCESS;
bitField0_ = (bitField0_ & ~0x00000002);
response_ = com.google.protobuf.ByteString.EMPTY;
bitField0_ = (bitField0_ & ~0x00000004);
@@ -2477,7 +2541,7 @@ public final class RPCProtos {
if (((from_bitField0_ & 0x00000002) == 0x00000002)) {
to_bitField0_ |= 0x00000002;
}
- result.error_ = error_;
+ result.status_ = status_;
if (((from_bitField0_ & 0x00000004) == 0x00000004)) {
to_bitField0_ |= 0x00000004;
}
@@ -2509,8 +2573,8 @@ public final class RPCProtos {
if (other.hasCallId()) {
setCallId(other.getCallId());
}
- if (other.hasError()) {
- setError(other.getError());
+ if (other.hasStatus()) {
+ setStatus(other.getStatus());
}
if (other.hasResponse()) {
setResponse(other.getResponse());
@@ -2527,7 +2591,7 @@ public final class RPCProtos {
return false;
}
- if (!hasError()) {
+ if (!hasStatus()) {
return false;
}
@@ -2569,8 +2633,14 @@ public final class RPCProtos {
break;
}
case 16: {
- bitField0_ |= 0x00000002;
- error_ = input.readBool();
+ int rawValue = input.readEnum();
+ org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcResponse.Status value = org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcResponse.Status.valueOf(rawValue);
+ if (value == null) {
+ unknownFields.mergeVarintField(2, rawValue);
+ } else {
+ bitField0_ |= 0x00000002;
+ status_ = value;
+ }
break;
}
case 26: {
@@ -2614,23 +2684,26 @@ public final class RPCProtos {
return this;
}
- // required bool error = 2;
- private boolean error_ ;
- public boolean hasError() {
+ // required .RpcResponse.Status status = 2;
+ private org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcResponse.Status status_ = org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcResponse.Status.SUCCESS;
+ public boolean hasStatus() {
return ((bitField0_ & 0x00000002) == 0x00000002);
}
- public boolean getError() {
- return error_;
+ public org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcResponse.Status getStatus() {
+ return status_;
}
- public Builder setError(boolean value) {
+ public Builder setStatus(org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcResponse.Status value) {
+ if (value == null) {
+ throw new NullPointerException();
+ }
bitField0_ |= 0x00000002;
- error_ = value;
+ status_ = value;
onChanged();
return this;
}
- public Builder clearError() {
+ public Builder clearStatus() {
bitField0_ = (bitField0_ & ~0x00000002);
- error_ = false;
+ status_ = org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcResponse.Status.SUCCESS;
onChanged();
return this;
}
@@ -2795,17 +2868,19 @@ public final class RPCProtos {
static {
java.lang.String[] descriptorData = {
"\n\tRPC.proto\":\n\017UserInformation\022\025\n\reffect" +
- "iveUser\030\001 \002(\t\022\020\n\010realUser\030\002 \002(\t\"w\n\020Conne" +
+ "iveUser\030\001 \002(\t\022\020\n\010realUser\030\002 \001(\t\"w\n\020Conne" +
"ctionHeader\022\"\n\010userInfo\030\001 \001(\0132\020.UserInfo" +
"rmation\022?\n\010protocol\030\002 \001(\t:-org.apache.ha" +
"doop.hbase.client.ClientProtocol\"-\n\nRpcR" +
"equest\022\016\n\006callId\030\001 \002(\005\022\017\n\007request\030\002 \001(\014\"" +
"9\n\014RpcException\022\025\n\rexceptionName\030\001 \002(\t\022\022" +
- "\n\nstackTrace\030\002 \001(\t\"`\n\013RpcResponse\022\016\n\006cal" +
- "lId\030\001 \002(\005\022\r\n\005error\030\002 \002(\010\022\020\n\010response\030\003 \001" +
- "(\014\022 \n\texception\030\004 \001(\0132\r.RpcExceptionB<\n*",
- "org.apache.hadoop.hbase.protobuf.generat" +
- "edB\tRPCProtosH\001\240\001\001"
+ "\n\nstackTrace\030\002 \001(\t\"\243\001\n\013RpcResponse\022\016\n\006ca" +
+ "llId\030\001 \002(\005\022#\n\006status\030\002 \002(\0162\023.RpcResponse" +
+ ".Status\022\020\n\010response\030\003 \001(\014\022 \n\texception\030\004",
+ " \001(\0132\r.RpcException\"+\n\006Status\022\013\n\007SUCCESS" +
+ "\020\000\022\t\n\005ERROR\020\001\022\t\n\005FATAL\020\002B<\n*org.apache.h" +
+ "adoop.hbase.protobuf.generatedB\tRPCProto" +
+ "sH\001\240\001\001"
};
com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
@@ -2849,7 +2924,7 @@ public final class RPCProtos {
internal_static_RpcResponse_fieldAccessorTable = new
com.google.protobuf.GeneratedMessage.FieldAccessorTable(
internal_static_RpcResponse_descriptor,
- new java.lang.String[] { "CallId", "Error", "Response", "Exception", },
+ new java.lang.String[] { "CallId", "Status", "Response", "Exception", },
org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcResponse.class,
org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcResponse.Builder.class);
return null;
Added: hbase/trunk/src/main/java/org/apache/hadoop/hbase/security/AccessDeniedException.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/security/AccessDeniedException.java?rev=1337396&view=auto
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/security/AccessDeniedException.java (added)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/security/AccessDeniedException.java Fri May 11 22:06:57 2012
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.security;
+
+import org.apache.hadoop.hbase.DoNotRetryIOException;
+
+/**
+ * Exception thrown by access-related methods; a subclass of DoNotRetryIOException.
+ */
+public class AccessDeniedException extends DoNotRetryIOException {
+ private static final long serialVersionUID = 1913879564363001780L;
+
+ public AccessDeniedException() { // no arguments are passed to the superclass
+ super();
+ }
+
+ public AccessDeniedException(Class<?> clazz, String s) { // message: "AccessDenied [<class name>]: <s>"
+ super( "AccessDenied [" + clazz.getName() + "]: " + s); // clazz must be non-null: getName() is dereferenced
+ }
+
+ public AccessDeniedException(String s) { // s is passed unchanged to the superclass constructor
+ super(s);
+ }
+}
Added: hbase/trunk/src/main/java/org/apache/hadoop/hbase/security/HBasePolicyProvider.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/security/HBasePolicyProvider.java?rev=1337396&view=auto
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/security/HBasePolicyProvider.java (added)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/security/HBasePolicyProvider.java Fri May 11 22:06:57 2012
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.security;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.AdminProtocol;
+import org.apache.hadoop.hbase.client.ClientProtocol;
+import org.apache.hadoop.hbase.ipc.HMasterInterface;
+import org.apache.hadoop.hbase.ipc.RegionServerStatusProtocol;
+import org.apache.hadoop.security.authorize.PolicyProvider;
+import org.apache.hadoop.security.authorize.Service;
+import org.apache.hadoop.security.authorize.ServiceAuthorizationManager;
+
+/**
+ * Implementation of secure Hadoop policy provider for mapping
+ * protocol interfaces to hbase-policy.xml entries.
+ */
+public class HBasePolicyProvider extends PolicyProvider {
+ protected static Service[] services = { // each entry pairs an ACL key string with a protocol interface
+ new Service("security.client.protocol.acl", ClientProtocol.class),
+ new Service("security.client.protocol.acl", AdminProtocol.class), // NOTE(review): same key as ClientProtocol - confirm intended
+ new Service("security.admin.protocol.acl", HMasterInterface.class),
+ new Service("security.masterregion.protocol.acl", RegionServerStatusProtocol.class)
+ };
+
+ @Override
+ public Service[] getServices() {
+ return services; // returns the shared static array itself, not a copy
+ }
+
+ public static void init(Configuration conf,
+ ServiceAuthorizationManager authManager) {
+ // set service-level authorization security policy
+ conf.set("hadoop.policy.file", "hbase-policy.xml"); // always overwrites this key in the passed-in conf
+ if (conf.getBoolean( // refresh only when the flag below is true; it defaults to false here
+ ServiceAuthorizationManager.SERVICE_AUTHORIZATION_CONFIG, false)) {
+ authManager.refresh(conf, new HBasePolicyProvider());
+ }
+ }
+}