You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by vi...@apache.org on 2013/08/12 23:26:02 UTC
svn commit: r1513258 [4/9] - in
/hadoop/common/branches/YARN-321/hadoop-common-project: ./
hadoop-annotations/ hadoop-auth-examples/
hadoop-auth-examples/src/main/webapp/
hadoop-auth-examples/src/main/webapp/annonymous/
hadoop-auth-examples/src/main/we...
Modified: hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryPolicies.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryPolicies.java?rev=1513258&r1=1513257&r2=1513258&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryPolicies.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryPolicies.java Mon Aug 12 21:25:49 2013
@@ -153,7 +153,7 @@ public class RetryPolicies {
static class TryOnceThenFail implements RetryPolicy {
@Override
public RetryAction shouldRetry(Exception e, int retries, int failovers,
- boolean isMethodIdempotent) throws Exception {
+ boolean isIdempotentOrAtMostOnce) throws Exception {
return RetryAction.FAIL;
}
}
@@ -161,7 +161,7 @@ public class RetryPolicies {
static class RetryForever implements RetryPolicy {
@Override
public RetryAction shouldRetry(Exception e, int retries, int failovers,
- boolean isMethodIdempotent) throws Exception {
+ boolean isIdempotentOrAtMostOnce) throws Exception {
return RetryAction.RETRY;
}
}
@@ -196,7 +196,7 @@ public class RetryPolicies {
@Override
public RetryAction shouldRetry(Exception e, int retries, int failovers,
- boolean isMethodIdempotent) throws Exception {
+ boolean isIdempotentOrAtMostOnce) throws Exception {
if (retries >= maxRetries) {
return RetryAction.FAIL;
}
@@ -305,7 +305,7 @@ public class RetryPolicies {
@Override
public RetryAction shouldRetry(Exception e, int curRetry, int failovers,
- boolean isMethodIdempotent) throws Exception {
+ boolean isIdempotentOrAtMostOnce) throws Exception {
final Pair p = searchPair(curRetry);
if (p == null) {
//no more retries.
@@ -435,12 +435,12 @@ public class RetryPolicies {
@Override
public RetryAction shouldRetry(Exception e, int retries, int failovers,
- boolean isMethodIdempotent) throws Exception {
+ boolean isIdempotentOrAtMostOnce) throws Exception {
RetryPolicy policy = exceptionToPolicyMap.get(e.getClass());
if (policy == null) {
policy = defaultPolicy;
}
- return policy.shouldRetry(e, retries, failovers, isMethodIdempotent);
+ return policy.shouldRetry(e, retries, failovers, isIdempotentOrAtMostOnce);
}
}
@@ -463,7 +463,7 @@ public class RetryPolicies {
@Override
public RetryAction shouldRetry(Exception e, int retries, int failovers,
- boolean isMethodIdempotent) throws Exception {
+ boolean isIdempotentOrAtMostOnce) throws Exception {
RetryPolicy policy = null;
if (e instanceof RemoteException) {
policy = exceptionNameToPolicyMap.get(
@@ -472,7 +472,7 @@ public class RetryPolicies {
if (policy == null) {
policy = defaultPolicy;
}
- return policy.shouldRetry(e, retries, failovers, isMethodIdempotent);
+ return policy.shouldRetry(e, retries, failovers, isIdempotentOrAtMostOnce);
}
}
@@ -533,7 +533,7 @@ public class RetryPolicies {
@Override
public RetryAction shouldRetry(Exception e, int retries,
- int failovers, boolean isMethodIdempotent) throws Exception {
+ int failovers, boolean isIdempotentOrAtMostOnce) throws Exception {
if (failovers >= maxFailovers) {
return new RetryAction(RetryAction.RetryDecision.FAIL, 0,
"failovers (" + failovers + ") exceeded maximum allowed ("
@@ -553,7 +553,7 @@ public class RetryPolicies {
calculateExponentialTime(delayMillis, failovers, maxDelayBase));
} else if (e instanceof SocketException ||
(e instanceof IOException && !(e instanceof RemoteException))) {
- if (isMethodIdempotent) {
+ if (isIdempotentOrAtMostOnce) {
return RetryAction.FAILOVER_AND_RETRY;
} else {
return new RetryAction(RetryAction.RetryDecision.FAIL, 0,
@@ -562,7 +562,7 @@ public class RetryPolicies {
}
} else {
return fallbackPolicy.shouldRetry(e, retries, failovers,
- isMethodIdempotent);
+ isIdempotentOrAtMostOnce);
}
}
Modified: hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryPolicy.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryPolicy.java?rev=1513258&r1=1513257&r2=1513258&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryPolicy.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryPolicy.java Mon Aug 12 21:25:49 2013
@@ -75,24 +75,25 @@ public interface RetryPolicy {
/**
* <p>
- * Determines whether the framework should retry a
- * method for the given exception, and the number
- * of retries that have been made for that operation
+ * Determines whether the framework should retry a method for the given
+ * exception, and the number of retries that have been made for that operation
* so far.
* </p>
+ *
* @param e The exception that caused the method to fail
* @param retries The number of times the method has been retried
* @param failovers The number of times the method has failed over to a
- * different backend implementation
- * @param isMethodIdempotent <code>true</code> if the method is idempotent
- * and so can reasonably be retried on failover when we don't know if the
- * previous attempt reached the server or not
+ * different backend implementation
+ * @param isIdempotentOrAtMostOnce <code>true</code> if the method is
+ * {@link Idempotent} or {@link AtMostOnce} and so can reasonably be
+ * retried on failover when we don't know if the previous attempt
+ * reached the server or not
* @return <code>true</code> if the method should be retried,
- * <code>false</code> if the method should not be retried
- * but shouldn't fail with an exception (only for void methods)
- * @throws Exception The re-thrown exception <code>e</code> indicating
- * that the method failed and should not be retried further
+ * <code>false</code> if the method should not be retried but
+ * shouldn't fail with an exception (only for void methods)
+ * @throws Exception The re-thrown exception <code>e</code> indicating that
+ * the method failed and should not be retried further
*/
public RetryAction shouldRetry(Exception e, int retries, int failovers,
- boolean isMethodIdempotent) throws Exception;
+ boolean isIdempotentOrAtMostOnce) throws Exception;
}
Modified: hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryProxy.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryProxy.java?rev=1513258&r1=1513257&r2=1513258&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryProxy.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryProxy.java Mon Aug 12 21:25:49 2013
@@ -36,10 +36,10 @@ public class RetryProxy {
* @param retryPolicy the policy for retrying method call failures
* @return the retry proxy
*/
- public static Object create(Class<?> iface, Object implementation,
+ public static <T> Object create(Class<T> iface, T implementation,
RetryPolicy retryPolicy) {
return RetryProxy.create(iface,
- new DefaultFailoverProxyProvider(iface, implementation),
+ new DefaultFailoverProxyProvider<T>(iface, implementation),
retryPolicy);
}
@@ -53,12 +53,12 @@ public class RetryProxy {
* @param retryPolicy the policy for retrying or failing over method call failures
* @return the retry proxy
*/
- public static Object create(Class<?> iface, FailoverProxyProvider proxyProvider,
- RetryPolicy retryPolicy) {
+ public static <T> Object create(Class<T> iface,
+ FailoverProxyProvider<T> proxyProvider, RetryPolicy retryPolicy) {
return Proxy.newProxyInstance(
proxyProvider.getInterface().getClassLoader(),
new Class<?>[] { iface },
- new RetryInvocationHandler(proxyProvider, retryPolicy)
+ new RetryInvocationHandler<T>(proxyProvider, retryPolicy)
);
}
@@ -73,10 +73,10 @@ public class RetryProxy {
* @param methodNameToPolicyMap a map of method names to retry policies
* @return the retry proxy
*/
- public static Object create(Class<?> iface, Object implementation,
+ public static <T> Object create(Class<T> iface, T implementation,
Map<String,RetryPolicy> methodNameToPolicyMap) {
return create(iface,
- new DefaultFailoverProxyProvider(iface, implementation),
+ new DefaultFailoverProxyProvider<T>(iface, implementation),
methodNameToPolicyMap,
RetryPolicies.TRY_ONCE_THEN_FAIL);
}
@@ -92,13 +92,14 @@ public class RetryProxy {
* @param methodNameToPolicyMap a map of method names to retry policies
* @return the retry proxy
*/
- public static Object create(Class<?> iface, FailoverProxyProvider proxyProvider,
+ public static <T> Object create(Class<T> iface,
+ FailoverProxyProvider<T> proxyProvider,
Map<String,RetryPolicy> methodNameToPolicyMap,
RetryPolicy defaultPolicy) {
return Proxy.newProxyInstance(
proxyProvider.getInterface().getClassLoader(),
new Class<?>[] { iface },
- new RetryInvocationHandler(proxyProvider, defaultPolicy,
+ new RetryInvocationHandler<T>(proxyProvider, defaultPolicy,
methodNameToPolicyMap)
);
}
Propchange: hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/package.html
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/serializer/avro/package.html
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/serializer/package.html
------------------------------------------------------------------------------
svn:eol-style = native
Modified: hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java?rev=1513258&r1=1513257&r2=1513258&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java Mon Aug 12 21:25:49 2013
@@ -18,8 +18,11 @@
package org.apache.hadoop.ipc;
+import static org.apache.hadoop.ipc.RpcConstants.*;
+
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
+import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.FilterInputStream;
@@ -33,6 +36,7 @@ import java.net.Socket;
import java.net.SocketTimeoutException;
import java.net.UnknownHostException;
import java.security.PrivilegedExceptionAction;
+import java.util.Arrays;
import java.util.Hashtable;
import java.util.Iterator;
import java.util.Map.Entry;
@@ -45,9 +49,11 @@ import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import javax.net.SocketFactory;
+import javax.security.sasl.Sasl;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -62,7 +68,10 @@ import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.retry.RetryPolicies;
import org.apache.hadoop.io.retry.RetryPolicy;
import org.apache.hadoop.io.retry.RetryPolicy.RetryAction;
+import org.apache.hadoop.ipc.ProtobufRpcEngine.RpcRequestMessageWrapper;
+import org.apache.hadoop.ipc.RPC.RpcKind;
import org.apache.hadoop.ipc.Server.AuthProtocol;
+import org.apache.hadoop.ipc.protobuf.IpcConnectionContextProtos.IpcConnectionContextProto;
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcRequestHeaderProto;
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcRequestHeaderProto.OperationProto;
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto;
@@ -75,15 +84,13 @@ import org.apache.hadoop.security.SaslRp
import org.apache.hadoop.security.SaslRpcServer.AuthMethod;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
-import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.security.token.TokenIdentifier;
-import org.apache.hadoop.security.token.TokenInfo;
-import org.apache.hadoop.security.token.TokenSelector;
import org.apache.hadoop.util.ProtoUtil;
import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.Time;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.protobuf.CodedOutputStream;
@@ -97,11 +104,26 @@ public class Client {
public static final Log LOG = LogFactory.getLog(Client.class);
+ /** A counter for generating call IDs. */
+ private static final AtomicInteger callIdCounter = new AtomicInteger();
+
+ private static final ThreadLocal<Integer> callId = new ThreadLocal<Integer>();
+ private static final ThreadLocal<Integer> retryCount = new ThreadLocal<Integer>();
+
+ /** Set call id and retry count for the next call. */
+ public static void setCallIdAndRetryCount(int cid, int rc) {
+ Preconditions.checkArgument(cid != RpcConstants.INVALID_CALL_ID);
+ Preconditions.checkState(callId.get() == null);
+ Preconditions.checkArgument(rc != RpcConstants.INVALID_RETRY_COUNT);
+
+ callId.set(cid);
+ retryCount.set(rc);
+ }
+
private Hashtable<ConnectionId, Connection> connections =
new Hashtable<ConnectionId, Connection>();
private Class<? extends Writable> valueClass; // class of call values
- private int counter; // counter for call ids
private AtomicBoolean running = new AtomicBoolean(true); // if client runs
final private Configuration conf;
@@ -111,8 +133,9 @@ public class Client {
private final int connectionTimeout;
private final boolean fallbackAllowed;
+ private final byte[] clientId;
- final static int PING_CALL_ID = -1;
+ final static int CONNECTION_CONTEXT_CALL_ID = -3;
/**
* Executor on which IPC calls' parameters are sent.
@@ -251,23 +274,58 @@ public class Client {
synchronized boolean isZeroReference() {
return refCount==0;
}
-
+
+ /** Check the rpc response header. */
+ void checkResponse(RpcResponseHeaderProto header) throws IOException {
+ if (header == null) {
+ throw new IOException("Response is null.");
+ }
+ if (header.hasClientId()) {
+ // check client IDs
+ final byte[] id = header.getClientId().toByteArray();
+ if (!Arrays.equals(id, RpcConstants.DUMMY_CLIENT_ID)) {
+ if (!Arrays.equals(id, clientId)) {
+ throw new IOException("Client IDs not matched: local ID="
+ + StringUtils.byteToHexString(clientId) + ", ID in reponse="
+ + StringUtils.byteToHexString(header.getClientId().toByteArray()));
+ }
+ }
+ }
+ }
+
+ Call createCall(RPC.RpcKind rpcKind, Writable rpcRequest) {
+ return new Call(rpcKind, rpcRequest);
+ }
+
/**
* Class that represents an RPC call
*/
- private class Call {
+ static class Call {
final int id; // call id
+ final int retry; // retry count
final Writable rpcRequest; // the serialized rpc request
Writable rpcResponse; // null if rpc has error
IOException error; // exception, null if success
final RPC.RpcKind rpcKind; // Rpc EngineKind
boolean done; // true when call is done
- protected Call(RPC.RpcKind rpcKind, Writable param) {
+ private Call(RPC.RpcKind rpcKind, Writable param) {
this.rpcKind = rpcKind;
this.rpcRequest = param;
- synchronized (Client.this) {
- this.id = counter++;
+
+ final Integer id = callId.get();
+ if (id == null) {
+ this.id = nextCallId();
+ } else {
+ callId.set(null);
+ this.id = id;
+ }
+
+ final Integer rc = retryCount.get();
+ if (rc == null) {
+ this.retry = 0;
+ } else {
+ this.retry = rc;
}
}
@@ -308,10 +366,9 @@ public class Client {
* socket: responses may be delivered out of order. */
private class Connection extends Thread {
private InetSocketAddress server; // server ip:port
- private String serverPrincipal; // server's krb5 principal name
private final ConnectionId remoteId; // connection id
private AuthMethod authMethod; // authentication method
- private Token<? extends TokenIdentifier> token;
+ private AuthProtocol authProtocol;
private int serviceClass;
private SaslRpcClient saslRpcClient;
@@ -326,6 +383,7 @@ public class Client {
private boolean tcpNoDelay; // if T then disable Nagle's Algorithm
private boolean doPing; //do we need to send ping message
private int pingInterval; // how often sends ping to the server in msecs
+ private ByteArrayOutputStream pingRequest; // ping message
// currently active calls
private Hashtable<Integer, Call> calls = new Hashtable<Integer, Call>();
@@ -351,6 +409,15 @@ public class Client {
this.maxRetriesOnSocketTimeouts = remoteId.getMaxRetriesOnSocketTimeouts();
this.tcpNoDelay = remoteId.getTcpNoDelay();
this.doPing = remoteId.getDoPing();
+ if (doPing) {
+ // construct a RPC header with the callId as the ping callId
+ pingRequest = new ByteArrayOutputStream();
+ RpcRequestHeaderProto pingHeader = ProtoUtil
+ .makeRpcRequestHeader(RpcKind.RPC_PROTOCOL_BUFFER,
+ OperationProto.RPC_FINAL_PACKET, PING_CALL_ID,
+ RpcConstants.INVALID_RETRY_COUNT, clientId);
+ pingHeader.writeDelimitedTo(pingRequest);
+ }
this.pingInterval = remoteId.getPingInterval();
this.serviceClass = serviceClass;
if (LOG.isDebugEnabled()) {
@@ -358,45 +425,11 @@ public class Client {
}
UserGroupInformation ticket = remoteId.getTicket();
- Class<?> protocol = remoteId.getProtocol();
- if (protocol != null) {
- TokenInfo tokenInfo = SecurityUtil.getTokenInfo(protocol, conf);
- if (tokenInfo != null) {
- TokenSelector<? extends TokenIdentifier> tokenSelector = null;
- try {
- tokenSelector = tokenInfo.value().newInstance();
- } catch (InstantiationException e) {
- throw new IOException(e.toString());
- } catch (IllegalAccessException e) {
- throw new IOException(e.toString());
- }
- token = tokenSelector.selectToken(
- SecurityUtil.buildTokenService(server),
- ticket.getTokens());
- }
- KerberosInfo krbInfo = SecurityUtil.getKerberosInfo(protocol, conf);
- if (krbInfo != null) {
- serverPrincipal = remoteId.getServerPrincipal();
- if (LOG.isDebugEnabled()) {
- LOG.debug("RPC Server's Kerberos principal name for protocol="
- + protocol.getCanonicalName() + " is " + serverPrincipal);
- }
- }
- }
-
- AuthenticationMethod authentication;
- if (token != null) {
- authentication = AuthenticationMethod.TOKEN;
- } else if (ticket != null) {
- authentication = ticket.getRealAuthenticationMethod();
- } else { // this only happens in lazy tests
- authentication = AuthenticationMethod.SIMPLE;
- }
- authMethod = authentication.getAuthMethod();
-
- if (LOG.isDebugEnabled())
- LOG.debug("Use " + authMethod + " authentication for protocol "
- + protocol.getSimpleName());
+ // try SASL if security is enabled or if the ugi contains tokens.
+ // this causes a SIMPLE client with tokens to attempt SASL
+ boolean trySasl = UserGroupInformation.isSecurityEnabled() ||
+ (ticket != null && !ticket.getTokens().isEmpty());
+ this.authProtocol = trySasl ? AuthProtocol.SASL : AuthProtocol.NONE;
this.setName("IPC Client (" + socketFactory.hashCode() +") connection to " +
server.toString() +
@@ -507,11 +540,10 @@ public class Client {
return false;
}
- private synchronized boolean setupSaslConnection(final InputStream in2,
- final OutputStream out2)
- throws IOException {
- saslRpcClient = new SaslRpcClient(authMethod, token, serverPrincipal,
- fallbackAllowed);
+ private synchronized AuthMethod setupSaslConnection(final InputStream in2,
+ final OutputStream out2) throws IOException, InterruptedException {
+ saslRpcClient = new SaslRpcClient(remoteId.getTicket(),
+ remoteId.getProtocol(), remoteId.getAddress(), conf);
return saslRpcClient.saslConnect(in2, out2);
}
@@ -549,7 +581,8 @@ public class Client {
* client, to ensure Server matching address of the client connection
* to host name in principal passed.
*/
- if (UserGroupInformation.isSecurityEnabled()) {
+ UserGroupInformation ticket = remoteId.getTicket();
+ if (ticket != null && ticket.hasKerberosCredentials()) {
KerberosInfo krbInfo =
remoteId.getProtocol().getAnnotation(KerberosInfo.class);
if (krbInfo != null && krbInfo.clientPrincipal() != null) {
@@ -627,7 +660,7 @@ public class Client {
} else {
String msg = "Couldn't setup connection for "
+ UserGroupInformation.getLoginUser().getUserName() + " to "
- + serverPrincipal;
+ + remoteId;
LOG.warn(msg);
throw (IOException) new IOException(msg).initCause(ex);
}
@@ -647,7 +680,7 @@ public class Client {
* a header to the server and starts
* the connection thread that waits for responses.
*/
- private synchronized void setupIOstreams() throws InterruptedException {
+ private synchronized void setupIOstreams() {
if (socket != null || shouldCloseConnection.get()) {
return;
}
@@ -663,23 +696,24 @@ public class Client {
InputStream inStream = NetUtils.getInputStream(socket);
OutputStream outStream = NetUtils.getOutputStream(socket);
writeConnectionHeader(outStream);
- if (authMethod != AuthMethod.SIMPLE) {
+ if (authProtocol == AuthProtocol.SASL) {
final InputStream in2 = inStream;
final OutputStream out2 = outStream;
UserGroupInformation ticket = remoteId.getTicket();
if (ticket.getRealUser() != null) {
ticket = ticket.getRealUser();
}
- boolean continueSasl = false;
try {
- continueSasl = ticket
- .doAs(new PrivilegedExceptionAction<Boolean>() {
+ authMethod = ticket
+ .doAs(new PrivilegedExceptionAction<AuthMethod>() {
@Override
- public Boolean run() throws IOException {
+ public AuthMethod run()
+ throws IOException, InterruptedException {
return setupSaslConnection(in2, out2);
}
});
} catch (Exception ex) {
+ authMethod = saslRpcClient.getAuthMethod();
if (rand == null) {
rand = new Random();
}
@@ -687,23 +721,32 @@ public class Client {
ticket);
continue;
}
- if (continueSasl) {
+ if (authMethod != AuthMethod.SIMPLE) {
// Sasl connect is successful. Let's set up Sasl i/o streams.
inStream = saslRpcClient.getInputStream(inStream);
outStream = saslRpcClient.getOutputStream(outStream);
- } else {
- // fall back to simple auth because server told us so.
- authMethod = AuthMethod.SIMPLE;
+ // for testing
+ remoteId.saslQop =
+ (String)saslRpcClient.getNegotiatedProperty(Sasl.QOP);
+ } else if (UserGroupInformation.isSecurityEnabled() &&
+ !fallbackAllowed) {
+ throw new IOException("Server asks us to fall back to SIMPLE " +
+ "auth, but this client is configured to only allow secure " +
+ "connections.");
}
}
if (doPing) {
- this.in = new DataInputStream(new BufferedInputStream(
- new PingInputStream(inStream)));
- } else {
- this.in = new DataInputStream(new BufferedInputStream(inStream));
+ inStream = new PingInputStream(inStream);
}
- this.out = new DataOutputStream(new BufferedOutputStream(outStream));
+ this.in = new DataInputStream(new BufferedInputStream(inStream));
+
+ // SASL may have already buffered the stream
+ if (!(outStream instanceof BufferedOutputStream)) {
+ outStream = new BufferedOutputStream(outStream);
+ }
+ this.out = new DataOutputStream(outStream);
+
writeConnectionContext(remoteId, authMethod);
// update last activity time
@@ -810,17 +853,9 @@ public class Client {
throws IOException {
DataOutputStream out = new DataOutputStream(new BufferedOutputStream(outStream));
// Write out the header, version and authentication method
- out.write(Server.HEADER.array());
- out.write(Server.CURRENT_VERSION);
+ out.write(RpcConstants.HEADER.array());
+ out.write(RpcConstants.CURRENT_VERSION);
out.write(serviceClass);
- final AuthProtocol authProtocol;
- switch (authMethod) {
- case SIMPLE:
- authProtocol = AuthProtocol.NONE;
- break;
- default:
- authProtocol = AuthProtocol.SASL;
- }
out.write(authProtocol.callId);
out.flush();
}
@@ -832,17 +867,20 @@ public class Client {
AuthMethod authMethod)
throws IOException {
// Write out the ConnectionHeader
- DataOutputBuffer buf = new DataOutputBuffer();
- ProtoUtil.makeIpcConnectionContext(
+ IpcConnectionContextProto message = ProtoUtil.makeIpcConnectionContext(
RPC.getProtocolName(remoteId.getProtocol()),
remoteId.getTicket(),
- authMethod).writeTo(buf);
+ authMethod);
+ RpcRequestHeaderProto connectionContextHeader = ProtoUtil
+ .makeRpcRequestHeader(RpcKind.RPC_PROTOCOL_BUFFER,
+ OperationProto.RPC_FINAL_PACKET, CONNECTION_CONTEXT_CALL_ID,
+ RpcConstants.INVALID_RETRY_COUNT, clientId);
+ RpcRequestMessageWrapper request =
+ new RpcRequestMessageWrapper(connectionContextHeader, message);
// Write out the packet length
- int bufLen = buf.getLength();
-
- out.writeInt(bufLen);
- out.write(buf.getData(), 0, bufLen);
+ out.writeInt(request.getLength());
+ request.write(out);
}
/* wait till someone signals us to start reading RPC response or
@@ -888,7 +926,8 @@ public class Client {
if ( curTime - lastActivity.get() >= pingInterval) {
lastActivity.set(curTime);
synchronized (out) {
- out.writeInt(PING_CALL_ID);
+ out.writeInt(pingRequest.size());
+ pingRequest.writeTo(out);
out.flush();
}
}
@@ -944,7 +983,8 @@ public class Client {
// Items '1' and '2' are prepared here.
final DataOutputBuffer d = new DataOutputBuffer();
RpcRequestHeaderProto header = ProtoUtil.makeRpcRequestHeader(
- call.rpcKind, OperationProto.RPC_FINAL_PACKET, call.id);
+ call.rpcKind, OperationProto.RPC_FINAL_PACKET, call.id, call.retry,
+ clientId);
header.writeDelimitedTo(d);
call.rpcRequest.write(d);
@@ -1009,9 +1049,8 @@ public class Client {
int totalLen = in.readInt();
RpcResponseHeaderProto header =
RpcResponseHeaderProto.parseDelimitedFrom(in);
- if (header == null) {
- throw new IOException("Response is null.");
- }
+ checkResponse(header);
+
int headerLen = header.getSerializedSize();
headerLen += CodedOutputStream.computeRawVarint32Size(headerLen);
@@ -1144,6 +1183,7 @@ public class Client {
CommonConfigurationKeys.IPC_CLIENT_CONNECT_TIMEOUT_DEFAULT);
this.fallbackAllowed = conf.getBoolean(CommonConfigurationKeys.IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_KEY,
CommonConfigurationKeys.IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_DEFAULT);
+ this.clientId = ClientId.getClientId();
this.sendParamsExecutor = clientExcecutorFactory.refAndGetInstance();
}
@@ -1198,7 +1238,7 @@ public class Client {
* for RPC_BUILTIN
*/
public Writable call(Writable param, InetSocketAddress address)
- throws InterruptedException, IOException {
+ throws IOException {
return call(RPC.RpcKind.RPC_BUILTIN, param, address);
}
@@ -1210,7 +1250,7 @@ public class Client {
*/
@Deprecated
public Writable call(RPC.RpcKind rpcKind, Writable param, InetSocketAddress address)
- throws InterruptedException, IOException {
+ throws IOException {
return call(rpcKind, param, address, null);
}
@@ -1224,8 +1264,7 @@ public class Client {
*/
@Deprecated
public Writable call(RPC.RpcKind rpcKind, Writable param, InetSocketAddress addr,
- UserGroupInformation ticket)
- throws InterruptedException, IOException {
+ UserGroupInformation ticket) throws IOException {
ConnectionId remoteId = ConnectionId.getConnectionId(addr, null, ticket, 0,
conf);
return call(rpcKind, param, remoteId);
@@ -1243,8 +1282,7 @@ public class Client {
@Deprecated
public Writable call(RPC.RpcKind rpcKind, Writable param, InetSocketAddress addr,
Class<?> protocol, UserGroupInformation ticket,
- int rpcTimeout)
- throws InterruptedException, IOException {
+ int rpcTimeout) throws IOException {
ConnectionId remoteId = ConnectionId.getConnectionId(addr, protocol,
ticket, rpcTimeout, conf);
return call(rpcKind, param, remoteId);
@@ -1258,8 +1296,7 @@ public class Client {
*/
public Writable call(Writable param, InetSocketAddress addr,
Class<?> protocol, UserGroupInformation ticket,
- int rpcTimeout, Configuration conf)
- throws InterruptedException, IOException {
+ int rpcTimeout, Configuration conf) throws IOException {
ConnectionId remoteId = ConnectionId.getConnectionId(addr, protocol,
ticket, rpcTimeout, conf);
return call(RPC.RpcKind.RPC_BUILTIN, param, remoteId);
@@ -1273,7 +1310,7 @@ public class Client {
public Writable call(Writable param, InetSocketAddress addr,
Class<?> protocol, UserGroupInformation ticket,
int rpcTimeout, int serviceClass, Configuration conf)
- throws InterruptedException, IOException {
+ throws IOException {
ConnectionId remoteId = ConnectionId.getConnectionId(addr, protocol,
ticket, rpcTimeout, conf);
return call(RPC.RpcKind.RPC_BUILTIN, param, remoteId, serviceClass);
@@ -1289,8 +1326,7 @@ public class Client {
*/
public Writable call(RPC.RpcKind rpcKind, Writable param, InetSocketAddress addr,
Class<?> protocol, UserGroupInformation ticket,
- int rpcTimeout, Configuration conf)
- throws InterruptedException, IOException {
+ int rpcTimeout, Configuration conf) throws IOException {
ConnectionId remoteId = ConnectionId.getConnectionId(addr, protocol,
ticket, rpcTimeout, conf);
return call(rpcKind, param, remoteId);
@@ -1300,8 +1336,8 @@ public class Client {
* Same as {@link #call(RPC.RpcKind, Writable, ConnectionId)}
* except the rpcKind is RPC_BUILTIN
*/
- public Writable call(Writable param, ConnectionId remoteId)
- throws InterruptedException, IOException {
+ public Writable call(Writable param, ConnectionId remoteId)
+ throws IOException {
return call(RPC.RpcKind.RPC_BUILTIN, param, remoteId);
}
@@ -1317,7 +1353,7 @@ public class Client {
* threw an exception.
*/
public Writable call(RPC.RpcKind rpcKind, Writable rpcRequest,
- ConnectionId remoteId) throws InterruptedException, IOException {
+ ConnectionId remoteId) throws IOException {
return call(rpcKind, rpcRequest, remoteId, RPC.RPC_SERVICE_CLASS_DEFAULT);
}
@@ -1334,9 +1370,8 @@ public class Client {
* threw an exception.
*/
public Writable call(RPC.RpcKind rpcKind, Writable rpcRequest,
- ConnectionId remoteId, int serviceClass)
- throws InterruptedException, IOException {
- Call call = new Call(rpcKind, rpcRequest);
+ ConnectionId remoteId, int serviceClass) throws IOException {
+ final Call call = createCall(rpcKind, rpcRequest);
Connection connection = getConnection(remoteId, call, serviceClass);
try {
connection.sendRpcRequest(call); // send the rpc request
@@ -1394,8 +1429,7 @@ public class Client {
/** Get a connection from the pool, or create a new one and add it to the
* pool. Connections to a given ConnectionId are reused. */
private Connection getConnection(ConnectionId remoteId,
- Call call, int serviceClass)
- throws IOException, InterruptedException {
+ Call call, int serviceClass) throws IOException {
if (!running.get()) {
// the client is stopped
throw new IOException("The client is stopped");
@@ -1435,7 +1469,6 @@ public class Client {
final Class<?> protocol;
private static final int PRIME = 16777619;
private final int rpcTimeout;
- private final String serverPrincipal;
private final int maxIdleTime; //connections will be culled if it was idle for
//maxIdleTime msecs
private final RetryPolicy connectionRetryPolicy;
@@ -1444,17 +1477,16 @@ public class Client {
private final boolean tcpNoDelay; // if T then disable Nagle's Algorithm
private final boolean doPing; //do we need to send ping message
private final int pingInterval; // how often sends ping to the server in msecs
+ private String saslQop; // here for testing
ConnectionId(InetSocketAddress address, Class<?> protocol,
- UserGroupInformation ticket, int rpcTimeout,
- String serverPrincipal, int maxIdleTime,
+ UserGroupInformation ticket, int rpcTimeout, int maxIdleTime,
RetryPolicy connectionRetryPolicy, int maxRetriesOnSocketTimeouts,
boolean tcpNoDelay, boolean doPing, int pingInterval) {
this.protocol = protocol;
this.address = address;
this.ticket = ticket;
this.rpcTimeout = rpcTimeout;
- this.serverPrincipal = serverPrincipal;
this.maxIdleTime = maxIdleTime;
this.connectionRetryPolicy = connectionRetryPolicy;
this.maxRetriesOnSocketTimeouts = maxRetriesOnSocketTimeouts;
@@ -1479,10 +1511,6 @@ public class Client {
return rpcTimeout;
}
- String getServerPrincipal() {
- return serverPrincipal;
- }
-
int getMaxIdleTime() {
return maxIdleTime;
}
@@ -1504,6 +1532,11 @@ public class Client {
return pingInterval;
}
+ @VisibleForTesting
+ String getSaslQop() {
+ return saslQop;
+ }
+
static ConnectionId getConnectionId(InetSocketAddress addr,
Class<?> protocol, UserGroupInformation ticket, int rpcTimeout,
Configuration conf) throws IOException {
@@ -1532,11 +1565,9 @@ public class Client {
max, 1, TimeUnit.SECONDS);
}
- String remotePrincipal = getRemotePrincipal(conf, addr, protocol);
boolean doPing =
conf.getBoolean(CommonConfigurationKeys.IPC_CLIENT_PING_KEY, true);
- return new ConnectionId(addr, protocol, ticket,
- rpcTimeout, remotePrincipal,
+ return new ConnectionId(addr, protocol, ticket, rpcTimeout,
conf.getInt(CommonConfigurationKeysPublic.IPC_CLIENT_CONNECTION_MAXIDLETIME_KEY,
CommonConfigurationKeysPublic.IPC_CLIENT_CONNECTION_MAXIDLETIME_DEFAULT),
connectionRetryPolicy,
@@ -1549,25 +1580,6 @@ public class Client {
(doPing ? Client.getPingInterval(conf) : 0));
}
- private static String getRemotePrincipal(Configuration conf,
- InetSocketAddress address, Class<?> protocol) throws IOException {
- if (!UserGroupInformation.isSecurityEnabled() || protocol == null) {
- return null;
- }
- KerberosInfo krbInfo = SecurityUtil.getKerberosInfo(protocol, conf);
- if (krbInfo != null) {
- String serverKey = krbInfo.serverPrincipal();
- if (serverKey == null) {
- throw new IOException(
- "Can't obtain server Kerberos config key from protocol="
- + protocol.getCanonicalName());
- }
- return SecurityUtil.getServerPrincipal(conf.get(serverKey), address
- .getAddress());
- }
- return null;
- }
-
static boolean isEqual(Object a, Object b) {
return a == null ? b == null : a.equals(b);
}
@@ -1586,7 +1598,6 @@ public class Client {
&& this.pingInterval == that.pingInterval
&& isEqual(this.protocol, that.protocol)
&& this.rpcTimeout == that.rpcTimeout
- && isEqual(this.serverPrincipal, that.serverPrincipal)
&& this.tcpNoDelay == that.tcpNoDelay
&& isEqual(this.ticket, that.ticket);
}
@@ -1602,8 +1613,6 @@ public class Client {
result = PRIME * result + pingInterval;
result = PRIME * result + ((protocol == null) ? 0 : protocol.hashCode());
result = PRIME * result + rpcTimeout;
- result = PRIME * result
- + ((serverPrincipal == null) ? 0 : serverPrincipal.hashCode());
result = PRIME * result + (tcpNoDelay ? 1231 : 1237);
result = PRIME * result + ((ticket == null) ? 0 : ticket.hashCode());
return result;
@@ -1611,7 +1620,21 @@ public class Client {
@Override
public String toString() {
- return serverPrincipal + "@" + address;
+ return address.toString();
}
}
+
+ /**
+ * Returns the next valid sequential call ID by incrementing an atomic counter
+ * and masking off the sign bit. Valid call IDs are non-negative integers in
+ * the range [ 0, 2^31 - 1 ]. Negative numbers are reserved for special
+ * purposes. The values can overflow back to 0 and be reused. Note that prior
+ * versions of the client did not mask off the sign bit, so a server may still
+ * see a negative call ID if it receives connections from an old client.
+ *
+ * @return next call ID
+ */
+ public static int nextCallId() {
+ return callIdCounter.getAndIncrement() & 0x7FFFFFFF;
+ }
}
Modified: hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java?rev=1513258&r1=1513257&r2=1513258&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java Mon Aug 12 21:25:49 2013
@@ -124,7 +124,7 @@ public class ProtobufRpcEngine implement
/**
* This constructor takes a connectionId, instead of creating a new one.
*/
- public Invoker(Class<?> protocol, Client.ConnectionId connId,
+ private Invoker(Class<?> protocol, Client.ConnectionId connId,
Configuration conf, SocketFactory factory) {
this.remoteId = connId;
this.client = CLIENTS.getClient(conf, factory, RpcResponseWrapper.class);
@@ -192,7 +192,6 @@ public class ProtobufRpcEngine implement
}
RequestHeaderProto rpcRequestHeader = constructRpcRequestHeader(method);
- RpcResponseWrapper val = null;
if (LOG.isTraceEnabled()) {
LOG.trace(Thread.currentThread().getId() + ": Call -> " +
@@ -202,6 +201,7 @@ public class ProtobufRpcEngine implement
Message theRequest = (Message) args[1];
+ final RpcResponseWrapper val;
try {
val = (RpcResponseWrapper) client.call(RPC.RpcKind.RPC_PROTOCOL_BUFFER,
new RpcRequestWrapper(rpcRequestHeader, theRequest), remoteId);
Modified: hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java?rev=1513258&r1=1513257&r2=1513258&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java Mon Aug 12 21:25:49 2013
@@ -642,104 +642,6 @@ public class RPC {
+ proxy.getClass());
}
- /** Construct a server for a protocol implementation instance listening on a
- * port and address.
- * @deprecated Please use {@link Builder} to build the {@link Server}
- */
- @Deprecated
- public static Server getServer(final Object instance, final String bindAddress, final int port, Configuration conf)
- throws IOException {
- return getServer(instance, bindAddress, port, 1, false, conf);
- }
-
- /** Construct a server for a protocol implementation instance listening on a
- * port and address.
- * @deprecated Please use {@link Builder} to build the {@link Server}
- */
- @Deprecated
- public static Server getServer(final Object instance, final String bindAddress, final int port,
- final int numHandlers,
- final boolean verbose, Configuration conf)
- throws IOException {
- return getServer(instance.getClass(), // use impl class for protocol
- instance, bindAddress, port, numHandlers, false, conf, null,
- null);
- }
-
- /** Construct a server for a protocol implementation instance.
- * @deprecated Please use {@link Builder} to build the {@link Server}
- */
- @Deprecated
- public static Server getServer(Class<?> protocol,
- Object instance, String bindAddress,
- int port, Configuration conf)
- throws IOException {
- return getServer(protocol, instance, bindAddress, port, 1, false, conf, null,
- null);
- }
-
- /** Construct a server for a protocol implementation instance.
- * @deprecated Please use {@link Builder} to build the {@link Server}
- */
- @Deprecated
- public static Server getServer(Class<?> protocol,
- Object instance, String bindAddress, int port,
- int numHandlers,
- boolean verbose, Configuration conf)
- throws IOException {
-
- return getServer(protocol, instance, bindAddress, port, numHandlers, verbose,
- conf, null, null);
- }
-
- /** Construct a server for a protocol implementation instance.
- * @deprecated Please use {@link Builder} to build the {@link Server}
- */
- @Deprecated
- public static Server getServer(Class<?> protocol,
- Object instance, String bindAddress, int port,
- int numHandlers,
- boolean verbose, Configuration conf,
- SecretManager<? extends TokenIdentifier> secretManager)
- throws IOException {
- return getServer(protocol, instance, bindAddress, port, numHandlers, verbose,
- conf, secretManager, null);
- }
-
- /**
- * @deprecated Please use {@link Builder} to build the {@link Server}
- */
- @Deprecated
- public static Server getServer(Class<?> protocol,
- Object instance, String bindAddress, int port,
- int numHandlers,
- boolean verbose, Configuration conf,
- SecretManager<? extends TokenIdentifier> secretManager,
- String portRangeConfig)
- throws IOException {
- return getProtocolEngine(protocol, conf)
- .getServer(protocol, instance, bindAddress, port, numHandlers, -1, -1,
- verbose, conf, secretManager, portRangeConfig);
- }
-
- /** Construct a server for a protocol implementation instance.
- * @deprecated Please use {@link Builder} to build the {@link Server}
- */
- @Deprecated
- public static <PROTO extends VersionedProtocol, IMPL extends PROTO>
- Server getServer(Class<PROTO> protocol,
- IMPL instance, String bindAddress, int port,
- int numHandlers, int numReaders, int queueSizePerHandler,
- boolean verbose, Configuration conf,
- SecretManager<? extends TokenIdentifier> secretManager)
- throws IOException {
-
- return getProtocolEngine(protocol, conf)
- .getServer(protocol, instance, bindAddress, port, numHandlers,
- numReaders, queueSizePerHandler, verbose, conf, secretManager,
- null);
- }
-
/**
* Class to construct instances of RPC server with specific options.
*/
@@ -913,7 +815,7 @@ public class RPC {
// Register protocol and its impl for rpc calls
void registerProtocolAndImpl(RpcKind rpcKind, Class<?> protocolClass,
- Object protocolImpl) throws IOException {
+ Object protocolImpl) {
String protocolName = RPC.getProtocolName(protocolClass);
long version;
@@ -943,8 +845,6 @@ public class RPC {
}
}
-
- @SuppressWarnings("unused") // will be useful later.
VerProtocolImpl[] getSupportedProtocolVersions(RPC.RpcKind rpcKind,
String protocolName) {
VerProtocolImpl[] resultk =
@@ -999,8 +899,7 @@ public class RPC {
initProtocolMetaInfo(conf);
}
- private void initProtocolMetaInfo(Configuration conf)
- throws IOException {
+ private void initProtocolMetaInfo(Configuration conf) {
RPC.setProtocolEngine(conf, ProtocolMetaInfoPB.class,
ProtobufRpcEngine.class);
ProtocolMetaInfoServerSideTranslatorPB xlator =
@@ -1018,7 +917,7 @@ public class RPC {
* @return the server (for convenience)
*/
public Server addProtocol(RpcKind rpcKind, Class<?> protocolClass,
- Object protocolImpl) throws IOException {
+ Object protocolImpl) {
registerProtocolAndImpl(rpcKind, protocolClass, protocolImpl);
return this;
}