You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by su...@apache.org on 2012/03/08 01:57:13 UTC
svn commit: r1298245 - in
/hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common: ./
dev-support/ src/main/java/org/apache/hadoop/ipc/
src/main/java/org/apache/hadoop/util/ src/main/proto/
Author: suresh
Date: Thu Mar 8 00:57:13 2012
New Revision: 1298245
URL: http://svn.apache.org/viewvc?rev=1298245&view=rev
Log:
HADOOP-7557. Merge r1295261 from trunk to 0.23
Added:
hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/IpcException.java
- copied unchanged from r1295261, hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/IpcException.java
hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/proto/IpcConnectionContext.proto
- copied unchanged from r1295261, hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/proto/IpcConnectionContext.proto
Removed:
hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ConnectionHeader.java
Modified:
hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/CHANGES.txt
hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/dev-support/findbugsExcludeFile.xml
hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/ProtoUtil.java
Modified: hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/CHANGES.txt?rev=1298245&r1=1298244&r2=1298245&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/CHANGES.txt (original)
+++ hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/CHANGES.txt Thu Mar 8 00:57:13 2012
@@ -79,6 +79,8 @@ Release 0.23.3 - UNRELEASED
HADOOP-8108. Move method getHostPortString() from NameNode to NetUtils.
(Brandon Li via jitendra)
+ HADOOP-7557. Make IPC header be extensible (sanjay radia)
+
OPTIMIZATIONS
BUG FIXES
Modified: hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/dev-support/findbugsExcludeFile.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/dev-support/findbugsExcludeFile.xml?rev=1298245&r1=1298244&r2=1298245&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/dev-support/findbugsExcludeFile.xml (original)
+++ hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/dev-support/findbugsExcludeFile.xml Thu Mar 8 00:57:13 2012
@@ -278,4 +278,8 @@
<!-- protobuf generated code -->
<Class name="~org\.apache\.hadoop\.ipc\.protobuf\.ProtocolInfoProtos.*"/>
</Match>
+ <Match>
+ <!-- protobuf generated code -->
+ <Class name="~org\.apache\.hadoop\.ipc\.protobuf\.IpcConnectionContextProtos.*"/>
+ </Match>
</FindBugsFilter>
Modified: hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java?rev=1298245&r1=1298244&r2=1298245&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java Thu Mar 8 00:57:13 2012
@@ -51,6 +51,7 @@ import org.apache.hadoop.conf.Configurat
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.ipc.RpcPayloadHeader.*;
+import org.apache.hadoop.ipc.protobuf.IpcConnectionContextProtos.IpcConnectionContextProto;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
@@ -66,6 +67,7 @@ import org.apache.hadoop.security.token.
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.security.token.TokenSelector;
import org.apache.hadoop.security.token.TokenInfo;
+import org.apache.hadoop.util.ProtoUtil;
import org.apache.hadoop.util.ReflectionUtils;
/** A client for an IPC service. IPC calls take a single {@link Writable} as a
@@ -211,7 +213,7 @@ public class Client {
private class Connection extends Thread {
private InetSocketAddress server; // server ip:port
private String serverPrincipal; // server's krb5 principal name
- private ConnectionHeader header; // connection header
+ private IpcConnectionContextProto connectionContext; // connection context
private final ConnectionId remoteId; // connection id
private AuthMethod authMethod; // authentication method
private boolean useSasl;
@@ -292,8 +294,8 @@ public class Client {
authMethod = AuthMethod.KERBEROS;
}
- header =
- new ConnectionHeader(RPC.getProtocolName(protocol), ticket, authMethod);
+ connectionContext = ProtoUtil.makeIpcConnectionContext(
+ RPC.getProtocolName(protocol), ticket, authMethod);
if (LOG.isDebugEnabled())
LOG.debug("Use " + authMethod + " authentication for protocol "
@@ -563,7 +565,7 @@ public class Client {
setupConnection();
InputStream inStream = NetUtils.getInputStream(socket);
OutputStream outStream = NetUtils.getOutputStream(socket);
- writeRpcHeader(outStream);
+ writeConnectionHeader(outStream);
if (useSasl) {
final InputStream in2 = inStream;
final OutputStream out2 = outStream;
@@ -597,8 +599,11 @@ public class Client {
} else {
// fall back to simple auth because server told us so.
authMethod = AuthMethod.SIMPLE;
- header = new ConnectionHeader(header.getProtocol(), header
- .getUgi(), authMethod);
+ // remake the connectionContext
+ connectionContext = ProtoUtil.makeIpcConnectionContext(
+ connectionContext.getProtocol(),
+ ProtoUtil.getUgi(connectionContext.getUserInfo()),
+ authMethod);
useSasl = false;
}
}
@@ -678,13 +683,26 @@ public class Client {
". Already tried " + curRetries + " time(s).");
}
- /* Write the RPC header */
- private void writeRpcHeader(OutputStream outStream) throws IOException {
+ /**
+ * Write the connection header - this is sent when connection is established
+ * +----------------------------------+
+ * | "hrpc" 4 bytes |
+ * +----------------------------------+
+ * | Version (1 byte) |
+ * +----------------------------------+
+ * | Authmethod (1 byte) |
+ * +----------------------------------+
+ * | IpcSerializationType (1 byte) |
+ * +----------------------------------+
+ */
+ private void writeConnectionHeader(OutputStream outStream)
+ throws IOException {
DataOutputStream out = new DataOutputStream(new BufferedOutputStream(outStream));
// Write out the header, version and authentication method
out.write(Server.HEADER.array());
out.write(Server.CURRENT_VERSION);
authMethod.write(out);
+ Server.IpcSerializationType.PROTOBUF.write(out);
out.flush();
}
@@ -694,7 +712,7 @@ public class Client {
private void writeHeader() throws IOException {
// Write out the ConnectionHeader
DataOutputBuffer buf = new DataOutputBuffer();
- header.write(buf);
+ connectionContext.writeTo(buf);
// Write out the payload length
int bufLen = buf.getLength();
@@ -1261,16 +1279,16 @@ public class Client {
public static class ConnectionId {
InetSocketAddress address;
UserGroupInformation ticket;
- Class<?> protocol;
+ final Class<?> protocol;
private static final int PRIME = 16777619;
- private int rpcTimeout;
- private String serverPrincipal;
- private int maxIdleTime; //connections will be culled if it was idle for
+ private final int rpcTimeout;
+ private final String serverPrincipal;
+ private final int maxIdleTime; //connections will be culled if it was idle for
//maxIdleTime msecs
- private int maxRetries; //the max. no. of retries for socket connections
- private boolean tcpNoDelay; // if T then disable Nagle's Algorithm
- private boolean doPing; //do we need to send ping message
- private int pingInterval; // how often sends ping to the server in msecs
+ private final int maxRetries; //the max. no. of retries for socket connections
+ private final boolean tcpNoDelay; // if T then disable Nagle's Algorithm
+ private final boolean doPing; //do we need to send ping message
+ private final int pingInterval; // how often sends ping to the server in msecs
ConnectionId(InetSocketAddress address, Class<?> protocol,
UserGroupInformation ticket, int rpcTimeout,
Modified: hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java?rev=1298245&r1=1298244&r2=1298245&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java Thu Mar 8 00:57:13 2012
@@ -21,6 +21,7 @@ package org.apache.hadoop.ipc;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
+import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.net.BindException;
@@ -74,6 +75,7 @@ import org.apache.hadoop.ipc.RpcPayloadH
import org.apache.hadoop.ipc.RpcPayloadHeader.RpcPayloadOperation;
import org.apache.hadoop.ipc.metrics.RpcDetailedMetrics;
import org.apache.hadoop.ipc.metrics.RpcMetrics;
+import org.apache.hadoop.ipc.protobuf.IpcConnectionContextProtos.IpcConnectionContextProto;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.SaslRpcServer;
@@ -90,6 +92,7 @@ import org.apache.hadoop.security.author
import org.apache.hadoop.security.token.SecretManager;
import org.apache.hadoop.security.token.SecretManager.InvalidToken;
import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.util.ProtoUtil;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.StringUtils;
@@ -111,6 +114,22 @@ public abstract class Server {
public static final ByteBuffer HEADER = ByteBuffer.wrap("hrpc".getBytes());
/**
+ * Serialization type for ConnectionContext and RpcPayloadHeader
+ */
+ public enum IpcSerializationType {
+ // Add new serialization type to the end without affecting the enum order
+ PROTOBUF;
+
+ void write(DataOutput out) throws IOException {
+ out.writeByte(this.ordinal());
+ }
+
+ static IpcSerializationType fromByte(byte b) {
+ return IpcSerializationType.values()[b];
+ }
+ }
+
+ /**
* If the user accidentally sends an HTTP GET to an IPC port, we detect this
* and send back a nicer response.
*/
@@ -133,7 +152,8 @@ public abstract class Server {
// 5 : Introduced use of {@link ArrayPrimitiveWritable$Internal}
// in ObjectWritable to efficiently transmit arrays of primitives
// 6 : Made RPC payload header explicit
- public static final byte CURRENT_VERSION = 6;
+ // 7 : Changed Ipc Connection Header to use Protocol buffers
+ public static final byte CURRENT_VERSION = 7;
/**
* Initial and max size of response buffer
@@ -968,9 +988,9 @@ public abstract class Server {
/** Reads calls from a connection and queues them for handling. */
public class Connection {
- private boolean rpcHeaderRead = false; // if initial rpc header is read
- private boolean headerRead = false; //if the connection header that
- //follows version is read.
+ private boolean connectionHeaderRead = false; // connection header is read?
+ private boolean connectionContextRead = false; //if connection context that
+ //follows connection header is read
private SocketChannel channel;
private ByteBuffer data;
@@ -986,14 +1006,14 @@ public abstract class Server {
private int remotePort;
private InetAddress addr;
- ConnectionHeader header = new ConnectionHeader();
+ IpcConnectionContextProto connectionContext;
String protocolName;
boolean useSasl;
SaslServer saslServer;
private AuthMethod authMethod;
private boolean saslContextEstablished;
private boolean skipInitialSaslHandshake;
- private ByteBuffer rpcHeaderBuffer;
+ private ByteBuffer connectionHeaderBuf = null;
private ByteBuffer unwrappedData;
private ByteBuffer unwrappedDataLengthBuffer;
@@ -1241,17 +1261,17 @@ public abstract class Server {
return count;
}
- if (!rpcHeaderRead) {
+ if (!connectionHeaderRead) {
//Every connection is expected to send the header.
- if (rpcHeaderBuffer == null) {
- rpcHeaderBuffer = ByteBuffer.allocate(2);
+ if (connectionHeaderBuf == null) {
+ connectionHeaderBuf = ByteBuffer.allocate(3);
}
- count = channelRead(channel, rpcHeaderBuffer);
- if (count < 0 || rpcHeaderBuffer.remaining() > 0) {
+ count = channelRead(channel, connectionHeaderBuf);
+ if (count < 0 || connectionHeaderBuf.remaining() > 0) {
return count;
}
- int version = rpcHeaderBuffer.get(0);
- byte[] method = new byte[] {rpcHeaderBuffer.get(1)};
+ int version = connectionHeaderBuf.get(0);
+ byte[] method = new byte[] {connectionHeaderBuf.get(1)};
authMethod = AuthMethod.read(new DataInputStream(
new ByteArrayInputStream(method)));
dataLengthBuffer.flip();
@@ -1273,6 +1293,14 @@ public abstract class Server {
setupBadVersionResponse(version);
return -1;
}
+
+ IpcSerializationType serializationType = IpcSerializationType
+ .fromByte(connectionHeaderBuf.get(2));
+ if (serializationType != IpcSerializationType.PROTOBUF) {
+ respondUnsupportedSerialization(serializationType);
+ return -1;
+ }
+
dataLengthBuffer.clear();
if (authMethod == null) {
throw new IOException("Unable to read authentication method");
@@ -1302,8 +1330,8 @@ public abstract class Server {
useSasl = true;
}
- rpcHeaderBuffer = null;
- rpcHeaderRead = true;
+ connectionHeaderBuf = null;
+ connectionHeaderRead = true;
continue;
}
@@ -1334,7 +1362,7 @@ public abstract class Server {
skipInitialSaslHandshake = false;
continue;
}
- boolean isHeaderRead = headerRead;
+ boolean isHeaderRead = connectionContextRead;
if (useSasl) {
saslReadAndProcess(data.array());
} else {
@@ -1383,6 +1411,17 @@ public abstract class Server {
}
}
+ private void respondUnsupportedSerialization(IpcSerializationType st) throws IOException {
+ String errMsg = "Server IPC version " + CURRENT_VERSION
+ + " does not support serialization " + st.toString();
+ ByteArrayOutputStream buffer = new ByteArrayOutputStream();
+
+ Call fakeCall = new Call(-1, null, this);
+ setupResponse(buffer, fakeCall, Status.FATAL, null,
+ IpcException.class.getName(), errMsg);
+ responder.doRespond(fakeCall);
+ }
+
private void setupHttpRequestOnIpcPortResponse() throws IOException {
Call fakeCall = new Call(0, null, this);
fakeCall.setResponse(ByteBuffer.wrap(
@@ -1390,15 +1429,15 @@ public abstract class Server {
responder.doRespond(fakeCall);
}
- /// Reads the connection header following version
- private void processHeader(byte[] buf) throws IOException {
+ /** Reads the connection context following the connection header */
+ private void processConnectionContext(byte[] buf) throws IOException {
DataInputStream in =
new DataInputStream(new ByteArrayInputStream(buf));
- header.readFields(in);
- protocolName = header.getProtocol();
+ connectionContext = IpcConnectionContextProto.parseFrom(in);
+ protocolName = connectionContext.hasProtocol() ? connectionContext
+ .getProtocol() : null;
-
- UserGroupInformation protocolUser = header.getUgi();
+ UserGroupInformation protocolUser = ProtoUtil.getUgi(connectionContext);
if (!useSasl) {
user = protocolUser;
if (user != null) {
@@ -1472,15 +1511,15 @@ public abstract class Server {
private void processOneRpc(byte[] buf) throws IOException,
InterruptedException {
- if (headerRead) {
+ if (connectionContextRead) {
processData(buf);
} else {
- processHeader(buf);
- headerRead = true;
+ processConnectionContext(buf);
+ connectionContextRead = true;
if (!authorizeConnection()) {
throw new AccessControlException("Connection from " + this
- + " for protocol " + header.getProtocol()
- + " is unauthorized for user " + user);
+ + " for protocol " + connectionContext.getProtocol()
+ + " is unauthorized for user " + user);
}
}
}
@@ -1549,9 +1588,9 @@ public abstract class Server {
&& (authMethod != AuthMethod.DIGEST)) {
ProxyUsers.authorize(user, this.getHostAddress(), conf);
}
- authorize(user, header, getHostInetAddress());
+ authorize(user, protocolName, getHostInetAddress());
if (LOG.isDebugEnabled()) {
- LOG.debug("Successfully authorized " + header);
+ LOG.debug("Successfully authorized " + connectionContext);
}
rpcMetrics.incrAuthorizationSuccesses();
} catch (AuthorizationException ae) {
@@ -1596,11 +1635,10 @@ public abstract class Server {
while (running) {
try {
final Call call = callQueue.take(); // pop the queue; maybe blocked here
-
- if (LOG.isDebugEnabled())
+ if (LOG.isDebugEnabled()) {
LOG.debug(getName() + ": has Call#" + call.callId +
"for RpcKind " + call.rpcKind + " from " + call.connection);
-
+ }
String errorClass = null;
String error = null;
Writable value = null;
@@ -1921,21 +1959,22 @@ public abstract class Server {
* Authorize the incoming client connection.
*
* @param user client user
- * @param connection incoming connection
+ * @param protocolName - the protocol
* @param addr InetAddress of incoming connection
* @throws AuthorizationException when the client isn't authorized to talk the protocol
*/
- public void authorize(UserGroupInformation user,
- ConnectionHeader connection,
- InetAddress addr
- ) throws AuthorizationException {
+ private void authorize(UserGroupInformation user, String protocolName,
+ InetAddress addr) throws AuthorizationException {
if (authorize) {
+ if (protocolName == null) {
+ throw new AuthorizationException("Null protocol not authorized");
+ }
Class<?> protocol = null;
try {
- protocol = getProtocolClass(connection.getProtocol(), getConf());
+ protocol = getProtocolClass(protocolName, getConf());
} catch (ClassNotFoundException cfne) {
throw new AuthorizationException("Unknown protocol: " +
- connection.getProtocol());
+ protocolName);
}
serviceAuthorizationManager.authorize(user, protocol, getConf(), addr);
}
Modified: hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/ProtoUtil.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/ProtoUtil.java?rev=1298245&r1=1298244&r2=1298245&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/ProtoUtil.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/ProtoUtil.java Thu Mar 8 00:57:13 2012
@@ -21,6 +21,11 @@ package org.apache.hadoop.util;
import java.io.DataInput;
import java.io.IOException;
+import org.apache.hadoop.ipc.protobuf.IpcConnectionContextProtos.IpcConnectionContextProto;
+import org.apache.hadoop.ipc.protobuf.IpcConnectionContextProtos.UserInformationProto;
+import org.apache.hadoop.security.SaslRpcServer.AuthMethod;
+import org.apache.hadoop.security.UserGroupInformation;
+
public abstract class ProtoUtil {
/**
@@ -63,4 +68,71 @@ public abstract class ProtoUtil {
return result;
}
+
+ /**
+ * This method creates the connection context using exactly the same logic
+ * as the old connection context as was done for writable where
+ * the effective and real users are set based on the auth method.
+ *
+ */
+ public static IpcConnectionContextProto makeIpcConnectionContext(
+ final String protocol,
+ final UserGroupInformation ugi, final AuthMethod authMethod) {
+ IpcConnectionContextProto.Builder result = IpcConnectionContextProto.newBuilder();
+ if (protocol != null) {
+ result.setProtocol(protocol);
+ }
+ UserInformationProto.Builder ugiProto = UserInformationProto.newBuilder();
+ if (ugi != null) {
+ /*
+ * In the connection context we send only additional user info that
+ * is not derived from the authentication done during connection setup.
+ */
+ if (authMethod == AuthMethod.KERBEROS) {
+ // Real user was established as part of the connection.
+ // Send effective user only.
+ ugiProto.setEffectiveUser(ugi.getUserName());
+ } else if (authMethod == AuthMethod.DIGEST) {
+ // With token, the connection itself establishes
+ // both real and effective user. Hence send none in header.
+ } else { // Simple authentication
+ // No user info is established as part of the connection.
+ // Send both effective user and real user
+ ugiProto.setEffectiveUser(ugi.getUserName());
+ if (ugi.getRealUser() != null) {
+ ugiProto.setRealUser(ugi.getRealUser().getUserName());
+ }
+ }
+ }
+ result.setUserInfo(ugiProto);
+ return result.build();
+ }
+
+ public static UserGroupInformation getUgi(IpcConnectionContextProto context) {
+ if (context.hasUserInfo()) {
+ UserInformationProto userInfo = context.getUserInfo();
+ return getUgi(userInfo);
+ } else {
+ return null;
+ }
+ }
+
+ public static UserGroupInformation getUgi(UserInformationProto userInfo) {
+ UserGroupInformation ugi = null;
+ String effectiveUser = userInfo.hasEffectiveUser() ? userInfo
+ .getEffectiveUser() : null;
+ String realUser = userInfo.hasRealUser() ? userInfo.getRealUser() : null;
+ if (effectiveUser != null) {
+ if (realUser != null) {
+ UserGroupInformation realUserUgi = UserGroupInformation
+ .createRemoteUser(realUser);
+ ugi = UserGroupInformation
+ .createProxyUser(effectiveUser, realUserUgi);
+ } else {
+ ugi = org.apache.hadoop.security.UserGroupInformation
+ .createRemoteUser(effectiveUser);
+ }
+ }
+ return ugi;
+ }
}