You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by da...@apache.org on 2013/08/06 01:21:53 UTC

svn commit: r1510805 - in /hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common: ./ src/main/java/org/apache/hadoop/ipc/ src/test/java/org/apache/hadoop/ipc/

Author: daryn
Date: Mon Aug  5 23:21:53 2013
New Revision: 1510805

URL: http://svn.apache.org/r1510805
Log:
HADOOP-9832. [RPC v9] Add RPC header to client ping (daryn)

Modified:
    hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/CHANGES.txt
    hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
    hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcConstants.java
    hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
    hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java

Modified: hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/CHANGES.txt?rev=1510805&r1=1510804&r2=1510805&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/CHANGES.txt (original)
+++ hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/CHANGES.txt Mon Aug  5 23:21:53 2013
@@ -97,6 +97,8 @@ Release 2.1.0-beta - 2013-08-06
 
     HADOOP-9698. [RPC v9] Client must honor server's SASL negotiate response (daryn)
 
+    HADOOP-9832. [RPC v9] Add RPC header to client ping (daryn)
+
   NEW FEATURES
 
     HADOOP-9283. Add support for running the Hadoop client on AIX. (atm)

Modified: hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java?rev=1510805&r1=1510804&r2=1510805&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java (original)
+++ hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java Mon Aug  5 23:21:53 2013
@@ -18,8 +18,11 @@
 
 package org.apache.hadoop.ipc;
 
+import static org.apache.hadoop.ipc.RpcConstants.*;
+
 import java.io.BufferedInputStream;
 import java.io.BufferedOutputStream;
+import java.io.ByteArrayOutputStream;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.FilterInputStream;
@@ -380,6 +383,7 @@ public class Client {
     private boolean tcpNoDelay; // if T then disable Nagle's Algorithm
     private boolean doPing; //do we need to send ping message
     private int pingInterval; // how often sends ping to the server in msecs
+    private ByteArrayOutputStream pingRequest; // ping message
     
     // currently active calls
     private Hashtable<Integer, Call> calls = new Hashtable<Integer, Call>();
@@ -405,6 +409,15 @@ public class Client {
       this.maxRetriesOnSocketTimeouts = remoteId.getMaxRetriesOnSocketTimeouts();
       this.tcpNoDelay = remoteId.getTcpNoDelay();
       this.doPing = remoteId.getDoPing();
+      if (doPing) {
+        // construct a RPC header with the callId as the ping callId
+        pingRequest = new ByteArrayOutputStream();
+        RpcRequestHeaderProto pingHeader = ProtoUtil
+            .makeRpcRequestHeader(RpcKind.RPC_PROTOCOL_BUFFER,
+                OperationProto.RPC_FINAL_PACKET, PING_CALL_ID,
+                RpcConstants.INVALID_RETRY_COUNT, clientId);
+        pingHeader.writeDelimitedTo(pingRequest);
+      }
       this.pingInterval = remoteId.getPingInterval();
       this.serviceClass = serviceClass;
       if (LOG.isDebugEnabled()) {
@@ -908,7 +921,8 @@ public class Client {
       if ( curTime - lastActivity.get() >= pingInterval) {
         lastActivity.set(curTime);
         synchronized (out) {
-          out.writeInt(RpcConstants.PING_CALL_ID);
+          out.writeInt(pingRequest.size());
+          pingRequest.writeTo(out);
           out.flush();
         }
       }

Modified: hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcConstants.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcConstants.java?rev=1510805&r1=1510804&r2=1510805&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcConstants.java (original)
+++ hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcConstants.java Mon Aug  5 23:21:53 2013
@@ -27,12 +27,13 @@ public class RpcConstants {
     // Hidden Constructor
   }
   
-  public static final int PING_CALL_ID = -1;
+  public static final int AUTHORIZATION_FAILED_CALL_ID = -1;
+  public static final int INVALID_CALL_ID = -2;
+  public static final int CONNECTION_CONTEXT_CALL_ID = -3;
+  public static final int PING_CALL_ID = -4;
   
   public static final byte[] DUMMY_CLIENT_ID = new byte[0];
   
-  public static final int INVALID_CALL_ID = -2;
-  
   public static final int INVALID_RETRY_COUNT = -1;
   
   /**

Modified: hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java?rev=1510805&r1=1510804&r2=1510805&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java (original)
+++ hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java Mon Aug  5 23:21:53 2013
@@ -71,8 +71,7 @@ import org.apache.hadoop.fs.CommonConfig
 import org.apache.hadoop.io.DataOutputBuffer;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableUtils;
-import static org.apache.hadoop.ipc.RpcConstants.CURRENT_VERSION;
-import org.apache.hadoop.ipc.ProtobufRpcEngine.RpcRequestMessageWrapper;
+import static org.apache.hadoop.ipc.RpcConstants.*;
 import org.apache.hadoop.ipc.ProtobufRpcEngine.RpcResponseWrapper;
 import org.apache.hadoop.ipc.RPC.RpcInvoker;
 import org.apache.hadoop.ipc.RPC.VersionMismatch;
@@ -1174,10 +1173,7 @@ public abstract class Server {
     public UserGroupInformation attemptingUser = null; // user name before auth
 
     // Fake 'call' for failed authorization response
-    private static final int AUTHORIZATION_FAILED_CALLID = -1;
-    private static final int CONNECTION_CONTEXT_CALL_ID = -3;
-    
-    private final Call authFailedCall = new Call(AUTHORIZATION_FAILED_CALLID,
+    private final Call authFailedCall = new Call(AUTHORIZATION_FAILED_CALL_ID,
         RpcConstants.INVALID_RETRY_COUNT, null, this);
     private ByteArrayOutputStream authFailedResponse = new ByteArrayOutputStream();
     
@@ -1521,11 +1517,6 @@ public abstract class Server {
         if (data == null) {
           dataLengthBuffer.flip();
           dataLength = dataLengthBuffer.getInt();
-          if ((dataLength == RpcConstants.PING_CALL_ID) && (!useWrap)) {
-            // covers the !useSasl too
-            dataLengthBuffer.clear();
-            return 0; // ping message
-          }
           checkDataLength(dataLength);
           data = ByteBuffer.allocate(dataLength);
         }
@@ -1736,13 +1727,6 @@ public abstract class Server {
         if (unwrappedData == null) {
           unwrappedDataLengthBuffer.flip();
           int unwrappedDataLength = unwrappedDataLengthBuffer.getInt();
-
-          if (unwrappedDataLength == RpcConstants.PING_CALL_ID) {
-            if (LOG.isDebugEnabled())
-              LOG.debug("Received ping message");
-            unwrappedDataLengthBuffer.clear();
-            continue; // ping message
-          }
           unwrappedData = ByteBuffer.allocate(unwrappedDataLength);
         }
 
@@ -1911,6 +1895,8 @@ public abstract class Server {
               "SASL protocol not requested by client");
         }
         saslReadAndProcess(dis);
+      } else if (callId == PING_CALL_ID) {
+        LOG.debug("Received ping message");
       } else {
         throw new WrappedRpcServerException(
             RpcErrorCodeProto.FATAL_INVALID_RPC_HEADER,
@@ -1924,7 +1910,7 @@ public abstract class Server {
      */
     private void authorizeConnection() throws WrappedRpcServerException {
       try {
-        // If auth method is DIGEST, the token was obtained by the
+        // If auth method is TOKEN, the token was obtained by the
         // real user for the effective user, therefore not required to
         // authorize real user. doAs is allowed only for simple or kerberos
         // authentication

Modified: hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java?rev=1510805&r1=1510804&r2=1510805&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java (original)
+++ hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java Mon Aug  5 23:21:53 2013
@@ -100,6 +100,7 @@ public class TestRPC {
     
     void ping() throws IOException;
     void slowPing(boolean shouldSlow) throws IOException;
+    void sleep(long delay) throws IOException, InterruptedException;
     String echo(String value) throws IOException;
     String[] echo(String[] value) throws IOException;
     Writable echo(Writable value) throws IOException;
@@ -146,6 +147,11 @@ public class TestRPC {
     }
     
     @Override
+    public void sleep(long delay) throws InterruptedException {
+      Thread.sleep(delay);
+    }
+    
+    @Override
     public String echo(String value) throws IOException { return value; }
 
     @Override
@@ -925,6 +931,28 @@ public class TestRPC {
     assertTrue("rpc got exception " + error.get(), error.get() == null);
   }
 
+  @Test
+  public void testConnectionPing() throws Exception {
+    Configuration conf = new Configuration();
+    int pingInterval = 50;
+    conf.setBoolean(CommonConfigurationKeys.IPC_CLIENT_PING_KEY, true);
+    conf.setInt(CommonConfigurationKeys.IPC_PING_INTERVAL_KEY, pingInterval);
+    final Server server = new RPC.Builder(conf)
+        .setProtocol(TestProtocol.class).setInstance(new TestImpl())
+        .setBindAddress(ADDRESS).setPort(0).setNumHandlers(5).setVerbose(true)
+        .build();
+    server.start();
+
+    final TestProtocol proxy = RPC.getProxy(TestProtocol.class,
+        TestProtocol.versionID, server.getListenerAddress(), conf);
+    try {
+      // this call will throw exception if server couldn't decode the ping
+      proxy.sleep(pingInterval*4);
+    } finally {
+      if (proxy != null) RPC.stopProxy(proxy);
+    }
+  }
+
   public static void main(String[] args) throws IOException {
     new TestRPC().testCallsInternal(conf);