You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by da...@apache.org on 2013/08/06 01:01:28 UTC
svn commit: r1510793 - in
/hadoop/common/trunk/hadoop-common-project/hadoop-common/src:
main/java/org/apache/hadoop/ipc/ test/java/org/apache/hadoop/ipc/
Author: daryn
Date: Mon Aug 5 23:01:27 2013
New Revision: 1510793
URL: http://svn.apache.org/r1510793
Log:
HADOOP-9832. Add RPC header to client ping (daryn)
Modified:
hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcConstants.java
hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java
Modified: hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java?rev=1510793&r1=1510792&r2=1510793&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java (original)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java Mon Aug 5 23:01:27 2013
@@ -18,10 +18,11 @@
package org.apache.hadoop.ipc;
-import static org.apache.hadoop.ipc.RpcConstants.CONNECTION_CONTEXT_CALL_ID;
+import static org.apache.hadoop.ipc.RpcConstants.*;
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
+import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.FilterInputStream;
@@ -382,6 +383,7 @@ public class Client {
private boolean tcpNoDelay; // if T then disable Nagle's Algorithm
private boolean doPing; //do we need to send ping message
private int pingInterval; // how often sends ping to the server in msecs
+ private ByteArrayOutputStream pingRequest; // ping message
// currently active calls
private Hashtable<Integer, Call> calls = new Hashtable<Integer, Call>();
@@ -407,6 +409,15 @@ public class Client {
this.maxRetriesOnSocketTimeouts = remoteId.getMaxRetriesOnSocketTimeouts();
this.tcpNoDelay = remoteId.getTcpNoDelay();
this.doPing = remoteId.getDoPing();
+ if (doPing) {
+ // construct an RPC header with the callId as the ping callId
+ pingRequest = new ByteArrayOutputStream();
+ RpcRequestHeaderProto pingHeader = ProtoUtil
+ .makeRpcRequestHeader(RpcKind.RPC_PROTOCOL_BUFFER,
+ OperationProto.RPC_FINAL_PACKET, PING_CALL_ID,
+ RpcConstants.INVALID_RETRY_COUNT, clientId);
+ pingHeader.writeDelimitedTo(pingRequest);
+ }
this.pingInterval = remoteId.getPingInterval();
this.serviceClass = serviceClass;
if (LOG.isDebugEnabled()) {
@@ -910,7 +921,8 @@ public class Client {
if ( curTime - lastActivity.get() >= pingInterval) {
lastActivity.set(curTime);
synchronized (out) {
- out.writeInt(RpcConstants.PING_CALL_ID);
+ out.writeInt(pingRequest.size());
+ pingRequest.writeTo(out);
out.flush();
}
}
Modified: hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcConstants.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcConstants.java?rev=1510793&r1=1510792&r2=1510793&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcConstants.java (original)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcConstants.java Mon Aug 5 23:01:27 2013
@@ -27,13 +27,13 @@ public class RpcConstants {
// Hidden Constructor
}
- public static final int PING_CALL_ID = -1;
+ public static final int AUTHORIZATION_FAILED_CALL_ID = -1;
+ public static final int INVALID_CALL_ID = -2;
+ public static final int CONNECTION_CONTEXT_CALL_ID = -3;
+ public static final int PING_CALL_ID = -4;
public static final byte[] DUMMY_CLIENT_ID = new byte[0];
- public static final int INVALID_CALL_ID = -2;
-
- public static final int CONNECTION_CONTEXT_CALL_ID = -3;
public static final int INVALID_RETRY_COUNT = -1;
Modified: hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java?rev=1510793&r1=1510792&r2=1510793&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java (original)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java Mon Aug 5 23:01:27 2013
@@ -72,8 +72,7 @@ import org.apache.hadoop.fs.CommonConfig
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableUtils;
-import static org.apache.hadoop.ipc.RpcConstants.CURRENT_VERSION;
-import static org.apache.hadoop.ipc.RpcConstants.CONNECTION_CONTEXT_CALL_ID;
+import static org.apache.hadoop.ipc.RpcConstants.*;
import org.apache.hadoop.ipc.ProtobufRpcEngine.RpcResponseWrapper;
import org.apache.hadoop.ipc.RPC.RpcInvoker;
import org.apache.hadoop.ipc.RPC.VersionMismatch;
@@ -1177,9 +1176,7 @@ public abstract class Server {
public UserGroupInformation attemptingUser = null; // user name before auth
// Fake 'call' for failed authorization response
- private static final int AUTHORIZATION_FAILED_CALLID = -1;
-
- private final Call authFailedCall = new Call(AUTHORIZATION_FAILED_CALLID,
+ private final Call authFailedCall = new Call(AUTHORIZATION_FAILED_CALL_ID,
RpcConstants.INVALID_RETRY_COUNT, null, this);
private ByteArrayOutputStream authFailedResponse = new ByteArrayOutputStream();
@@ -1523,11 +1520,6 @@ public abstract class Server {
if (data == null) {
dataLengthBuffer.flip();
dataLength = dataLengthBuffer.getInt();
- if ((dataLength == RpcConstants.PING_CALL_ID) && (!useWrap)) {
- // covers the !useSasl too
- dataLengthBuffer.clear();
- return 0; // ping message
- }
checkDataLength(dataLength);
data = ByteBuffer.allocate(dataLength);
}
@@ -1738,13 +1730,6 @@ public abstract class Server {
if (unwrappedData == null) {
unwrappedDataLengthBuffer.flip();
int unwrappedDataLength = unwrappedDataLengthBuffer.getInt();
-
- if (unwrappedDataLength == RpcConstants.PING_CALL_ID) {
- if (LOG.isDebugEnabled())
- LOG.debug("Received ping message");
- unwrappedDataLengthBuffer.clear();
- continue; // ping message
- }
unwrappedData = ByteBuffer.allocate(unwrappedDataLength);
}
@@ -1913,6 +1898,8 @@ public abstract class Server {
"SASL protocol not requested by client");
}
saslReadAndProcess(dis);
+ } else if (callId == PING_CALL_ID) {
+ LOG.debug("Received ping message");
} else {
throw new WrappedRpcServerException(
RpcErrorCodeProto.FATAL_INVALID_RPC_HEADER,
@@ -1926,7 +1913,7 @@ public abstract class Server {
*/
private void authorizeConnection() throws WrappedRpcServerException {
try {
- // If auth method is DIGEST, the token was obtained by the
+ // If auth method is TOKEN, the token was obtained by the
// real user for the effective user, therefore not required to
// authorize real user. doAs is allowed only for simple or kerberos
// authentication
Modified: hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java?rev=1510793&r1=1510792&r2=1510793&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java (original)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java Mon Aug 5 23:01:27 2013
@@ -100,6 +100,7 @@ public class TestRPC {
void ping() throws IOException;
void slowPing(boolean shouldSlow) throws IOException;
+ void sleep(long delay) throws IOException, InterruptedException;
String echo(String value) throws IOException;
String[] echo(String[] value) throws IOException;
Writable echo(Writable value) throws IOException;
@@ -146,6 +147,11 @@ public class TestRPC {
}
@Override
+ public void sleep(long delay) throws InterruptedException {
+ Thread.sleep(delay);
+ }
+
+ @Override
public String echo(String value) throws IOException { return value; }
@Override
@@ -932,6 +938,28 @@ public class TestRPC {
}
}
+ @Test
+ public void testConnectionPing() throws Exception {
+ Configuration conf = new Configuration();
+ int pingInterval = 50;
+ conf.setBoolean(CommonConfigurationKeys.IPC_CLIENT_PING_KEY, true);
+ conf.setInt(CommonConfigurationKeys.IPC_PING_INTERVAL_KEY, pingInterval);
+ final Server server = new RPC.Builder(conf)
+ .setProtocol(TestProtocol.class).setInstance(new TestImpl())
+ .setBindAddress(ADDRESS).setPort(0).setNumHandlers(5).setVerbose(true)
+ .build();
+ server.start();
+
+ final TestProtocol proxy = RPC.getProxy(TestProtocol.class,
+ TestProtocol.versionID, server.getListenerAddress(), conf);
+ try {
+ // this call will throw an exception if the server couldn't decode the ping
+ proxy.sleep(pingInterval*4);
+ } finally {
+ if (proxy != null) RPC.stopProxy(proxy);
+ }
+ }
+
public static void main(String[] args) throws IOException {
new TestRPC().testCallsInternal(conf);