You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by sh...@apache.org on 2021/05/07 01:26:52 UTC
[hadoop] branch branch-3.1 updated: HADOOP-17680. Allow ProtobufRpcEngine to be extensible (#2905) Contributed by Hector Chaverri.
This is an automated email from the ASF dual-hosted git repository.
shv pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/branch-3.1 by this push:
new 39bf9e2 HADOOP-17680. Allow ProtobufRpcEngine to be extensible (#2905) Contributed by Hector Chaverri.
39bf9e2 is described below
commit 39bf9e270e3a5dbcd3a7a2924f62bd0b2dee0a02
Author: hchaverr <hc...@linkedin.com>
AuthorDate: Thu May 6 16:40:45 2021 -0700
HADOOP-17680. Allow ProtobufRpcEngine to be extensible (#2905) Contributed by Hector Chaverri.
(cherry picked from commit f40e3eb0590f85bb42d2471992bf5d524628fdd6)
---
.../org/apache/hadoop/ipc/ProtobufRpcEngine.java | 30 +++++++++++++++++-----
1 file changed, 24 insertions(+), 6 deletions(-)
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java
index 959f701..670093f 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java
@@ -115,7 +115,7 @@ public class ProtobufRpcEngine implements RpcEngine {
factory)), false);
}
- private static class Invoker implements RpcInvocationHandler {
+ protected static class Invoker implements RpcInvocationHandler {
private final Map<String, Message> returnTypes =
new ConcurrentHashMap<String, Message>();
private boolean isClosed = false;
@@ -126,7 +126,7 @@ public class ProtobufRpcEngine implements RpcEngine {
private AtomicBoolean fallbackToSimpleAuth;
private AlignmentContext alignmentContext;
- private Invoker(Class<?> protocol, InetSocketAddress addr,
+ protected Invoker(Class<?> protocol, InetSocketAddress addr,
UserGroupInformation ticket, Configuration conf, SocketFactory factory,
int rpcTimeout, RetryPolicy connectionRetryPolicy,
AtomicBoolean fallbackToSimpleAuth, AlignmentContext alignmentContext)
@@ -141,7 +141,7 @@ public class ProtobufRpcEngine implements RpcEngine {
/**
* This constructor takes a connectionId, instead of creating a new one.
*/
- private Invoker(Class<?> protocol, Client.ConnectionId connId,
+ protected Invoker(Class<?> protocol, Client.ConnectionId connId,
Configuration conf, SocketFactory factory) {
this.remoteId = connId;
this.client = CLIENTS.getClient(conf, factory, RpcWritable.Buffer.class);
@@ -218,8 +218,6 @@ public class ProtobufRpcEngine implements RpcEngine {
traceScope = tracer.newScope(RpcClientUtil.methodToTraceString(method));
}
- RequestHeaderProto rpcRequestHeader = constructRpcRequestHeader(method);
-
if (LOG.isTraceEnabled()) {
LOG.trace(Thread.currentThread().getId() + ": Call -> " +
remoteId + ": " + method.getName() +
@@ -231,7 +229,7 @@ public class ProtobufRpcEngine implements RpcEngine {
final RpcWritable.Buffer val;
try {
val = (RpcWritable.Buffer) client.call(RPC.RpcKind.RPC_PROTOCOL_BUFFER,
- new RpcProtobufRequest(rpcRequestHeader, theRequest), remoteId,
+ constructRpcRequest(method, theRequest), remoteId,
fallbackToSimpleAuth, alignmentContext);
} catch (Throwable e) {
@@ -276,6 +274,11 @@ public class ProtobufRpcEngine implements RpcEngine {
}
}
+ protected Writable constructRpcRequest(Method method, Message theRequest) {
+ RequestHeaderProto rpcRequestHeader = constructRpcRequestHeader(method);
+ return new RpcProtobufRequest(rpcRequestHeader, theRequest);
+ }
+
private Message getReturnMessage(final Method method,
final RpcWritable.Buffer buf) throws ServiceException {
Message prototype = null;
@@ -325,6 +328,14 @@ public class ProtobufRpcEngine implements RpcEngine {
public ConnectionId getConnectionId() {
return remoteId;
}
+
+ protected long getClientProtocolVersion() {
+ return clientProtocolVersion;
+ }
+
+ protected String getProtocolName() {
+ return protocolName;
+ }
}
@VisibleForTesting
@@ -503,6 +514,13 @@ public class ProtobufRpcEngine implements RpcEngine {
String declaringClassProtoName =
rpcRequest.getDeclaringClassProtocolName();
long clientVersion = rpcRequest.getClientProtocolVersion();
+ return call(server, connectionProtocolName, request, receiveTime,
+ methodName, declaringClassProtoName, clientVersion);
+ }
+
+ protected Writable call(RPC.Server server, String connectionProtocolName,
+ RpcWritable.Buffer request, long receiveTime, String methodName,
+ String declaringClassProtoName, long clientVersion) throws Exception {
if (server.verbose)
LOG.info("Call: connectionProtocolName=" + connectionProtocolName +
", method=" + methodName);
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org