You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by ms...@apache.org on 2020/05/11 05:14:48 UTC
[incubator-ratis] branch master updated: RATIS-932. Avoid usage of
ratis-thirdparty-hadoop in ratis-hadoop module. (#94)
This is an automated email from the ASF dual-hosted git repository.
msingh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git
The following commit(s) were added to refs/heads/master by this push:
new 1720b1f RATIS-932. Avoid usage of ratis-thirdparty-hadoop in ratis-hadoop module. (#94)
1720b1f is described below
commit 1720b1fde3ccaa149cfa98d142dd758e41f0b211
Author: Mukul Kumar Singh <ms...@apache.org>
AuthorDate: Mon May 11 10:44:38 2020 +0530
RATIS-932. Avoid usage of ratis-thirdparty-hadoop in ratis-hadoop module. (#94)
---
pom.xml | 6 -
ratis-examples/pom.xml | 5 -
ratis-hadoop/pom.xml | 4 -
.../apache/hadoop/ipc/ProtobufRpcEngineShaded.java | 610 ---------------------
.../java/org/apache/ratis/hadooprpc/Proxy.java | 4 +-
.../ratis/hadooprpc/server/HadoopRpcService.java | 6 +-
ratis-logservice/pom.xml | 5 -
7 files changed, 5 insertions(+), 635 deletions(-)
diff --git a/pom.xml b/pom.xml
index 886862c..de0ac8f 100644
--- a/pom.xml
+++ b/pom.xml
@@ -416,12 +416,6 @@
<version>${ratis.thirdparty.version}</version>
</dependency>
- <dependency>
- <groupId>org.apache.ratis</groupId>
- <artifactId>ratis-thirdparty-hadoop</artifactId>
- <version>${ratis.thirdparty.version}</version>
- </dependency>
-
<!-- External dependencies -->
<dependency>
<groupId>org.slf4j</groupId>
diff --git a/ratis-examples/pom.xml b/ratis-examples/pom.xml
index f7c183d..919b84e 100644
--- a/ratis-examples/pom.xml
+++ b/ratis-examples/pom.xml
@@ -80,11 +80,6 @@
<scope>test</scope>
<type>test-jar</type>
</dependency>
- <dependency>
- <artifactId>ratis-thirdparty-hadoop</artifactId>
- <groupId>org.apache.ratis</groupId>
- <scope>test</scope>
- </dependency>
<dependency>
<artifactId>ratis-grpc</artifactId>
diff --git a/ratis-hadoop/pom.xml b/ratis-hadoop/pom.xml
index d700d2f..d263864 100644
--- a/ratis-hadoop/pom.xml
+++ b/ratis-hadoop/pom.xml
@@ -28,10 +28,6 @@
<artifactId>ratis-proto</artifactId>
<groupId>org.apache.ratis</groupId>
</dependency>
- <dependency>
- <artifactId>ratis-thirdparty-hadoop</artifactId>
- <groupId>org.apache.ratis</groupId>
- </dependency>
<dependency>
<artifactId>ratis-common</artifactId>
diff --git a/ratis-hadoop/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngineShaded.java b/ratis-hadoop/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngineShaded.java
deleted file mode 100644
index c3a7a8a..0000000
--- a/ratis-hadoop/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngineShaded.java
+++ /dev/null
@@ -1,610 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.ipc;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.DataOutputOutputStream;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.retry.RetryPolicy;
-import org.apache.hadoop.ipc.Client.ConnectionId;
-import org.apache.hadoop.ipc.RPC.RpcInvoker;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.token.SecretManager;
-import org.apache.hadoop.security.token.TokenIdentifier;
-import org.apache.hadoop.util.ProtoUtil;
-import org.apache.hadoop.util.Time;
-import org.apache.ratis.thirdparty.com.google.protobuf.*;
-import org.apache.ratis.thirdparty.com.google.protobuf.Descriptors.MethodDescriptor;
-import org.apache.ratis.thirdparty.org.apache.hadoop.ipc.protobuf.ProtobufRpcEngineProtos.RequestHeaderProto;
-import org.apache.ratis.thirdparty.org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcRequestHeaderProto;
-import org.apache.ratis.thirdparty.org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto;
-
-import javax.net.SocketFactory;
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.io.OutputStream;
-import java.lang.reflect.Method;
-import java.lang.reflect.Proxy;
-import java.net.InetSocketAddress;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-/**
- * Copied from {@link org.apache.hadoop.ipc.ProtobufRpcEngine}
- * and replaced the protobuf classes with the shaded classes.
- */
-@InterfaceStability.Evolving
-public class ProtobufRpcEngineShaded implements RpcEngine {
- public static final Log LOG = LogFactory.getLog(ProtobufRpcEngineShaded.class);
-
- static { // Register the rpcRequest deserializer for WritableRpcEngine
- org.apache.hadoop.ipc.Server.registerProtocolEngine(
- RPC.RpcKind.RPC_PROTOCOL_BUFFER, RpcRequestWrapper.class,
- new Server.ProtoBufRpcInvoker());
- }
-
- private static final ClientCache CLIENTS = new ClientCache();
-
- public <T> ProtocolProxy<T> getProxy(Class<T> protocol, long clientVersion,
- InetSocketAddress addr, UserGroupInformation ticket, Configuration conf,
- SocketFactory factory, int rpcTimeout) throws IOException {
- return getProxy(protocol, clientVersion, addr, ticket, conf, factory,
- rpcTimeout, null);
- }
-
- @Override
- public <T> ProtocolProxy<T> getProxy(Class<T> protocol, long clientVersion,
- InetSocketAddress addr, UserGroupInformation ticket, Configuration conf,
- SocketFactory factory, int rpcTimeout, RetryPolicy connectionRetryPolicy
- ) throws IOException {
- return getProxy(protocol, clientVersion, addr, ticket, conf, factory,
- rpcTimeout, connectionRetryPolicy, null);
- }
-
- @Override
- @SuppressWarnings("unchecked")
- public <T> ProtocolProxy<T> getProxy(Class<T> protocol, long clientVersion,
- InetSocketAddress addr, UserGroupInformation ticket, Configuration conf,
- SocketFactory factory, int rpcTimeout, RetryPolicy connectionRetryPolicy,
- AtomicBoolean fallbackToSimpleAuth) throws IOException {
-
- final Invoker invoker = new Invoker(protocol, addr, ticket, conf, factory,
- rpcTimeout, connectionRetryPolicy, fallbackToSimpleAuth);
- return new ProtocolProxy<T>(protocol, (T) Proxy.newProxyInstance(
- protocol.getClassLoader(), new Class[]{protocol}, invoker), false);
- }
-
- @Override
- public ProtocolProxy<ProtocolMetaInfoPB> getProtocolMetaInfoProxy(
- ConnectionId connId, Configuration conf, SocketFactory factory)
- throws IOException {
- Class<ProtocolMetaInfoPB> protocol = ProtocolMetaInfoPB.class;
- return new ProtocolProxy<ProtocolMetaInfoPB>(protocol,
- (ProtocolMetaInfoPB) Proxy.newProxyInstance(protocol.getClassLoader(),
- new Class[] { protocol }, new Invoker(protocol, connId, conf,
- factory)), false);
- }
-
- private static final class Invoker implements RpcInvocationHandler {
- private final Map<String, Message> returnTypes =
- new ConcurrentHashMap<String, Message>();
- private boolean isClosed = false;
- private final Client.ConnectionId remoteId;
- private final Client client;
- private final long clientProtocolVersion;
- private final String protocolName;
- private AtomicBoolean fallbackToSimpleAuth;
-
- @SuppressWarnings("checkstyle:parameternumber")
- private Invoker(Class<?> protocol, InetSocketAddress addr,
- UserGroupInformation ticket, Configuration conf, SocketFactory factory,
- int rpcTimeout, RetryPolicy connectionRetryPolicy,
- AtomicBoolean fallbackToSimpleAuth) throws IOException {
- this(protocol, Client.ConnectionId.getConnectionId(
- addr, protocol, ticket, rpcTimeout, connectionRetryPolicy, conf),
- conf, factory);
- this.fallbackToSimpleAuth = fallbackToSimpleAuth;
- }
-
- /**
- * This constructor takes a connectionId, instead of creating a new one.
- */
- private Invoker(Class<?> protocol, Client.ConnectionId connId,
- Configuration conf, SocketFactory factory) {
- this.remoteId = connId;
- this.client = CLIENTS.getClient(conf, factory, RpcResponseWrapper.class);
- this.protocolName = RPC.getProtocolName(protocol);
- this.clientProtocolVersion = RPC
- .getProtocolVersion(protocol);
- }
-
- private RequestHeaderProto constructRpcRequestHeader(Method method) {
- RequestHeaderProto.Builder builder = RequestHeaderProto
- .newBuilder();
- builder.setMethodName(method.getName());
-
-
- // For protobuf, {@code protocol} used when creating client side proxy is
- // the interface extending BlockingInterface, which has the annotations
- // such as ProtocolName etc.
- //
- // Using Method.getDeclaringClass(), as in WritableEngine to get at
- // the protocol interface will return BlockingInterface, from where
- // the annotation ProtocolName and Version cannot be
- // obtained.
- //
- // Hence we simply use the protocol class used to create the proxy.
- // For PB this may limit the use of mixins on client side.
- builder.setDeclaringClassProtocolName(protocolName);
- builder.setClientProtocolVersion(clientProtocolVersion);
- return builder.build();
- }
-
- /**
- * This is the client side invoker of RPC method. It only throws
- * ServiceException, since the invocation proxy expects only
- * ServiceException to be thrown by the method in case protobuf service.
- *
- * ServiceException has the following causes:
- * <ol>
- * <li>Exceptions encountered on the client side in this method are
- * set as cause in ServiceException as is.</li>
- * <li>Exceptions from the server are wrapped in RemoteException and are
- * set as cause in ServiceException</li>
- * </ol>
- *
- * Note that the client calling protobuf RPC methods, must handle
- * ServiceException by getting the cause from the ServiceException. If the
- * cause is RemoteException, then unwrap it to get the exception thrown by
- * the server.
- */
- @Override
- public Object invoke(Object proxy, Method method, Object[] args)
- throws ServiceException {
- long startTime = 0;
- if (LOG.isDebugEnabled()) {
- startTime = Time.now();
- }
-
- if (args.length != 2) { // RpcController + Message
- throw new ServiceException("Too many parameters for request. Method: ["
- + method.getName() + "]" + ", Expected: 2, Actual: "
- + args.length);
- }
- if (args[1] == null) {
- throw new ServiceException("null param while calling Method: ["
- + method.getName() + "]");
- }
-
- RequestHeaderProto rpcRequestHeader = constructRpcRequestHeader(method);
-
- if (LOG.isTraceEnabled()) {
- LOG.trace(Thread.currentThread().getId() + ": Call -> " +
- remoteId + ": " + method.getName() +
- " {" + TextFormat.shortDebugString((Message) args[1]) + "}");
- }
-
-
- Message theRequest = (Message) args[1];
- final RpcResponseWrapper val;
- try {
- val = (RpcResponseWrapper) client.call(RPC.RpcKind.RPC_PROTOCOL_BUFFER,
- new RpcRequestWrapper(rpcRequestHeader, theRequest), remoteId,
- fallbackToSimpleAuth);
-
- } catch (Throwable e) {
- if (LOG.isTraceEnabled()) {
- LOG.trace(Thread.currentThread().getId() + ": Exception <- " +
- remoteId + ": " + method.getName() +
- " {" + e + "}");
- }
- throw new ServiceException(e);
- }
-
- if (LOG.isDebugEnabled()) {
- long callTime = Time.now() - startTime;
- LOG.debug("Call: " + method.getName() + " took " + callTime + "ms");
- }
-
- Message prototype = null;
- try {
- prototype = getReturnProtoType(method);
- } catch (Exception e) {
- throw new ServiceException(e);
- }
- Message returnMessage;
- try {
- returnMessage = prototype.newBuilderForType()
- .mergeFrom(val.theResponseRead).build();
-
- if (LOG.isTraceEnabled()) {
- LOG.trace(Thread.currentThread().getId() + ": Response <- " +
- remoteId + ": " + method.getName() +
- " {" + TextFormat.shortDebugString(returnMessage) + "}");
- }
-
- } catch (Throwable e) {
- throw new ServiceException(e);
- }
- return returnMessage;
- }
-
- @Override
- public void close() throws IOException {
- if (!isClosed) {
- isClosed = true;
- CLIENTS.stopClient(client);
- }
- }
-
- private Message getReturnProtoType(Method method) throws Exception {
- if (returnTypes.containsKey(method.getName())) {
- return returnTypes.get(method.getName());
- }
-
- Class<?> returnType = method.getReturnType();
- Method newInstMethod = returnType.getMethod("getDefaultInstance");
- newInstMethod.setAccessible(true);
- Message prototype = (Message) newInstMethod.invoke(null, (Object[]) null);
- returnTypes.put(method.getName(), prototype);
- return prototype;
- }
-
- @Override //RpcInvocationHandler
- public ConnectionId getConnectionId() {
- return remoteId;
- }
- }
-
- interface RpcWrapper extends Writable {
- int getLength();
- }
- /**
- * Wrapper for Protocol Buffer Requests
- *
- * Note while this wrapper is writable, the request on the wire is in
- * Protobuf. Several methods on {@link org.apache.hadoop.ipc.Server and RPC}
- * use type Writable as a wrapper to work across multiple RpcEngine kinds.
- */
- private abstract static class RpcMessageWithHeader<T extends GeneratedMessage> implements RpcWrapper {
- private T requestHeader;
- private Message theRequest; // for clientSide, the request is here
- private byte[] theRequestRead; // for server side, the request is here
-
- RpcMessageWithHeader() {
- }
-
- RpcMessageWithHeader(T requestHeader, Message theRequest) {
- this.requestHeader = requestHeader;
- this.theRequest = theRequest;
- }
-
- @Override
- public void write(DataOutput out) throws IOException {
- OutputStream os = DataOutputOutputStream.constructOutputStream(out);
-
- ((Message)requestHeader).writeDelimitedTo(os);
- theRequest.writeDelimitedTo(os);
- }
-
- @Override
- public void readFields(DataInput in) throws IOException {
- this.requestHeader = parseHeaderFrom(readVarintBytes(in));
- this.theRequestRead = readMessageRequest(in);
- }
-
- abstract T parseHeaderFrom(byte[] bytes) throws IOException;
-
- byte[] readMessageRequest(DataInput in) throws IOException {
- return readVarintBytes(in);
- }
-
- private static byte[] readVarintBytes(DataInput in) throws IOException {
- final int length = ProtoUtil.readRawVarint32(in);
- final byte[] bytes = new byte[length];
- in.readFully(bytes);
- return bytes;
- }
-
- T getMessageHeader() {
- return this.requestHeader;
- }
-
- byte[] getMessageBytes() {
- return this.theRequestRead;
- }
-
- @Override
- public int getLength() {
- int headerLen = requestHeader.getSerializedSize();
- int reqLen;
- if (theRequest != null) {
- reqLen = theRequest.getSerializedSize();
- } else if (theRequestRead != null ) {
- reqLen = theRequestRead.length;
- } else {
- throw new IllegalArgumentException(
- "getLength on uninitialized RpcWrapper");
- }
- return CodedOutputStream.computeUInt32SizeNoTag(headerLen) + headerLen
- + CodedOutputStream.computeUInt32SizeNoTag(reqLen) + reqLen;
- }
- }
-
- private static class RpcRequestWrapper extends RpcMessageWithHeader<RequestHeaderProto> {
- @SuppressWarnings("unused")
- RpcRequestWrapper() {}
-
- RpcRequestWrapper(RequestHeaderProto requestHeader, Message theRequest) {
- super(requestHeader, theRequest);
- }
-
- @Override
- RequestHeaderProto parseHeaderFrom(byte[] bytes) throws IOException {
- return RequestHeaderProto.parseFrom(bytes);
- }
-
- @Override
- public String toString() {
- return getMessageHeader().getDeclaringClassProtocolName() + "." +
- getMessageHeader().getMethodName();
- }
- }
-
- @InterfaceAudience.LimitedPrivate({"RPC"})
- public static class RpcRequestMessageWrapper extends RpcMessageWithHeader<RpcRequestHeaderProto> {
- public RpcRequestMessageWrapper() {}
-
- public RpcRequestMessageWrapper(
- RpcRequestHeaderProto requestHeader, Message theRequest) {
- super(requestHeader, theRequest);
- }
-
- @Override
- RpcRequestHeaderProto parseHeaderFrom(byte[] bytes) throws IOException {
- return RpcRequestHeaderProto.parseFrom(bytes);
- }
- }
-
- @InterfaceAudience.LimitedPrivate({"RPC"})
- public static class RpcResponseMessageWrapper extends RpcMessageWithHeader<RpcResponseHeaderProto> {
- public RpcResponseMessageWrapper() {}
-
- public RpcResponseMessageWrapper(
- RpcResponseHeaderProto responseHeader, Message theRequest) {
- super(responseHeader, theRequest);
- }
-
- @Override
- byte[] readMessageRequest(DataInput in) throws IOException {
- // error message contain no message body
- switch (getMessageHeader().getStatus()) {
- case ERROR:
- case FATAL:
- return null;
- default:
- return super.readMessageRequest(in);
- }
- }
-
- @Override
- RpcResponseHeaderProto parseHeaderFrom(byte[] bytes) throws IOException {
- return RpcResponseHeaderProto.parseFrom(bytes);
- }
- }
-
- /**
- * Wrapper for Protocol Buffer Responses
- *
- * Note while this wrapper is writable, the request on the wire is in
- * Protobuf. Several methods on {@link org.apache.hadoop.ipc.Server and RPC}
- * use type Writable as a wrapper to work across multiple RpcEngine kinds.
- */
- @InterfaceAudience.LimitedPrivate({"RPC"}) // temporarily exposed
- public static class RpcResponseWrapper implements RpcWrapper {
- private Message theResponse; // for senderSide, the response is here
- private byte[] theResponseRead; // for receiver side, the response is here
-
- public RpcResponseWrapper() {
- }
-
- public RpcResponseWrapper(Message message) {
- this.theResponse = message;
- }
-
- @Override
- public void write(DataOutput out) throws IOException {
- OutputStream os = DataOutputOutputStream.constructOutputStream(out);
- theResponse.writeDelimitedTo(os);
- }
-
- @Override
- public void readFields(DataInput in) throws IOException {
- int length = ProtoUtil.readRawVarint32(in);
- theResponseRead = new byte[length];
- in.readFully(theResponseRead);
- }
-
- @Override
- public int getLength() {
- int resLen;
- if (theResponse != null) {
- resLen = theResponse.getSerializedSize();
- } else if (theResponseRead != null ) {
- resLen = theResponseRead.length;
- } else {
- throw new IllegalArgumentException(
- "getLength on uninitialized RpcWrapper");
- }
- return CodedOutputStream.computeUInt32SizeNoTag(resLen) + resLen;
- }
- }
-
- @Override
- public RPC.Server getServer(Class<?> protocol, Object protocolImpl,
- String bindAddress, int port, int numHandlers, int numReaders,
- int queueSizePerHandler, boolean verbose, Configuration conf,
- SecretManager<? extends TokenIdentifier> secretManager,
- String portRangeConfig)
- throws IOException {
- return new Server(protocol, protocolImpl, conf, bindAddress, port,
- numHandlers, numReaders, queueSizePerHandler, verbose, secretManager,
- portRangeConfig);
- }
-
- public static class Server extends RPC.Server {
- /**
- * Construct an RPC server.
- *
- * @param protocolClass the class of protocol
- * @param protocolImpl the protocolImpl whose methods will be called
- * @param conf the configuration to use
- * @param bindAddress the address to bind on to listen for connection
- * @param port the port to listen for connections on
- * @param numHandlers the number of method handler threads to run
- * @param verbose whether each call should be logged
- * @param portRangeConfig A config parameter that can be used to restrict
- * the range of ports used when port is 0 (an ephemeral port)
- */
- @SuppressWarnings("checkstyle:parameternumber")
- public Server(Class<?> protocolClass, Object protocolImpl,
- Configuration conf, String bindAddress, int port, int numHandlers,
- int numReaders, int queueSizePerHandler, boolean verbose,
- SecretManager<? extends TokenIdentifier> secretManager,
- String portRangeConfig)
- throws IOException {
- super(bindAddress, port, null, numHandlers,
- numReaders, queueSizePerHandler, conf, classNameBase(protocolImpl
- .getClass().getName()), secretManager, portRangeConfig);
- this.verbose = verbose;
- registerProtocolAndImpl(RPC.RpcKind.RPC_PROTOCOL_BUFFER, protocolClass,
- protocolImpl);
- }
-
- /**
- * Protobuf invoker for {@link RpcInvoker}
- */
- static class ProtoBufRpcInvoker implements RpcInvoker {
- private static ProtoClassProtoImpl getProtocolImpl(RPC.Server server,
- String protoName, long clientVersion) throws RpcServerException {
- ProtoNameVer pv = new ProtoNameVer(protoName, clientVersion);
- ProtoClassProtoImpl impl =
- server.getProtocolImplMap(RPC.RpcKind.RPC_PROTOCOL_BUFFER).get(pv);
- if (impl == null) { // no match for Protocol AND Version
- VerProtocolImpl highest =
- server.getHighestSupportedProtocol(RPC.RpcKind.RPC_PROTOCOL_BUFFER,
- protoName);
- if (highest == null) {
- throw new RpcNoSuchProtocolException(
- "Unknown protocol: " + protoName);
- }
- // protocol supported but not the version that client wants
- throw new RPC.VersionMismatch(protoName, clientVersion,
- highest.version);
- }
- return impl;
- }
-
- @Override
- /**
- * This is a server side method, which is invoked over RPC. On success
- * the return response has protobuf response payload. On failure, the
- * exception name and the stack trace are return in the resposne.
- * See {@link HadoopRpcResponseProto}
- *
- * In this method there three types of exceptions possible and they are
- * returned in response as follows.
- * <ol>
- * <li> Exceptions encountered in this method that are returned
- * as {@link RpcServerException} </li>
- * <li> Exceptions thrown by the service is wrapped in ServiceException.
- * In that this method returns in response the exception thrown by the
- * service.</li>
- * <li> Other exceptions thrown by the service. They are returned as
- * it is.</li>
- * </ol>
- */
- public Writable call(RPC.Server server, String protocol,
- Writable writableRequest, long receiveTime) throws Exception {
- RpcRequestWrapper request = (RpcRequestWrapper) writableRequest;
- RequestHeaderProto rpcRequest = request.getMessageHeader();
- String methodName = rpcRequest.getMethodName();
- String protoName = rpcRequest.getDeclaringClassProtocolName();
- long clientVersion = rpcRequest.getClientProtocolVersion();
- if (server.verbose) {
- LOG.info("Call: protocol=" + protocol + ", method=" + methodName);
- }
-
- ProtoClassProtoImpl protocolImpl = getProtocolImpl(server, protoName,
- clientVersion);
- BlockingService service = (BlockingService) protocolImpl.protocolImpl;
- MethodDescriptor methodDescriptor = service.getDescriptorForType()
- .findMethodByName(methodName);
- if (methodDescriptor == null) {
- String msg = "Unknown method " + methodName + " called on " + protocol
- + " protocol.";
- LOG.warn(msg);
- throw new RpcNoSuchMethodException(msg);
- }
- Message prototype = service.getRequestPrototype(methodDescriptor);
- Message param = prototype.newBuilderForType()
- .mergeFrom(request.getMessageBytes()).build();
-
- Message result;
- long startTime = Time.now();
- int qTime = (int) (startTime - receiveTime);
- Exception exception = null;
- try {
- server.rpcDetailedMetrics.init(protocolImpl.protocolClass);
- result = service.callBlockingMethod(methodDescriptor, null, param);
- } catch (ServiceException e) {
- exception = (Exception) e.getCause();
- throw (Exception) e.getCause();
- } catch (Exception e) {
- exception = e;
- throw e;
- } finally {
- int processingTime = (int) (Time.now() - startTime);
- if (LOG.isDebugEnabled()) {
- String msg = "Served: " + methodName + " queueTime= " + qTime +
- " procesingTime= " + processingTime;
- if (exception != null) {
- msg += " exception= " + exception.getClass().getSimpleName();
- }
- LOG.debug(msg);
- }
- String detailedMetricsName = (exception == null) ?
- methodName :
- exception.getClass().getSimpleName();
- server.rpcMetrics.addRpcQueueTime(qTime);
- server.rpcMetrics.addRpcProcessingTime(processingTime);
- server.rpcDetailedMetrics.addProcessingTime(detailedMetricsName,
- processingTime);
- }
- return new RpcResponseWrapper(result);
- }
- }
- }
-}
diff --git a/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/Proxy.java b/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/Proxy.java
index a5b4fd1..b9c32e7 100644
--- a/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/Proxy.java
+++ b/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/Proxy.java
@@ -18,7 +18,7 @@
package org.apache.ratis.hadooprpc;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.ipc.ProtobufRpcEngineShaded;
+import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.UserGroupInformation;
@@ -30,7 +30,7 @@ public class Proxy<PROTOCOL> implements Closeable {
public static <PROTOCOL> PROTOCOL getProxy(
Class<PROTOCOL> clazz, String addressStr, Configuration conf)
throws IOException {
- RPC.setProtocolEngine(conf, clazz, ProtobufRpcEngineShaded.class);
+ RPC.setProtocolEngine(conf, clazz, ProtobufRpcEngine.class);
return RPC.getProxy(clazz, RPC.getProtocolVersion(clazz),
org.apache.ratis.util.NetUtils.createSocketAddr(addressStr),
UserGroupInformation.getCurrentUser(),
diff --git a/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/server/HadoopRpcService.java b/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/server/HadoopRpcService.java
index 73c3a23..abd232b 100644
--- a/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/server/HadoopRpcService.java
+++ b/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/server/HadoopRpcService.java
@@ -18,7 +18,7 @@
package org.apache.ratis.hadooprpc.server;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.ipc.ProtobufRpcEngineShaded;
+import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.RPC;
import org.apache.ratis.hadooprpc.HadoopConfigKeys;
import org.apache.ratis.hadooprpc.Proxy;
@@ -129,7 +129,7 @@ public final class HadoopRpcService extends RaftServerRpcWithProxy<Proxy<RaftSer
final BlockingService service
= RaftServerProtocolService.newReflectiveBlockingService(
new RaftServerProtocolServerSideTranslatorPB(serverProtocol));
- RPC.setProtocolEngine(conf, RaftServerProtocolPB.class, ProtobufRpcEngineShaded.class);
+ RPC.setProtocolEngine(conf, RaftServerProtocolPB.class, ProtobufRpcEngine.class);
return new RPC.Builder(conf)
.setProtocol(RaftServerProtocolPB.class)
.setInstance(service)
@@ -142,7 +142,7 @@ public final class HadoopRpcService extends RaftServerRpcWithProxy<Proxy<RaftSer
private void addRaftClientProtocol(RaftServer server, Configuration conf) {
final Class<?> protocol = CombinedClientProtocolPB.class;
- RPC.setProtocolEngine(conf, protocol, ProtobufRpcEngineShaded.class);
+ RPC.setProtocolEngine(conf, protocol, ProtobufRpcEngine.class);
final BlockingService service = CombinedClientProtocolService.newReflectiveBlockingService(
new CombinedClientProtocolServerSideTranslatorPB(server));
diff --git a/ratis-logservice/pom.xml b/ratis-logservice/pom.xml
index a83a617..eaf12b1 100644
--- a/ratis-logservice/pom.xml
+++ b/ratis-logservice/pom.xml
@@ -249,11 +249,6 @@
<scope>test</scope>
<type>test-jar</type>
</dependency>
- <dependency>
- <artifactId>ratis-thirdparty-hadoop</artifactId>
- <groupId>org.apache.ratis</groupId>
- <scope>test</scope>
- </dependency>
<!-- Test third-party dependencies -->
<dependency>
<groupId>junit</groupId>