You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by vi...@apache.org on 2013/08/12 23:26:02 UTC

svn commit: r1513258 [4/9] - in /hadoop/common/branches/YARN-321/hadoop-common-project: ./ hadoop-annotations/ hadoop-auth-examples/ hadoop-auth-examples/src/main/webapp/ hadoop-auth-examples/src/main/webapp/annonymous/ hadoop-auth-examples/src/main/we...

Modified: hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryPolicies.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryPolicies.java?rev=1513258&r1=1513257&r2=1513258&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryPolicies.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryPolicies.java Mon Aug 12 21:25:49 2013
@@ -153,7 +153,7 @@ public class RetryPolicies {
   static class TryOnceThenFail implements RetryPolicy {
     @Override
     public RetryAction shouldRetry(Exception e, int retries, int failovers,
-        boolean isMethodIdempotent) throws Exception {
+        boolean isIdempotentOrAtMostOnce) throws Exception {
       return RetryAction.FAIL;
     }
   }
@@ -161,7 +161,7 @@ public class RetryPolicies {
   static class RetryForever implements RetryPolicy {
     @Override
     public RetryAction shouldRetry(Exception e, int retries, int failovers,
-        boolean isMethodIdempotent) throws Exception {
+        boolean isIdempotentOrAtMostOnce) throws Exception {
       return RetryAction.RETRY;
     }
   }
@@ -196,7 +196,7 @@ public class RetryPolicies {
 
     @Override
     public RetryAction shouldRetry(Exception e, int retries, int failovers,
-        boolean isMethodIdempotent) throws Exception {
+        boolean isIdempotentOrAtMostOnce) throws Exception {
       if (retries >= maxRetries) {
         return RetryAction.FAIL;
       }
@@ -305,7 +305,7 @@ public class RetryPolicies {
 
     @Override
     public RetryAction shouldRetry(Exception e, int curRetry, int failovers,
-        boolean isMethodIdempotent) throws Exception {
+        boolean isIdempotentOrAtMostOnce) throws Exception {
       final Pair p = searchPair(curRetry);
       if (p == null) {
         //no more retries.
@@ -435,12 +435,12 @@ public class RetryPolicies {
 
     @Override
     public RetryAction shouldRetry(Exception e, int retries, int failovers,
-        boolean isMethodIdempotent) throws Exception {
+        boolean isIdempotentOrAtMostOnce) throws Exception {
       RetryPolicy policy = exceptionToPolicyMap.get(e.getClass());
       if (policy == null) {
         policy = defaultPolicy;
       }
-      return policy.shouldRetry(e, retries, failovers, isMethodIdempotent);
+      return policy.shouldRetry(e, retries, failovers, isIdempotentOrAtMostOnce);
     }
     
   }
@@ -463,7 +463,7 @@ public class RetryPolicies {
 
     @Override
     public RetryAction shouldRetry(Exception e, int retries, int failovers,
-        boolean isMethodIdempotent) throws Exception {
+        boolean isIdempotentOrAtMostOnce) throws Exception {
       RetryPolicy policy = null;
       if (e instanceof RemoteException) {
         policy = exceptionNameToPolicyMap.get(
@@ -472,7 +472,7 @@ public class RetryPolicies {
       if (policy == null) {
         policy = defaultPolicy;
       }
-      return policy.shouldRetry(e, retries, failovers, isMethodIdempotent);
+      return policy.shouldRetry(e, retries, failovers, isIdempotentOrAtMostOnce);
     }
   }
   
@@ -533,7 +533,7 @@ public class RetryPolicies {
 
     @Override
     public RetryAction shouldRetry(Exception e, int retries,
-        int failovers, boolean isMethodIdempotent) throws Exception {
+        int failovers, boolean isIdempotentOrAtMostOnce) throws Exception {
       if (failovers >= maxFailovers) {
         return new RetryAction(RetryAction.RetryDecision.FAIL, 0,
             "failovers (" + failovers + ") exceeded maximum allowed ("
@@ -553,7 +553,7 @@ public class RetryPolicies {
                 calculateExponentialTime(delayMillis, failovers, maxDelayBase));
       } else if (e instanceof SocketException ||
                  (e instanceof IOException && !(e instanceof RemoteException))) {
-        if (isMethodIdempotent) {
+        if (isIdempotentOrAtMostOnce) {
           return RetryAction.FAILOVER_AND_RETRY;
         } else {
           return new RetryAction(RetryAction.RetryDecision.FAIL, 0,
@@ -562,7 +562,7 @@ public class RetryPolicies {
         }
       } else {
         return fallbackPolicy.shouldRetry(e, retries, failovers,
-            isMethodIdempotent);
+            isIdempotentOrAtMostOnce);
       }
     }
     

Modified: hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryPolicy.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryPolicy.java?rev=1513258&r1=1513257&r2=1513258&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryPolicy.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryPolicy.java Mon Aug 12 21:25:49 2013
@@ -75,24 +75,25 @@ public interface RetryPolicy {
   
   /**
    * <p>
-   * Determines whether the framework should retry a
-   * method for the given exception, and the number
-   * of retries that have been made for that operation
+   * Determines whether the framework should retry a method for the given
+   * exception, and the number of retries that have been made for that operation
    * so far.
    * </p>
+   * 
    * @param e The exception that caused the method to fail
    * @param retries The number of times the method has been retried
    * @param failovers The number of times the method has failed over to a
-   *   different backend implementation
-   * @param isMethodIdempotent <code>true</code> if the method is idempotent
-   *   and so can reasonably be retried on failover when we don't know if the
-   *   previous attempt reached the server or not
+   *          different backend implementation
+   * @param isIdempotentOrAtMostOnce <code>true</code> if the method is
+   *          {@link Idempotent} or {@link AtMostOnce} and so can reasonably be
+   *          retried on failover when we don't know if the previous attempt
+   *          reached the server or not
    * @return <code>true</code> if the method should be retried,
-   *   <code>false</code> if the method should not be retried
-   *   but shouldn't fail with an exception (only for void methods)
-   * @throws Exception The re-thrown exception <code>e</code> indicating
-   *   that the method failed and should not be retried further
+   *         <code>false</code> if the method should not be retried but
+   *         shouldn't fail with an exception (only for void methods)
+   * @throws Exception The re-thrown exception <code>e</code> indicating that
+   *           the method failed and should not be retried further
    */
   public RetryAction shouldRetry(Exception e, int retries, int failovers,
-      boolean isMethodIdempotent) throws Exception;
+      boolean isIdempotentOrAtMostOnce) throws Exception;
 }

Modified: hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryProxy.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryProxy.java?rev=1513258&r1=1513257&r2=1513258&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryProxy.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryProxy.java Mon Aug 12 21:25:49 2013
@@ -36,10 +36,10 @@ public class RetryProxy {
    * @param retryPolicy the policy for retrying method call failures
    * @return the retry proxy
    */
-  public static Object create(Class<?> iface, Object implementation,
+  public static <T> Object create(Class<T> iface, T implementation,
                               RetryPolicy retryPolicy) {
     return RetryProxy.create(iface,
-        new DefaultFailoverProxyProvider(iface, implementation),
+        new DefaultFailoverProxyProvider<T>(iface, implementation),
         retryPolicy);
   }
 
@@ -53,12 +53,12 @@ public class RetryProxy {
    * @param retryPolicy the policy for retrying or failing over method call failures
    * @return the retry proxy
    */
-  public static Object create(Class<?> iface, FailoverProxyProvider proxyProvider,
-      RetryPolicy retryPolicy) {
+  public static <T> Object create(Class<T> iface,
+      FailoverProxyProvider<T> proxyProvider, RetryPolicy retryPolicy) {
     return Proxy.newProxyInstance(
         proxyProvider.getInterface().getClassLoader(),
         new Class<?>[] { iface },
-        new RetryInvocationHandler(proxyProvider, retryPolicy)
+        new RetryInvocationHandler<T>(proxyProvider, retryPolicy)
         );
   }
   
@@ -73,10 +73,10 @@ public class RetryProxy {
    * @param methodNameToPolicyMap a map of method names to retry policies
    * @return the retry proxy
    */
-  public static Object create(Class<?> iface, Object implementation,
+  public static <T> Object create(Class<T> iface, T implementation,
                               Map<String,RetryPolicy> methodNameToPolicyMap) {
     return create(iface,
-        new DefaultFailoverProxyProvider(iface, implementation),
+        new DefaultFailoverProxyProvider<T>(iface, implementation),
         methodNameToPolicyMap,
         RetryPolicies.TRY_ONCE_THEN_FAIL);
   }
@@ -92,13 +92,14 @@ public class RetryProxy {
    * @param methodNameToPolicyMapa map of method names to retry policies
    * @return the retry proxy
    */
-  public static Object create(Class<?> iface, FailoverProxyProvider proxyProvider,
+  public static <T> Object create(Class<T> iface,
+      FailoverProxyProvider<T> proxyProvider,
       Map<String,RetryPolicy> methodNameToPolicyMap,
       RetryPolicy defaultPolicy) {
     return Proxy.newProxyInstance(
         proxyProvider.getInterface().getClassLoader(),
         new Class<?>[] { iface },
-        new RetryInvocationHandler(proxyProvider, defaultPolicy,
+        new RetryInvocationHandler<T>(proxyProvider, defaultPolicy,
             methodNameToPolicyMap)
         );
   }

Propchange: hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/package.html
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/serializer/avro/package.html
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/serializer/package.html
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java?rev=1513258&r1=1513257&r2=1513258&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java Mon Aug 12 21:25:49 2013
@@ -18,8 +18,11 @@
 
 package org.apache.hadoop.ipc;
 
+import static org.apache.hadoop.ipc.RpcConstants.*;
+
 import java.io.BufferedInputStream;
 import java.io.BufferedOutputStream;
+import java.io.ByteArrayOutputStream;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.FilterInputStream;
@@ -33,6 +36,7 @@ import java.net.Socket;
 import java.net.SocketTimeoutException;
 import java.net.UnknownHostException;
 import java.security.PrivilegedExceptionAction;
+import java.util.Arrays;
 import java.util.Hashtable;
 import java.util.Iterator;
 import java.util.Map.Entry;
@@ -45,9 +49,11 @@ import java.util.concurrent.Future;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 
 import javax.net.SocketFactory;
+import javax.security.sasl.Sasl;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -62,7 +68,10 @@ import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.retry.RetryPolicies;
 import org.apache.hadoop.io.retry.RetryPolicy;
 import org.apache.hadoop.io.retry.RetryPolicy.RetryAction;
+import org.apache.hadoop.ipc.ProtobufRpcEngine.RpcRequestMessageWrapper;
+import org.apache.hadoop.ipc.RPC.RpcKind;
 import org.apache.hadoop.ipc.Server.AuthProtocol;
+import org.apache.hadoop.ipc.protobuf.IpcConnectionContextProtos.IpcConnectionContextProto;
 import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcRequestHeaderProto;
 import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcRequestHeaderProto.OperationProto;
 import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto;
@@ -75,15 +84,13 @@ import org.apache.hadoop.security.SaslRp
 import org.apache.hadoop.security.SaslRpcServer.AuthMethod;
 import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
-import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.security.token.TokenIdentifier;
-import org.apache.hadoop.security.token.TokenInfo;
-import org.apache.hadoop.security.token.TokenSelector;
 import org.apache.hadoop.util.ProtoUtil;
 import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.Time;
 
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import com.google.protobuf.CodedOutputStream;
 
@@ -97,11 +104,26 @@ public class Client {
   
   public static final Log LOG = LogFactory.getLog(Client.class);
 
+  /** A counter for generating call IDs. */
+  private static final AtomicInteger callIdCounter = new AtomicInteger();
+
+  private static final ThreadLocal<Integer> callId = new ThreadLocal<Integer>();
+  private static final ThreadLocal<Integer> retryCount = new ThreadLocal<Integer>();
+
+  /** Set call id and retry count for the next call. */
+  public static void setCallIdAndRetryCount(int cid, int rc) {
+    Preconditions.checkArgument(cid != RpcConstants.INVALID_CALL_ID);
+    Preconditions.checkState(callId.get() == null);
+    Preconditions.checkArgument(rc != RpcConstants.INVALID_RETRY_COUNT);
+
+    callId.set(cid);
+    retryCount.set(rc);
+  }
+
   private Hashtable<ConnectionId, Connection> connections =
     new Hashtable<ConnectionId, Connection>();
 
   private Class<? extends Writable> valueClass;   // class of call values
-  private int counter;                            // counter for call ids
   private AtomicBoolean running = new AtomicBoolean(true); // if client runs
   final private Configuration conf;
 
@@ -111,8 +133,9 @@ public class Client {
   private final int connectionTimeout;
 
   private final boolean fallbackAllowed;
+  private final byte[] clientId;
   
-  final static int PING_CALL_ID = -1;
+  final static int CONNECTION_CONTEXT_CALL_ID = -3;
   
   /**
    * Executor on which IPC calls' parameters are sent.
@@ -251,23 +274,58 @@ public class Client {
   synchronized boolean isZeroReference() {
     return refCount==0;
   }
-  
+
+  /** Check the rpc response header. */
+  void checkResponse(RpcResponseHeaderProto header) throws IOException {
+    if (header == null) {
+      throw new IOException("Response is null.");
+    }
+    if (header.hasClientId()) {
+      // check client IDs
+      final byte[] id = header.getClientId().toByteArray();
+      if (!Arrays.equals(id, RpcConstants.DUMMY_CLIENT_ID)) {
+        if (!Arrays.equals(id, clientId)) {
+          throw new IOException("Client IDs not matched: local ID="
+              + StringUtils.byteToHexString(clientId) + ", ID in reponse="
+              + StringUtils.byteToHexString(header.getClientId().toByteArray()));
+        }
+      }
+    }
+  }
+
+  Call createCall(RPC.RpcKind rpcKind, Writable rpcRequest) {
+    return new Call(rpcKind, rpcRequest);
+  }
+
   /** 
    * Class that represents an RPC call
    */
-  private class Call {
+  static class Call {
     final int id;               // call id
+    final int retry;           // retry count
     final Writable rpcRequest;  // the serialized rpc request
     Writable rpcResponse;       // null if rpc has error
     IOException error;          // exception, null if success
     final RPC.RpcKind rpcKind;      // Rpc EngineKind
     boolean done;               // true when call is done
 
-    protected Call(RPC.RpcKind rpcKind, Writable param) {
+    private Call(RPC.RpcKind rpcKind, Writable param) {
       this.rpcKind = rpcKind;
       this.rpcRequest = param;
-      synchronized (Client.this) {
-        this.id = counter++;
+
+      final Integer id = callId.get();
+      if (id == null) {
+        this.id = nextCallId();
+      } else {
+        callId.set(null);
+        this.id = id;
+      }
+      
+      final Integer rc = retryCount.get();
+      if (rc == null) {
+        this.retry = 0;
+      } else {
+        this.retry = rc;
       }
     }
 
@@ -308,10 +366,9 @@ public class Client {
    * socket: responses may be delivered out of order. */
   private class Connection extends Thread {
     private InetSocketAddress server;             // server ip:port
-    private String serverPrincipal;  // server's krb5 principal name
     private final ConnectionId remoteId;                // connection id
     private AuthMethod authMethod; // authentication method
-    private Token<? extends TokenIdentifier> token;
+    private AuthProtocol authProtocol;
     private int serviceClass;
     private SaslRpcClient saslRpcClient;
     
@@ -326,6 +383,7 @@ public class Client {
     private boolean tcpNoDelay; // if T then disable Nagle's Algorithm
     private boolean doPing; //do we need to send ping message
     private int pingInterval; // how often sends ping to the server in msecs
+    private ByteArrayOutputStream pingRequest; // ping message
     
     // currently active calls
     private Hashtable<Integer, Call> calls = new Hashtable<Integer, Call>();
@@ -351,6 +409,15 @@ public class Client {
       this.maxRetriesOnSocketTimeouts = remoteId.getMaxRetriesOnSocketTimeouts();
       this.tcpNoDelay = remoteId.getTcpNoDelay();
       this.doPing = remoteId.getDoPing();
+      if (doPing) {
+        // construct a RPC header with the callId as the ping callId
+        pingRequest = new ByteArrayOutputStream();
+        RpcRequestHeaderProto pingHeader = ProtoUtil
+            .makeRpcRequestHeader(RpcKind.RPC_PROTOCOL_BUFFER,
+                OperationProto.RPC_FINAL_PACKET, PING_CALL_ID,
+                RpcConstants.INVALID_RETRY_COUNT, clientId);
+        pingHeader.writeDelimitedTo(pingRequest);
+      }
       this.pingInterval = remoteId.getPingInterval();
       this.serviceClass = serviceClass;
       if (LOG.isDebugEnabled()) {
@@ -358,45 +425,11 @@ public class Client {
       }
 
       UserGroupInformation ticket = remoteId.getTicket();
-      Class<?> protocol = remoteId.getProtocol();
-      if (protocol != null) {
-        TokenInfo tokenInfo = SecurityUtil.getTokenInfo(protocol, conf);
-        if (tokenInfo != null) {
-          TokenSelector<? extends TokenIdentifier> tokenSelector = null;
-          try {
-            tokenSelector = tokenInfo.value().newInstance();
-          } catch (InstantiationException e) {
-            throw new IOException(e.toString());
-          } catch (IllegalAccessException e) {
-            throw new IOException(e.toString());
-          }
-          token = tokenSelector.selectToken(
-              SecurityUtil.buildTokenService(server),
-              ticket.getTokens());
-        }
-        KerberosInfo krbInfo = SecurityUtil.getKerberosInfo(protocol, conf);
-        if (krbInfo != null) {
-          serverPrincipal = remoteId.getServerPrincipal();
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("RPC Server's Kerberos principal name for protocol="
-                + protocol.getCanonicalName() + " is " + serverPrincipal);
-          }
-        }
-      }
-      
-      AuthenticationMethod authentication;
-      if (token != null) {
-        authentication = AuthenticationMethod.TOKEN;
-      } else if (ticket != null) {
-        authentication = ticket.getRealAuthenticationMethod();
-      } else { // this only happens in lazy tests
-        authentication = AuthenticationMethod.SIMPLE;
-      }
-      authMethod = authentication.getAuthMethod();
-      
-      if (LOG.isDebugEnabled())
-        LOG.debug("Use " + authMethod + " authentication for protocol "
-            + protocol.getSimpleName());
+      // try SASL if security is enabled or if the ugi contains tokens.
+      // this causes a SIMPLE client with tokens to attempt SASL
+      boolean trySasl = UserGroupInformation.isSecurityEnabled() ||
+                        (ticket != null && !ticket.getTokens().isEmpty());
+      this.authProtocol = trySasl ? AuthProtocol.SASL : AuthProtocol.NONE;
       
       this.setName("IPC Client (" + socketFactory.hashCode() +") connection to " +
           server.toString() +
@@ -507,11 +540,10 @@ public class Client {
       return false;
     }
     
-    private synchronized boolean setupSaslConnection(final InputStream in2, 
-        final OutputStream out2) 
-        throws IOException {
-      saslRpcClient = new SaslRpcClient(authMethod, token, serverPrincipal,
-          fallbackAllowed);
+    private synchronized AuthMethod setupSaslConnection(final InputStream in2, 
+        final OutputStream out2) throws IOException, InterruptedException {
+      saslRpcClient = new SaslRpcClient(remoteId.getTicket(),
+          remoteId.getProtocol(), remoteId.getAddress(), conf);
       return saslRpcClient.saslConnect(in2, out2);
     }
 
@@ -549,7 +581,8 @@ public class Client {
            * client, to ensure Server matching address of the client connection
            * to host name in principal passed.
            */
-          if (UserGroupInformation.isSecurityEnabled()) {
+          UserGroupInformation ticket = remoteId.getTicket();
+          if (ticket != null && ticket.hasKerberosCredentials()) {
             KerberosInfo krbInfo = 
               remoteId.getProtocol().getAnnotation(KerberosInfo.class);
             if (krbInfo != null && krbInfo.clientPrincipal() != null) {
@@ -627,7 +660,7 @@ public class Client {
             } else {
               String msg = "Couldn't setup connection for "
                   + UserGroupInformation.getLoginUser().getUserName() + " to "
-                  + serverPrincipal;
+                  + remoteId;
               LOG.warn(msg);
               throw (IOException) new IOException(msg).initCause(ex);
             }
@@ -647,7 +680,7 @@ public class Client {
      * a header to the server and starts
      * the connection thread that waits for responses.
      */
-    private synchronized void setupIOstreams() throws InterruptedException {
+    private synchronized void setupIOstreams() {
       if (socket != null || shouldCloseConnection.get()) {
         return;
       } 
@@ -663,23 +696,24 @@ public class Client {
           InputStream inStream = NetUtils.getInputStream(socket);
           OutputStream outStream = NetUtils.getOutputStream(socket);
           writeConnectionHeader(outStream);
-          if (authMethod != AuthMethod.SIMPLE) {
+          if (authProtocol == AuthProtocol.SASL) {
             final InputStream in2 = inStream;
             final OutputStream out2 = outStream;
             UserGroupInformation ticket = remoteId.getTicket();
             if (ticket.getRealUser() != null) {
               ticket = ticket.getRealUser();
             }
-            boolean continueSasl = false;
             try {
-              continueSasl = ticket
-                  .doAs(new PrivilegedExceptionAction<Boolean>() {
+              authMethod = ticket
+                  .doAs(new PrivilegedExceptionAction<AuthMethod>() {
                     @Override
-                    public Boolean run() throws IOException {
+                    public AuthMethod run()
+                        throws IOException, InterruptedException {
                       return setupSaslConnection(in2, out2);
                     }
                   });
             } catch (Exception ex) {
+              authMethod = saslRpcClient.getAuthMethod();
               if (rand == null) {
                 rand = new Random();
               }
@@ -687,23 +721,32 @@ public class Client {
                   ticket);
               continue;
             }
-            if (continueSasl) {
+            if (authMethod != AuthMethod.SIMPLE) {
               // Sasl connect is successful. Let's set up Sasl i/o streams.
               inStream = saslRpcClient.getInputStream(inStream);
               outStream = saslRpcClient.getOutputStream(outStream);
-            } else {
-              // fall back to simple auth because server told us so.
-              authMethod = AuthMethod.SIMPLE;
+              // for testing
+              remoteId.saslQop =
+                  (String)saslRpcClient.getNegotiatedProperty(Sasl.QOP);
+            } else if (UserGroupInformation.isSecurityEnabled() &&
+                       !fallbackAllowed) {
+              throw new IOException("Server asks us to fall back to SIMPLE " +
+                  "auth, but this client is configured to only allow secure " +
+                  "connections.");
             }
           }
         
           if (doPing) {
-            this.in = new DataInputStream(new BufferedInputStream(
-                new PingInputStream(inStream)));
-          } else {
-            this.in = new DataInputStream(new BufferedInputStream(inStream));
+            inStream = new PingInputStream(inStream);
           }
-          this.out = new DataOutputStream(new BufferedOutputStream(outStream));
+          this.in = new DataInputStream(new BufferedInputStream(inStream));
+
+          // SASL may have already buffered the stream
+          if (!(outStream instanceof BufferedOutputStream)) {
+            outStream = new BufferedOutputStream(outStream);
+          }
+          this.out = new DataOutputStream(outStream);
+          
           writeConnectionContext(remoteId, authMethod);
 
           // update last activity time
@@ -810,17 +853,9 @@ public class Client {
         throws IOException {
       DataOutputStream out = new DataOutputStream(new BufferedOutputStream(outStream));
       // Write out the header, version and authentication method
-      out.write(Server.HEADER.array());
-      out.write(Server.CURRENT_VERSION);
+      out.write(RpcConstants.HEADER.array());
+      out.write(RpcConstants.CURRENT_VERSION);
       out.write(serviceClass);
-      final AuthProtocol authProtocol;
-      switch (authMethod) {
-        case SIMPLE:
-          authProtocol = AuthProtocol.NONE;
-          break;
-        default:
-          authProtocol = AuthProtocol.SASL;
-      }
       out.write(authProtocol.callId);
       out.flush();
     }
@@ -832,17 +867,20 @@ public class Client {
                                         AuthMethod authMethod)
                                             throws IOException {
       // Write out the ConnectionHeader
-      DataOutputBuffer buf = new DataOutputBuffer();
-      ProtoUtil.makeIpcConnectionContext(
+      IpcConnectionContextProto message = ProtoUtil.makeIpcConnectionContext(
           RPC.getProtocolName(remoteId.getProtocol()),
           remoteId.getTicket(),
-          authMethod).writeTo(buf);
+          authMethod);
+      RpcRequestHeaderProto connectionContextHeader = ProtoUtil
+          .makeRpcRequestHeader(RpcKind.RPC_PROTOCOL_BUFFER,
+              OperationProto.RPC_FINAL_PACKET, CONNECTION_CONTEXT_CALL_ID,
+              RpcConstants.INVALID_RETRY_COUNT, clientId);
+      RpcRequestMessageWrapper request =
+          new RpcRequestMessageWrapper(connectionContextHeader, message);
       
       // Write out the packet length
-      int bufLen = buf.getLength();
-
-      out.writeInt(bufLen);
-      out.write(buf.getData(), 0, bufLen);
+      out.writeInt(request.getLength());
+      request.write(out);
     }
     
     /* wait till someone signals us to start reading RPC response or
@@ -888,7 +926,8 @@ public class Client {
       if ( curTime - lastActivity.get() >= pingInterval) {
         lastActivity.set(curTime);
         synchronized (out) {
-          out.writeInt(PING_CALL_ID);
+          out.writeInt(pingRequest.size());
+          pingRequest.writeTo(out);
           out.flush();
         }
       }
@@ -944,7 +983,8 @@ public class Client {
       // Items '1' and '2' are prepared here. 
       final DataOutputBuffer d = new DataOutputBuffer();
       RpcRequestHeaderProto header = ProtoUtil.makeRpcRequestHeader(
-         call.rpcKind, OperationProto.RPC_FINAL_PACKET, call.id);
+          call.rpcKind, OperationProto.RPC_FINAL_PACKET, call.id, call.retry,
+          clientId);
       header.writeDelimitedTo(d);
       call.rpcRequest.write(d);
 
@@ -1009,9 +1049,8 @@ public class Client {
         int totalLen = in.readInt();
         RpcResponseHeaderProto header = 
             RpcResponseHeaderProto.parseDelimitedFrom(in);
-        if (header == null) {
-          throw new IOException("Response is null.");
-        }
+        checkResponse(header);
+
         int headerLen = header.getSerializedSize();
         headerLen += CodedOutputStream.computeRawVarint32Size(headerLen);
 
@@ -1144,6 +1183,7 @@ public class Client {
         CommonConfigurationKeys.IPC_CLIENT_CONNECT_TIMEOUT_DEFAULT);
     this.fallbackAllowed = conf.getBoolean(CommonConfigurationKeys.IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_KEY,
         CommonConfigurationKeys.IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_DEFAULT);
+    this.clientId = ClientId.getClientId();
     this.sendParamsExecutor = clientExcecutorFactory.refAndGetInstance();
   }
 
@@ -1198,7 +1238,7 @@ public class Client {
    *  for RPC_BUILTIN
    */
   public Writable call(Writable param, InetSocketAddress address)
-  throws InterruptedException, IOException {
+      throws IOException {
     return call(RPC.RpcKind.RPC_BUILTIN, param, address);
     
   }
@@ -1210,7 +1250,7 @@ public class Client {
    */
   @Deprecated
   public Writable call(RPC.RpcKind rpcKind, Writable param, InetSocketAddress address)
-  throws InterruptedException, IOException {
+  throws IOException {
       return call(rpcKind, param, address, null);
   }
   
@@ -1224,8 +1264,7 @@ public class Client {
    */
   @Deprecated
   public Writable call(RPC.RpcKind rpcKind, Writable param, InetSocketAddress addr, 
-      UserGroupInformation ticket)  
-      throws InterruptedException, IOException {
+      UserGroupInformation ticket) throws IOException {
     ConnectionId remoteId = ConnectionId.getConnectionId(addr, null, ticket, 0,
         conf);
     return call(rpcKind, param, remoteId);
@@ -1243,8 +1282,7 @@ public class Client {
   @Deprecated
   public Writable call(RPC.RpcKind rpcKind, Writable param, InetSocketAddress addr, 
                        Class<?> protocol, UserGroupInformation ticket,
-                       int rpcTimeout)  
-                       throws InterruptedException, IOException {
+                       int rpcTimeout) throws IOException {
     ConnectionId remoteId = ConnectionId.getConnectionId(addr, protocol,
         ticket, rpcTimeout, conf);
     return call(rpcKind, param, remoteId);
@@ -1258,8 +1296,7 @@ public class Client {
    */
   public Writable call(Writable param, InetSocketAddress addr,
       Class<?> protocol, UserGroupInformation ticket,
-      int rpcTimeout, Configuration conf)
-      throws InterruptedException, IOException {
+      int rpcTimeout, Configuration conf) throws IOException {
     ConnectionId remoteId = ConnectionId.getConnectionId(addr, protocol,
         ticket, rpcTimeout, conf);
     return call(RPC.RpcKind.RPC_BUILTIN, param, remoteId);
@@ -1273,7 +1310,7 @@ public class Client {
   public Writable call(Writable param, InetSocketAddress addr,
       Class<?> protocol, UserGroupInformation ticket,
       int rpcTimeout, int serviceClass, Configuration conf)
-      throws InterruptedException, IOException {
+      throws IOException {
     ConnectionId remoteId = ConnectionId.getConnectionId(addr, protocol,
         ticket, rpcTimeout, conf);
     return call(RPC.RpcKind.RPC_BUILTIN, param, remoteId, serviceClass);
@@ -1289,8 +1326,7 @@ public class Client {
    */
   public Writable call(RPC.RpcKind rpcKind, Writable param, InetSocketAddress addr, 
                        Class<?> protocol, UserGroupInformation ticket,
-                       int rpcTimeout, Configuration conf)  
-                       throws InterruptedException, IOException {
+                       int rpcTimeout, Configuration conf) throws IOException {
     ConnectionId remoteId = ConnectionId.getConnectionId(addr, protocol,
         ticket, rpcTimeout, conf);
     return call(rpcKind, param, remoteId);
@@ -1300,8 +1336,8 @@ public class Client {
    * Same as {link {@link #call(RPC.RpcKind, Writable, ConnectionId)}
    * except the rpcKind is RPC_BUILTIN
    */
-  public Writable call(Writable param, ConnectionId remoteId)  
-      throws InterruptedException, IOException {
+  public Writable call(Writable param, ConnectionId remoteId)
+      throws IOException {
      return call(RPC.RpcKind.RPC_BUILTIN, param, remoteId);
   }
   
@@ -1317,7 +1353,7 @@ public class Client {
    * threw an exception.
    */
   public Writable call(RPC.RpcKind rpcKind, Writable rpcRequest,
-      ConnectionId remoteId) throws InterruptedException, IOException {
+      ConnectionId remoteId) throws IOException {
     return call(rpcKind, rpcRequest, remoteId, RPC.RPC_SERVICE_CLASS_DEFAULT);
   }
 
@@ -1334,9 +1370,8 @@ public class Client {
    * threw an exception.
    */
   public Writable call(RPC.RpcKind rpcKind, Writable rpcRequest,
-      ConnectionId remoteId, int serviceClass)
-      throws InterruptedException, IOException {
-    Call call = new Call(rpcKind, rpcRequest);
+      ConnectionId remoteId, int serviceClass) throws IOException {
+    final Call call = createCall(rpcKind, rpcRequest);
     Connection connection = getConnection(remoteId, call, serviceClass);
     try {
       connection.sendRpcRequest(call);                 // send the rpc request
@@ -1394,8 +1429,7 @@ public class Client {
   /** Get a connection from the pool, or create a new one and add it to the
    * pool.  Connections to a given ConnectionId are reused. */
   private Connection getConnection(ConnectionId remoteId,
-                                   Call call, int serviceClass)
-                                   throws IOException, InterruptedException {
+      Call call, int serviceClass) throws IOException {
     if (!running.get()) {
       // the client is stopped
       throw new IOException("The client is stopped");
@@ -1435,7 +1469,6 @@ public class Client {
     final Class<?> protocol;
     private static final int PRIME = 16777619;
     private final int rpcTimeout;
-    private final String serverPrincipal;
     private final int maxIdleTime; //connections will be culled if it was idle for 
     //maxIdleTime msecs
     private final RetryPolicy connectionRetryPolicy;
@@ -1444,17 +1477,16 @@ public class Client {
     private final boolean tcpNoDelay; // if T then disable Nagle's Algorithm
     private final boolean doPing; //do we need to send ping message
     private final int pingInterval; // how often sends ping to the server in msecs
+    private String saslQop; // here for testing
     
     ConnectionId(InetSocketAddress address, Class<?> protocol, 
-                 UserGroupInformation ticket, int rpcTimeout,
-                 String serverPrincipal, int maxIdleTime, 
+                 UserGroupInformation ticket, int rpcTimeout, int maxIdleTime, 
                  RetryPolicy connectionRetryPolicy, int maxRetriesOnSocketTimeouts,
                  boolean tcpNoDelay, boolean doPing, int pingInterval) {
       this.protocol = protocol;
       this.address = address;
       this.ticket = ticket;
       this.rpcTimeout = rpcTimeout;
-      this.serverPrincipal = serverPrincipal;
       this.maxIdleTime = maxIdleTime;
       this.connectionRetryPolicy = connectionRetryPolicy;
       this.maxRetriesOnSocketTimeouts = maxRetriesOnSocketTimeouts;
@@ -1479,10 +1511,6 @@ public class Client {
       return rpcTimeout;
     }
     
-    String getServerPrincipal() {
-      return serverPrincipal;
-    }
-    
     int getMaxIdleTime() {
       return maxIdleTime;
     }
@@ -1504,6 +1532,11 @@ public class Client {
       return pingInterval;
     }
     
+    @VisibleForTesting
+    String getSaslQop() {
+      return saslQop;
+    }
+    
     static ConnectionId getConnectionId(InetSocketAddress addr,
         Class<?> protocol, UserGroupInformation ticket, int rpcTimeout,
         Configuration conf) throws IOException {
@@ -1532,11 +1565,9 @@ public class Client {
             max, 1, TimeUnit.SECONDS);
       }
 
-      String remotePrincipal = getRemotePrincipal(conf, addr, protocol);
       boolean doPing =
         conf.getBoolean(CommonConfigurationKeys.IPC_CLIENT_PING_KEY, true);
-      return new ConnectionId(addr, protocol, ticket,
-          rpcTimeout, remotePrincipal,
+      return new ConnectionId(addr, protocol, ticket, rpcTimeout,
           conf.getInt(CommonConfigurationKeysPublic.IPC_CLIENT_CONNECTION_MAXIDLETIME_KEY,
               CommonConfigurationKeysPublic.IPC_CLIENT_CONNECTION_MAXIDLETIME_DEFAULT),
           connectionRetryPolicy,
@@ -1549,25 +1580,6 @@ public class Client {
           (doPing ? Client.getPingInterval(conf) : 0));
     }
     
-    private static String getRemotePrincipal(Configuration conf,
-        InetSocketAddress address, Class<?> protocol) throws IOException {
-      if (!UserGroupInformation.isSecurityEnabled() || protocol == null) {
-        return null;
-      }
-      KerberosInfo krbInfo = SecurityUtil.getKerberosInfo(protocol, conf);
-      if (krbInfo != null) {
-        String serverKey = krbInfo.serverPrincipal();
-        if (serverKey == null) {
-          throw new IOException(
-              "Can't obtain server Kerberos config key from protocol="
-                  + protocol.getCanonicalName());
-        }
-        return SecurityUtil.getServerPrincipal(conf.get(serverKey), address
-            .getAddress());
-      }
-      return null;
-    }
-    
     static boolean isEqual(Object a, Object b) {
       return a == null ? b == null : a.equals(b);
     }
@@ -1586,7 +1598,6 @@ public class Client {
             && this.pingInterval == that.pingInterval
             && isEqual(this.protocol, that.protocol)
             && this.rpcTimeout == that.rpcTimeout
-            && isEqual(this.serverPrincipal, that.serverPrincipal)
             && this.tcpNoDelay == that.tcpNoDelay
             && isEqual(this.ticket, that.ticket);
       }
@@ -1602,8 +1613,6 @@ public class Client {
       result = PRIME * result + pingInterval;
       result = PRIME * result + ((protocol == null) ? 0 : protocol.hashCode());
       result = PRIME * result + rpcTimeout;
-      result = PRIME * result
-          + ((serverPrincipal == null) ? 0 : serverPrincipal.hashCode());
       result = PRIME * result + (tcpNoDelay ? 1231 : 1237);
       result = PRIME * result + ((ticket == null) ? 0 : ticket.hashCode());
       return result;
@@ -1611,7 +1620,21 @@ public class Client {
     
     @Override
     public String toString() {
-      return serverPrincipal + "@" + address;
+      return address.toString();
     }
   }  
+
+  /**
+   * Returns the next valid sequential call ID by incrementing an atomic counter
+   * and masking off the sign bit.  Valid call IDs are non-negative integers in
+   * the range [ 0, 2^31 - 1 ].  Negative numbers are reserved for special
+   * purposes.  The values can overflow back to 0 and be reused.  Note that prior
+   * versions of the client did not mask off the sign bit, so a server may still
+   * see a negative call ID if it receives connections from an old client.
+   * 
+   * @return next call ID
+   */
+  public static int nextCallId() {
+    return callIdCounter.getAndIncrement() & 0x7FFFFFFF;
+  }
 }

Modified: hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java?rev=1513258&r1=1513257&r2=1513258&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java Mon Aug 12 21:25:49 2013
@@ -124,7 +124,7 @@ public class ProtobufRpcEngine implement
     /**
      * This constructor takes a connectionId, instead of creating a new one.
      */
-    public Invoker(Class<?> protocol, Client.ConnectionId connId,
+    private Invoker(Class<?> protocol, Client.ConnectionId connId,
         Configuration conf, SocketFactory factory) {
       this.remoteId = connId;
       this.client = CLIENTS.getClient(conf, factory, RpcResponseWrapper.class);
@@ -192,7 +192,6 @@ public class ProtobufRpcEngine implement
       }
 
       RequestHeaderProto rpcRequestHeader = constructRpcRequestHeader(method);
-      RpcResponseWrapper val = null;
       
       if (LOG.isTraceEnabled()) {
         LOG.trace(Thread.currentThread().getId() + ": Call -> " +
@@ -202,6 +201,7 @@ public class ProtobufRpcEngine implement
 
 
       Message theRequest = (Message) args[1];
+      final RpcResponseWrapper val;
       try {
         val = (RpcResponseWrapper) client.call(RPC.RpcKind.RPC_PROTOCOL_BUFFER,
             new RpcRequestWrapper(rpcRequestHeader, theRequest), remoteId);

Modified: hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java?rev=1513258&r1=1513257&r2=1513258&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java Mon Aug 12 21:25:49 2013
@@ -642,104 +642,6 @@ public class RPC {
             + proxy.getClass());
   }
 
-  /** Construct a server for a protocol implementation instance listening on a
-   * port and address.
-   * @deprecated Please use {@link Builder} to build the {@link Server}
-   */
-  @Deprecated
-  public static Server getServer(final Object instance, final String bindAddress, final int port, Configuration conf) 
-    throws IOException {
-    return getServer(instance, bindAddress, port, 1, false, conf);
-  }
-
-  /** Construct a server for a protocol implementation instance listening on a
-   * port and address.
-   * @deprecated Please use {@link Builder} to build the {@link Server}
-   */
-  @Deprecated
-  public static Server getServer(final Object instance, final String bindAddress, final int port,
-                                 final int numHandlers,
-                                 final boolean verbose, Configuration conf) 
-    throws IOException {
-    return getServer(instance.getClass(),         // use impl class for protocol
-                     instance, bindAddress, port, numHandlers, false, conf, null,
-                     null);
-  }
-
-  /** Construct a server for a protocol implementation instance.
-   *  @deprecated Please use {@link Builder} to build the {@link Server}
-   */
-  @Deprecated
-  public static Server getServer(Class<?> protocol,
-                                 Object instance, String bindAddress,
-                                 int port, Configuration conf) 
-    throws IOException {
-    return getServer(protocol, instance, bindAddress, port, 1, false, conf, null,
-        null);
-  }
-
-  /** Construct a server for a protocol implementation instance.
-   * @deprecated Please use {@link Builder} to build the {@link Server}
-   */
-  @Deprecated
-  public static Server getServer(Class<?> protocol,
-                                 Object instance, String bindAddress, int port,
-                                 int numHandlers,
-                                 boolean verbose, Configuration conf) 
-    throws IOException {
-    
-    return getServer(protocol, instance, bindAddress, port, numHandlers, verbose,
-                 conf, null, null);
-  }
-  
-  /** Construct a server for a protocol implementation instance. 
-   *  @deprecated Please use {@link Builder} to build the {@link Server}
-   */
-  @Deprecated
-  public static Server getServer(Class<?> protocol,
-                                 Object instance, String bindAddress, int port,
-                                 int numHandlers,
-                                 boolean verbose, Configuration conf,
-                                 SecretManager<? extends TokenIdentifier> secretManager) 
-    throws IOException {
-    return getServer(protocol, instance, bindAddress, port, numHandlers, verbose,
-        conf, secretManager, null);
-  }
-  
-  /**
-   *  @deprecated Please use {@link Builder} to build the {@link Server}
-   */
-  @Deprecated
-  public static Server getServer(Class<?> protocol,
-      Object instance, String bindAddress, int port,
-      int numHandlers,
-      boolean verbose, Configuration conf,
-      SecretManager<? extends TokenIdentifier> secretManager,
-      String portRangeConfig) 
-  throws IOException {
-    return getProtocolEngine(protocol, conf)
-      .getServer(protocol, instance, bindAddress, port, numHandlers, -1, -1,
-                 verbose, conf, secretManager, portRangeConfig);
-  }
-
-  /** Construct a server for a protocol implementation instance.
-   *  @deprecated Please use {@link Builder} to build the {@link Server}
-   */
-  @Deprecated
-  public static <PROTO extends VersionedProtocol, IMPL extends PROTO> 
-        Server getServer(Class<PROTO> protocol,
-                                 IMPL instance, String bindAddress, int port,
-                                 int numHandlers, int numReaders, int queueSizePerHandler,
-                                 boolean verbose, Configuration conf,
-                                 SecretManager<? extends TokenIdentifier> secretManager) 
-    throws IOException {
-    
-    return getProtocolEngine(protocol, conf)
-      .getServer(protocol, instance, bindAddress, port, numHandlers,
-                 numReaders, queueSizePerHandler, verbose, conf, secretManager,
-                 null);
-  }
-
   /**
    * Class to construct instances of RPC server with specific options.
    */
@@ -913,7 +815,7 @@ public class RPC {
    
    // Register  protocol and its impl for rpc calls
    void registerProtocolAndImpl(RpcKind rpcKind, Class<?> protocolClass, 
-       Object protocolImpl) throws IOException {
+       Object protocolImpl) {
      String protocolName = RPC.getProtocolName(protocolClass);
      long version;
      
@@ -943,8 +845,6 @@ public class RPC {
      }
    }
    
-   
-   @SuppressWarnings("unused") // will be useful later.
    VerProtocolImpl[] getSupportedProtocolVersions(RPC.RpcKind rpcKind,
        String protocolName) {
      VerProtocolImpl[] resultk = 
@@ -999,8 +899,7 @@ public class RPC {
       initProtocolMetaInfo(conf);
     }
     
-    private void initProtocolMetaInfo(Configuration conf)
-        throws IOException {
+    private void initProtocolMetaInfo(Configuration conf) {
       RPC.setProtocolEngine(conf, ProtocolMetaInfoPB.class,
           ProtobufRpcEngine.class);
       ProtocolMetaInfoServerSideTranslatorPB xlator = 
@@ -1018,7 +917,7 @@ public class RPC {
      * @return the server (for convenience)
      */
     public Server addProtocol(RpcKind rpcKind, Class<?> protocolClass,
-        Object protocolImpl) throws IOException {
+        Object protocolImpl) {
       registerProtocolAndImpl(rpcKind, protocolClass, protocolImpl);
       return this;
     }