You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by zh...@apache.org on 2019/06/25 03:25:53 UTC
[hadoop] 01/01: HDFS-13643. Implement basic async rpc client
This is an automated email from the ASF dual-hosted git repository.
zhangduo pushed a commit to branch HDFS-13572
in repository https://gitbox.apache.org/repos/asf/hadoop.git
commit edd741d7a7e043268bd8a559d1de73019e58a839
Author: zhangduo <zh...@apache.org>
AuthorDate: Mon Jun 4 21:54:45 2018 +0800
HDFS-13643. Implement basic async rpc client
---
.../hadoop-client-minicluster/pom.xml | 4 +
hadoop-hdfs-project/hadoop-hdfs-client/pom.xml | 29 +++-
.../hdfs/ipc/BufferCallBeforeInitHandler.java | 100 ++++++++++++
.../main/java/org/apache/hadoop/hdfs/ipc/Call.java | 132 ++++++++++++++++
.../org/apache/hadoop/hdfs/ipc/ConnectionId.java | 71 +++++++++
.../apache/hadoop/hdfs/ipc/HdfsRpcController.java | 74 +++++++++
.../java/org/apache/hadoop/hdfs/ipc/IPCUtil.java | 34 ++++
.../java/org/apache/hadoop/hdfs/ipc/RpcClient.java | 128 +++++++++++++++
.../org/apache/hadoop/hdfs/ipc/RpcConnection.java | 153 ++++++++++++++++++
.../apache/hadoop/hdfs/ipc/RpcDuplexHandler.java | 175 +++++++++++++++++++++
.../org/apache/hadoop/hdfs/ipc/TestAsyncIPC.java | 88 +++++++++++
.../apache/hadoop/hdfs/ipc/TestRpcProtocolPB.java | 27 ++++
.../org/apache/hadoop/hdfs/ipc/TestServer.java | 58 +++++++
.../src/test/proto/test_rpc.proto | 35 +++++
14 files changed, 1103 insertions(+), 5 deletions(-)
diff --git a/hadoop-client-modules/hadoop-client-minicluster/pom.xml b/hadoop-client-modules/hadoop-client-minicluster/pom.xml
index 918b374..594e28c 100644
--- a/hadoop-client-modules/hadoop-client-minicluster/pom.xml
+++ b/hadoop-client-modules/hadoop-client-minicluster/pom.xml
@@ -115,6 +115,10 @@
<artifactId>netty</artifactId>
</exclusion>
<exclusion>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-all</artifactId>
+ </exclusion>
+ <exclusion>
<groupId>javax.servlet</groupId>
<artifactId>javax.servlet-api</artifactId>
</exclusion>
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/pom.xml b/hadoop-hdfs-project/hadoop-hdfs-client/pom.xml
index 8769bef..863f700 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/pom.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/pom.xml
@@ -39,6 +39,11 @@ https://maven.apache.org/xsd/maven-4.0.0.xsd">
<artifactId>okhttp</artifactId>
</dependency>
<dependency>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-all</artifactId>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<scope>provided</scope>
@@ -64,11 +69,6 @@ https://maven.apache.org/xsd/maven-4.0.0.xsd">
<scope>test</scope>
</dependency>
<dependency>
- <groupId>io.netty</groupId>
- <artifactId>netty-all</artifactId>
- <scope>test</scope>
- </dependency>
- <dependency>
<groupId>org.mock-server</groupId>
<artifactId>mockserver-netty</artifactId>
<scope>test</scope>
@@ -163,6 +163,25 @@ https://maven.apache.org/xsd/maven-4.0.0.xsd">
</source>
</configuration>
</execution>
+ <execution>
+ <id>compile-test-protoc</id>
+ <goals>
+ <goal>test-protoc</goal>
+ </goals>
+ <configuration>
+ <protocVersion>${protobuf.version}</protocVersion>
+ <protocCommand>${protoc.path}</protocCommand>
+ <imports>
+ <param>${basedir}/src/test/proto</param>
+ </imports>
+ <source>
+ <directory>${basedir}/src/test/proto</directory>
+ <includes>
+ <include>test_rpc.proto</include>
+ </includes>
+ </source>
+ </configuration>
+ </execution>
</executions>
</plugin>
<plugin>
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ipc/BufferCallBeforeInitHandler.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ipc/BufferCallBeforeInitHandler.java
new file mode 100644
index 0000000..89433e9
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ipc/BufferCallBeforeInitHandler.java
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.ipc;
+
+import io.netty.channel.ChannelDuplexHandler;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelPromise;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.hadoop.classification.InterfaceAudience;
+
+@InterfaceAudience.Private
+public class BufferCallBeforeInitHandler extends ChannelDuplexHandler {
+
+ private enum BufferCallAction {
+ FLUSH, FAIL
+ }
+
+ public static final class BufferCallEvent {
+
+ public final BufferCallAction action;
+
+ public final IOException error;
+
+ private BufferCallEvent(BufferCallBeforeInitHandler.BufferCallAction action,
+ IOException error) {
+ this.action = action;
+ this.error = error;
+ }
+
+ public static BufferCallBeforeInitHandler.BufferCallEvent success() {
+ return SUCCESS_EVENT;
+ }
+
+ public static BufferCallBeforeInitHandler.BufferCallEvent fail(
+ IOException error) {
+ return new BufferCallEvent(BufferCallAction.FAIL, error);
+ }
+ }
+
+ private static final BufferCallEvent SUCCESS_EVENT =
+ new BufferCallEvent(BufferCallAction.FLUSH, null);
+
+ private final Map<Integer, Call> id2Call = new HashMap<>();
+
+ @Override
+ public void write(ChannelHandlerContext ctx, Object msg,
+ ChannelPromise promise) {
+ if (msg instanceof Call) {
+ Call call = (Call) msg;
+ id2Call.put(call.getId(), call);
+ // The call is already in track so here we set the write operation as
+ // success.
+ // We will fail the call directly if we can not write it out.
+ promise.trySuccess();
+ } else {
+ ctx.write(msg, promise);
+ }
+ }
+
+ @Override
+ public void userEventTriggered(ChannelHandlerContext ctx, Object evt)
+ throws Exception {
+ if (evt instanceof BufferCallEvent) {
+ BufferCallEvent bcEvt = (BufferCallBeforeInitHandler.BufferCallEvent) evt;
+ switch (bcEvt.action) {
+ case FLUSH:
+ for (Call call : id2Call.values()) {
+ ctx.write(call);
+ }
+ break;
+ case FAIL:
+ for (Call call : id2Call.values()) {
+ call.setException(bcEvt.error);
+ }
+ break;
+ }
+ ctx.flush();
+ ctx.pipeline().remove(this);
+ } else {
+ ctx.fireUserEventTriggered(evt);
+ }
+ }
+}
\ No newline at end of file
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ipc/Call.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ipc/Call.java
new file mode 100644
index 0000000..14a35af
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ipc/Call.java
@@ -0,0 +1,132 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.ipc;
+
+import com.google.protobuf.Message;
+import com.google.protobuf.RpcCallback;
+import java.io.IOException;
+import org.apache.hadoop.classification.InterfaceAudience;
+
+@InterfaceAudience.Private
+class Call {
+ private final int id;
+
+ private final String protocolName;
+
+ private final long protocolVersion;
+
+ private final String methodName;
+
+ private final Message param;
+
+ private final Message responseDefaultType;
+
+ private volatile Message response;
+
+ private volatile IOException error;
+
+ private boolean done;
+
+ private final RpcCallback<Call> callback;
+
+ Call(int id, String protocolName, long protocolVersion, String methodName,
+ Message param, Message responseDefaultType, RpcCallback<Call> callback) {
+ this.id = id;
+ this.protocolName = protocolName;
+ this.protocolVersion = protocolVersion;
+ this.methodName = methodName;
+ this.param = param;
+ this.responseDefaultType = responseDefaultType;
+ this.callback = callback;
+ }
+
+ private void callComplete() {
+ callback.run(this);
+ }
+
+ /**
+ * Set the exception when there is an error. Notify the caller the call is
+ * done.
+ *
+ * @param error exception thrown by the call; either local or remote
+ */
+ void setException(IOException error) {
+ synchronized (this) {
+ if (done) {
+ return;
+ }
+ this.done = true;
+ this.error = error;
+ }
+ callComplete();
+ }
+
+ /**
+ * Set the return value when there is no error. Notify the caller the call is
+ * done.
+ *
+ * @param response return value of the call.
+ * @param cells Can be null
+ */
+ void setResponse(Message response) {
+ synchronized (this) {
+ if (done) {
+ return;
+ }
+ this.done = true;
+ this.response = response;
+ }
+ callComplete();
+ }
+
+ int getId() {
+ return id;
+ }
+
+ String getProtocolName() {
+ return protocolName;
+ }
+
+ long getProtocolVersion() {
+ return protocolVersion;
+ }
+
+ String getMethodName() {
+ return methodName;
+ }
+
+ Message getParam() {
+ return param;
+ }
+
+ Message getResponseDefaultType() {
+ return responseDefaultType;
+ }
+
+ Message getResponse() {
+ return response;
+ }
+
+ IOException getError() {
+ return error;
+ }
+
+ boolean isDone() {
+ return done;
+ }
+}
\ No newline at end of file
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ipc/ConnectionId.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ipc/ConnectionId.java
new file mode 100644
index 0000000..111b925
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ipc/ConnectionId.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.ipc;
+
+import java.net.InetSocketAddress;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.security.UserGroupInformation;
+
+/**
+ * Key identifying a connection: the user ticket (may be null), the protocol
+ * name and the remote address. Two ids are equal when all three parts match.
+ */
+@InterfaceAudience.Private
+class ConnectionId {
+
+  private static final int PRIME = 16777619;
+
+  private final UserGroupInformation ticket;
+  private final String protocolName;
+  private final InetSocketAddress address;
+
+  public ConnectionId(UserGroupInformation ticket, String protocolName,
+      InetSocketAddress address) {
+    this.ticket = ticket;
+    this.protocolName = protocolName;
+    this.address = address;
+  }
+
+  UserGroupInformation getTicket() {
+    return ticket;
+  }
+
+  String getProtocolName() {
+    return protocolName;
+  }
+
+  InetSocketAddress getAddress() {
+    return address;
+  }
+
+  @Override
+  public int hashCode() {
+    // A missing ticket contributes 0; the other parts are folded in with PRIME.
+    int hash = (ticket == null) ? 0 : ticket.hashCode();
+    hash = PRIME * hash + protocolName.hashCode();
+    return PRIME * hash + address.hashCode();
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (!(obj instanceof ConnectionId)) {
+      return false;
+    }
+    ConnectionId other = (ConnectionId) obj;
+    if (!address.equals(other.address)) {
+      return false;
+    }
+    // Tickets match when they are the same reference (covers both being null)
+    // or when this ticket is non-null and reports equality.
+    boolean sameTicket = ticket == other.ticket
+        || (ticket != null && ticket.equals(other.ticket));
+    if (!sameTicket) {
+      return false;
+    }
+    return protocolName.equals(other.protocolName);
+  }
+}
\ No newline at end of file
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ipc/HdfsRpcController.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ipc/HdfsRpcController.java
new file mode 100644
index 0000000..71ac3ef
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ipc/HdfsRpcController.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.ipc;
+
+import com.google.protobuf.RpcCallback;
+import com.google.protobuf.RpcController;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+
+@InterfaceAudience.Private
+public class HdfsRpcController implements RpcController {
+
+ private IOException error;
+
+ @Override
+ public void reset() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public boolean failed() {
+ return error != null;
+ }
+
+ @Override
+ public String errorText() {
+ return error != null ? error.getMessage() : null;
+ }
+
+ @Override
+ public void startCancel() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void setFailed(String reason) {
+ this.error = new IOException(reason);
+ }
+
+ public void setException(IOException error) {
+ this.error = error;
+ }
+
+ public IOException getException() {
+ return error;
+ }
+
+ @Override
+ public boolean isCanceled() {
+ return false;
+ }
+
+ @Override
+ public void notifyOnCancel(RpcCallback<Object> callback) {
+ throw new UnsupportedOperationException();
+ }
+}
\ No newline at end of file
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ipc/IPCUtil.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ipc/IPCUtil.java
new file mode 100644
index 0000000..db46bdb
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ipc/IPCUtil.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.ipc;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+
+/** Small helpers shared by the classes in this package. */
+@InterfaceAudience.Private
+class IPCUtil {
+
+  /**
+   * Converts any throwable to an {@link IOException}.
+   *
+   * @param t the throwable to convert
+   * @return {@code t} itself when it already is an IOException, otherwise a
+   *         new IOException with {@code t} as its cause
+   */
+  static IOException toIOE(Throwable t) {
+    return t instanceof IOException ? (IOException) t : new IOException(t);
+  }
+}
\ No newline at end of file
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ipc/RpcClient.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ipc/RpcClient.java
new file mode 100644
index 0000000..4792173
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ipc/RpcClient.java
@@ -0,0 +1,128 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.ipc;
+
+import com.google.protobuf.Descriptors;
+import com.google.protobuf.Message;
+import com.google.protobuf.RpcCallback;
+import com.google.protobuf.RpcChannel;
+
+import io.netty.channel.Channel;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.nio.NioSocketChannel;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.ipc.ClientId;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.ipc.RemoteException;
+import org.apache.hadoop.ipc.Server.AuthProtocol;
+import org.apache.hadoop.security.UserGroupInformation;
+
+/**
+ * The protobuf based rpc client.
+ * <p>
+ * The client creates its own event loop group and keeps one
+ * {@link RpcConnection} per {@link ConnectionId}. Both are released by
+ * {@link #close()}.
+ */
+@InterfaceAudience.Private
+public class RpcClient implements Closeable {
+
+  private final byte[] clientId;
+
+  // Created and therefore owned by this client; shut down in close().
+  private final EventLoopGroup group = new NioEventLoopGroup();
+
+  private final Class<? extends Channel> channelClass = NioSocketChannel.class;
+
+  private final AtomicInteger callIdCnt = new AtomicInteger(0);
+
+  private final ConcurrentMap<ConnectionId, RpcConnection> connections =
+      new ConcurrentHashMap<>();
+
+  public RpcClient() {
+    this.clientId = ClientId.getClientId();
+  }
+
+  /**
+   * Returns the next call id. Ids count up from 0 and wrap back to 0 after
+   * {@link Integer#MAX_VALUE}, so they are never negative.
+   */
+  private int nextCallId() {
+    int id, next;
+    do {
+      id = callIdCnt.get();
+      next = id < Integer.MAX_VALUE ? id + 1 : 0;
+    } while (!callIdCnt.compareAndSet(id, next));
+    return id;
+  }
+
+  /**
+   * Completion hook of a {@link Call}: hands either the response or, through
+   * the controller, the error to the caller supplied callback.
+   */
+  private void onCallFinished(Call call, HdfsRpcController hrc,
+      InetSocketAddress addr, RpcCallback<Message> callback) {
+    IOException error = call.getError();
+    if (error != null) {
+      if (error instanceof RemoteException) {
+        // Re-capture the stack trace at the point where the call completes.
+        error.fillInStackTrace();
+      }
+      hrc.setException(error);
+      callback.run(null);
+    } else {
+      callback.run(call.getResponse());
+    }
+  }
+
+  /**
+   * Builds a {@link Call} for the given method and sends it over the
+   * connection for (ugi, protocol, addr), creating that connection on first
+   * use.
+   */
+  private void callMethod(String protocolName, long protocolVersion,
+      Descriptors.MethodDescriptor md, HdfsRpcController hrc, Message param,
+      Message returnType, UserGroupInformation ugi, InetSocketAddress addr,
+      RpcCallback<Message> callback) {
+    Call call =
+        new Call(nextCallId(), protocolName, protocolVersion, md.getName(),
+            param, returnType, c -> onCallFinished(c, hrc, addr, callback));
+    ConnectionId remoteId = new ConnectionId(ugi, protocolName, addr);
+    connections
+        .computeIfAbsent(remoteId,
+            k -> new RpcConnection(this, k, AuthProtocol.NONE))
+        .sendRequest(call);
+  }
+
+  /**
+   * Creates a channel that issues calls of the given protocol to the given
+   * address on behalf of the given user.
+   *
+   * @param protocol the protocol interface; its name and version are resolved
+   *          through {@link RPC}
+   * @param addr the remote address
+   * @param ugi the user the calls are made for
+   * @return a channel whose controller argument must be an
+   *         {@link HdfsRpcController}
+   */
+  public RpcChannel createRpcChannel(Class<?> protocol, InetSocketAddress addr,
+      UserGroupInformation ugi) {
+    String protocolName = RPC.getProtocolName(protocol);
+    long protocolVersion = RPC.getProtocolVersion(protocol);
+    return (method, controller, request, responsePrototype, done) -> callMethod(
+        protocolName, protocolVersion, method, (HdfsRpcController) controller,
+        request, responsePrototype, ugi, addr, done);
+  }
+
+  byte[] getClientId() {
+    return clientId;
+  }
+
+  EventLoopGroup getGroup() {
+    return group;
+  }
+
+  Class<? extends Channel> getChannelClass() {
+    return channelClass;
+  }
+
+  /**
+   * Shuts down every connection and then the event loop group. Without the
+   * latter the event loop threads created by this client would keep running
+   * after the client has been closed.
+   */
+  @Override
+  public void close() throws IOException {
+    connections.values().forEach(RpcConnection::shutdown);
+    connections.clear();
+    group.shutdownGracefully();
+  }
+}
\ No newline at end of file
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ipc/RpcConnection.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ipc/RpcConnection.java
new file mode 100644
index 0000000..5e7b482
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ipc/RpcConnection.java
@@ -0,0 +1,153 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.ipc;
+
+import static org.apache.hadoop.ipc.RpcConstants.CONNECTION_CONTEXT_CALL_ID;
+
+import com.google.protobuf.CodedOutputStream;
+import io.netty.bootstrap.Bootstrap;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufOutputStream;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.ChannelPipeline;
+import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
+import java.io.IOException;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hdfs.ipc.BufferCallBeforeInitHandler.BufferCallEvent;
+import org.apache.hadoop.ipc.RPC.RpcKind;
+import org.apache.hadoop.ipc.RpcConstants;
+import org.apache.hadoop.ipc.Server.AuthProtocol;
+import org.apache.hadoop.ipc.protobuf.IpcConnectionContextProtos.IpcConnectionContextProto;
+import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcRequestHeaderProto;
+import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcRequestHeaderProto.OperationProto;
+import org.apache.hadoop.security.SaslRpcServer.AuthMethod;
+import org.apache.hadoop.util.ProtoUtil;
+
+/**
+ * The connection to remote server.
+ * <p>
+ * The channel is opened lazily by the first {@link #sendRequest(Call)}. Calls
+ * sent before the connection preamble has been written are parked by a
+ * {@link BufferCallBeforeInitHandler} and are released, or failed, once
+ * initialization finishes.
+ */
+@InterfaceAudience.Private
+class RpcConnection {
+
+  final RpcClient rpcClient;
+
+  final ConnectionId remoteId;
+
+  private final AuthProtocol authProtocol;
+
+  private Channel channel;
+
+  public RpcConnection(RpcClient rpcClient, ConnectionId remoteId,
+      AuthProtocol authProtocol) {
+    this.rpcClient = rpcClient;
+    this.remoteId = remoteId;
+    this.authProtocol = authProtocol;
+  }
+
+  /**
+   * Writes the 7 byte connection header: the RPC magic, the version, the
+   * service class and the auth protocol.
+   */
+  private void writeConnectionHeader(Channel ch) {
+    ByteBuf header = ch.alloc().buffer(7);
+    header.writeBytes(RpcConstants.HEADER.duplicate());
+    header.writeByte(RpcConstants.CURRENT_VERSION);
+    header.writeByte(0); // service class
+    header.writeByte(authProtocol.callId);
+    ch.writeAndFlush(header);
+  }
+
+  /**
+   * Writes the length prefixed connection context (request header followed by
+   * the context message, both varint delimited).
+   *
+   * @throws IOException if the messages can not be encoded; the buffer is
+   *           released before the exception propagates
+   */
+  private void writeConnectionContext(Channel ch) throws IOException {
+    RpcRequestHeaderProto connectionContextHeader =
+        ProtoUtil.makeRpcRequestHeader(RpcKind.RPC_PROTOCOL_BUFFER,
+            OperationProto.RPC_FINAL_PACKET, CONNECTION_CONTEXT_CALL_ID,
+            RpcConstants.INVALID_RETRY_COUNT, rpcClient.getClientId());
+    int headerSize = connectionContextHeader.getSerializedSize();
+    IpcConnectionContextProto message = ProtoUtil.makeIpcConnectionContext(
+        remoteId.getProtocolName(), remoteId.getTicket(), AuthMethod.SIMPLE);
+    int messageSize = message.getSerializedSize();
+
+    int totalSize =
+        CodedOutputStream.computeRawVarint32Size(headerSize) + headerSize +
+            CodedOutputStream.computeRawVarint32Size(messageSize) + messageSize;
+    ByteBuf buf = ch.alloc().buffer(totalSize + 4);
+    try {
+      buf.writeInt(totalSize);
+      ByteBufOutputStream out = new ByteBufOutputStream(buf);
+      connectionContextHeader.writeDelimitedTo(out);
+      message.writeDelimitedTo(out);
+    } catch (IOException | RuntimeException e) {
+      // Ownership was never handed to the channel, so release it here.
+      buf.release();
+      throw e;
+    }
+    ch.writeAndFlush(buf);
+  }
+
+  /**
+   * Installs the frame decoder and the rpc handler in front of the buffering
+   * handler and then tells the buffering handler to release the parked calls.
+   */
+  private void established(Channel ch) throws IOException {
+    ChannelPipeline p = ch.pipeline();
+    String addBeforeHandler =
+        p.context(BufferCallBeforeInitHandler.class).name();
+    p.addBefore(addBeforeHandler, "frameDecoder",
+        new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4));
+    p.addBefore(addBeforeHandler, "rpcHandler", new RpcDuplexHandler(this));
+    p.fireUserEventTriggered(BufferCallEvent.success());
+  }
+
+  /**
+   * Returns the current channel, starting a connect attempt when there is
+   * none. The returned channel may still be connecting.
+   */
+  private Channel connect() {
+    if (channel != null) {
+      return channel;
+    }
+    channel = new Bootstrap().group(rpcClient.getGroup())
+        .channel(rpcClient.getChannelClass())
+        .option(ChannelOption.TCP_NODELAY, true)
+        .option(ChannelOption.SO_KEEPALIVE, true)
+        .handler(new BufferCallBeforeInitHandler())
+        .remoteAddress(remoteId.getAddress()).connect()
+        .addListener(new ChannelFutureListener() {
+
+          @Override
+          public void operationComplete(ChannelFuture future) throws Exception {
+            Channel ch = future.channel();
+            if (!future.isSuccess()) {
+              failInit(ch, IPCUtil.toIOE(future.cause()));
+              return;
+            }
+            try {
+              writeConnectionHeader(ch);
+              writeConnectionContext(ch);
+              established(ch);
+            } catch (IOException | RuntimeException e) {
+              // A failure while writing the preamble must fail the parked
+              // calls too, otherwise they would never be completed.
+              failInit(ch, IPCUtil.toIOE(e));
+            }
+          }
+        }).channel();
+    return channel;
+  }
+
+  private synchronized void failInit(Channel ch, IOException e) {
+    // fail all pending calls
+    ch.pipeline().fireUserEventTriggered(BufferCallEvent.fail(e));
+    shutdown0();
+  }
+
+  private void shutdown0() {
+    if (channel != null) {
+      channel.close();
+      channel = null;
+    }
+  }
+
+  public synchronized void shutdown() {
+    shutdown0();
+  }
+
+  /**
+   * Sends the call over this connection, connecting first if necessary. If the
+   * write itself fails the call is completed with the failure, so the caller
+   * is always notified.
+   */
+  public synchronized void sendRequest(Call call) {
+    Channel ch = connect();
+    ch.eventLoop().execute(() -> ch.writeAndFlush(call)
+        .addListener((ChannelFutureListener) written -> {
+          if (!written.isSuccess()) {
+            // Call.setException is a no-op when the call was already
+            // completed through another path.
+            call.setException(IPCUtil.toIOE(written.cause()));
+          }
+        }));
+  }
+}
\ No newline at end of file
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ipc/RpcDuplexHandler.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ipc/RpcDuplexHandler.java
new file mode 100644
index 0000000..3cc5659
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ipc/RpcDuplexHandler.java
@@ -0,0 +1,175 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.ipc;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.ipc.RPC.RpcKind;
+import org.apache.hadoop.ipc.RemoteException;
+import org.apache.hadoop.ipc.protobuf.ProtobufRpcEngineProtos.RequestHeaderProto;
+import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcRequestHeaderProto;
+import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcRequestHeaderProto.OperationProto;
+import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto;
+import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto.RpcErrorCodeProto;
+import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto.RpcStatusProto;
+import org.apache.hadoop.util.ProtoUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.protobuf.CodedOutputStream;
+import com.google.protobuf.Message;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufInputStream;
+import io.netty.buffer.ByteBufOutputStream;
+import io.netty.channel.ChannelDuplexHandler;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelPromise;
+
+/**
+ * Pipeline handler that encodes outgoing {@link Call}s into request frames and
+ * matches incoming response frames back to the call that is waiting for them.
+ * Outstanding calls are tracked by call id and are failed when the channel
+ * goes inactive or an exception reaches this handler.
+ */
+@InterfaceAudience.Private
+class RpcDuplexHandler extends ChannelDuplexHandler {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(RpcDuplexHandler.class);
+
+  private final RpcConnection conn;
+
+  /** Calls that have been written and are waiting for a response. */
+  private final Map<Integer, Call> id2Call = new HashMap<>();
+
+  public RpcDuplexHandler(RpcConnection conn) {
+    this.conn = conn;
+  }
+
+  /**
+   * Encodes the call as a length prefixed frame: rpc header, request header
+   * and, when present, the parameter, each varint delimited.
+   *
+   * @return the encoded frame; the caller owns the buffer
+   * @throws IOException if encoding fails; the buffer is released first
+   */
+  private ByteBuf encodeRequest(ChannelHandlerContext ctx, Call call)
+      throws IOException {
+    RpcRequestHeaderProto rpcHeader = ProtoUtil.makeRpcRequestHeader(
+        RpcKind.RPC_PROTOCOL_BUFFER, OperationProto.RPC_FINAL_PACKET,
+        call.getId(), 0, conn.rpcClient.getClientId());
+    int rpcHeaderSize = rpcHeader.getSerializedSize();
+    RequestHeaderProto requestHeader =
+        RequestHeaderProto.newBuilder().setMethodName(call.getMethodName())
+            .setDeclaringClassProtocolName(call.getProtocolName())
+            .setClientProtocolVersion(call.getProtocolVersion()).build();
+    int requestHeaderSize = requestHeader.getSerializedSize();
+    int totalSize = CodedOutputStream.computeRawVarint32Size(rpcHeaderSize) +
+        rpcHeaderSize +
+        CodedOutputStream.computeRawVarint32Size(requestHeaderSize) +
+        requestHeaderSize;
+    Message param = call.getParam();
+    if (param != null) {
+      int paramSize = param.getSerializedSize();
+      totalSize +=
+          CodedOutputStream.computeRawVarint32Size(paramSize) + paramSize;
+    }
+    ByteBuf buf = ctx.alloc().buffer(totalSize + 4);
+    try {
+      ByteBufOutputStream out = new ByteBufOutputStream(buf);
+      out.writeInt(totalSize);
+      rpcHeader.writeDelimitedTo(out);
+      requestHeader.writeDelimitedTo(out);
+      if (param != null) {
+        param.writeDelimitedTo(out);
+      }
+      return buf;
+    } catch (IOException | RuntimeException e) {
+      buf.release();
+      throw e;
+    }
+  }
+
+  /**
+   * Registers the call and writes its encoded frame. If encoding fails the
+   * call is unregistered and completed with the failure before the exception
+   * propagates, so it is not left waiting for a response that cannot come.
+   */
+  private void writeRequest(ChannelHandlerContext ctx, Call call,
+      ChannelPromise promise) throws IOException {
+    id2Call.put(call.getId(), call);
+    ByteBuf frame;
+    try {
+      frame = encodeRequest(ctx, call);
+    } catch (IOException | RuntimeException e) {
+      id2Call.remove(call.getId());
+      call.setException(IPCUtil.toIOE(e));
+      throw e;
+    }
+    ctx.write(frame, promise);
+  }
+
+  @Override
+  public void write(ChannelHandlerContext ctx, Object msg,
+      ChannelPromise promise) throws Exception {
+    if (msg instanceof Call) {
+      writeRequest(ctx, (Call) msg, promise);
+    } else {
+      ctx.write(msg, promise);
+    }
+  }
+
+  /**
+   * Decodes one response frame and completes the matching call. An ERROR
+   * status fails only that call; a FATAL status is treated like an exception
+   * on the connection. Responses whose call id is not being tracked are
+   * logged and dropped.
+   */
+  private void readResponse(ChannelHandlerContext ctx, ByteBuf buf)
+      throws Exception {
+    ByteBufInputStream in = new ByteBufInputStream(buf);
+    RpcResponseHeaderProto header =
+        RpcResponseHeaderProto.parseDelimitedFrom(in);
+    int id = header.getCallId();
+    RpcStatusProto status = header.getStatus();
+    if (status == RpcStatusProto.ERROR || status == RpcStatusProto.FATAL) {
+      String exceptionClassName =
+          header.hasExceptionClassName() ? header.getExceptionClassName()
+              : "ServerDidNotSetExceptionClassName";
+      String errorMsg = header.hasErrorMsg() ? header.getErrorMsg()
+          : "ServerDidNotSetErrorMsg";
+      RpcErrorCodeProto errCode =
+          (header.hasErrorDetail() ? header.getErrorDetail() : null);
+      if (errCode == null) {
+        LOG.warn("Detailed error code not set by server on rpc error");
+      }
+      RemoteException re =
+          new RemoteException(exceptionClassName, errorMsg, errCode);
+      if (status == RpcStatusProto.FATAL) {
+        exceptionCaught(ctx, re);
+        return;
+      }
+      Call failed = id2Call.remove(id);
+      if (failed == null) {
+        LOG.warn("Received error response for unknown call id {}", id);
+      } else {
+        failed.setException(re);
+      }
+      return;
+    }
+    Call call = id2Call.remove(id);
+    if (call == null) {
+      LOG.warn("Received response for unknown call id {}", id);
+      return;
+    }
+    call.setResponse(call.getResponseDefaultType().getParserForType()
+        .parseDelimitedFrom(in));
+  }
+
+  @Override
+  public void channelRead(ChannelHandlerContext ctx, Object msg)
+      throws Exception {
+    if (msg instanceof ByteBuf) {
+      ByteBuf buf = (ByteBuf) msg;
+      try {
+        readResponse(ctx, buf);
+      } finally {
+        buf.release();
+      }
+    } else {
+      // Not ours to handle: pass it on instead of silently dropping it.
+      ctx.fireChannelRead(msg);
+    }
+  }
+
+  /** Fails every outstanding call with the given error and forgets them. */
+  private void cleanupCalls(ChannelHandlerContext ctx, IOException error) {
+    for (Call call : id2Call.values()) {
+      call.setException(error);
+    }
+    id2Call.clear();
+  }
+
+  @Override
+  public void channelInactive(ChannelHandlerContext ctx) throws Exception {
+    if (!id2Call.isEmpty()) {
+      cleanupCalls(ctx, new IOException("Connection closed"));
+    }
+    conn.shutdown();
+    ctx.fireChannelInactive();
+  }
+
+  @Override
+  public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
+      throws Exception {
+    LOG.debug("Exception caught, shutting down connection", cause);
+    if (!id2Call.isEmpty()) {
+      // Hand the real cause to the waiting calls rather than a generic
+      // message, so callers can see why the connection failed.
+      cleanupCalls(ctx, IPCUtil.toIOE(cause));
+    }
+    conn.shutdown();
+  }
+}
\ No newline at end of file
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/test/java/org/apache/hadoop/hdfs/ipc/TestAsyncIPC.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/test/java/org/apache/hadoop/hdfs/ipc/TestAsyncIPC.java
new file mode 100644
index 0000000..86fde48
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/test/java/org/apache/hadoop/hdfs/ipc/TestAsyncIPC.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.ipc;
+
+import static org.junit.Assert.assertEquals;
+
+import com.google.protobuf.RpcChannel;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.ipc.protobuf.TestRpcProtos;
+import org.apache.hadoop.hdfs.ipc.protobuf.TestRpcProtos.EchoRequestProto;
+import org.apache.hadoop.ipc.ProtobufRpcEngine;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * Sends a batch of asynchronous echo calls through {@link RpcClient} to an
+ * in-process {@link TestServer} and checks that every call gets its own
+ * message back.
+ */
+public class TestAsyncIPC {
+
+  private static Configuration CONF;
+
+  private static TestServer SERVER;
+
+  private static int PORT;
+
+  /** Starts the echo server and records the port it reports. */
+  @BeforeClass
+  public static void setUp() throws IOException {
+    CONF = new Configuration();
+    RPC.setProtocolEngine(CONF, TestRpcProtocolPB.class,
+        ProtobufRpcEngine.class);
+    SERVER = new TestServer(CONF);
+    SERVER.start();
+    PORT = SERVER.port();
+  }
+
+  /** Stops the echo server. */
+  @AfterClass
+  public static void tearDown() {
+    SERVER.stop();
+  }
+
+  /**
+   * Issues 100 echo calls without waiting in between, waits until all
+   * callbacks have fired and verifies each response.
+   * <p>
+   * The timeout bounds the wait on the latch: without it a single callback
+   * that is never invoked would make the test hang forever instead of fail.
+   */
+  @Test(timeout = 60000)
+  public void test() throws IOException, InterruptedException {
+    try (RpcClient client = new RpcClient()) {
+      RpcChannel channel = client.createRpcChannel(TestRpcProtocolPB.class,
+          new InetSocketAddress("localhost", PORT),
+          UserGroupInformation.getCurrentUser());
+      TestRpcProtos.TestRpcService.Interface stub =
+          TestRpcProtos.TestRpcService.newStub(channel);
+      // NOTE(review): a plain HashMap written from the response callbacks;
+      // this is only safe if the callbacks never run concurrently - confirm
+      // against the threading model of RpcClient.
+      Map<Integer, String> results = new HashMap<>();
+      int count = 100;
+      CountDownLatch latch = new CountDownLatch(count);
+      for (int i = 0; i < count; i++) {
+        final int index = i;
+        stub.echo(new HdfsRpcController(),
+            EchoRequestProto.newBuilder().setMessage("Echo-" + index).build(),
+            resp -> {
+              results.put(index, resp.getMessage());
+              latch.countDown();
+            });
+      }
+      // Every countDown() happens after the matching put(), so once await()
+      // returns all results are visible to this thread.
+      latch.await();
+      assertEquals(count, results.size());
+      for (int i = 0; i < count; i++) {
+        assertEquals("Echo-" + i, results.get(i));
+      }
+    }
+  }
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/test/java/org/apache/hadoop/hdfs/ipc/TestRpcProtocolPB.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/test/java/org/apache/hadoop/hdfs/ipc/TestRpcProtocolPB.java
new file mode 100644
index 0000000..c7f7f27
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/test/java/org/apache/hadoop/hdfs/ipc/TestRpcProtocolPB.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.ipc;
+
+import org.apache.hadoop.hdfs.ipc.protobuf.TestRpcProtos;
+import org.apache.hadoop.ipc.ProtocolInfo;
+
+/**
+ * Protocol interface used by the RPC tests in this package. It declares no
+ * methods of its own: it exposes the blocking interface of the generated
+ * {@code TestRpcService} and attaches a protocol name and version to it
+ * through {@link ProtocolInfo}.
+ */
+@ProtocolInfo(protocolName = "org.apache.hadoop.hdfs.ipc.TestRpcProtocol",
+ protocolVersion = 1)
+public interface TestRpcProtocolPB
+ extends TestRpcProtos.TestRpcService.BlockingInterface {
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/test/java/org/apache/hadoop/hdfs/ipc/TestServer.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/test/java/org/apache/hadoop/hdfs/ipc/TestServer.java
new file mode 100644
index 0000000..3e06cc8
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/test/java/org/apache/hadoop/hdfs/ipc/TestServer.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.ipc;
+
+import com.google.protobuf.RpcController;
+import com.google.protobuf.ServiceException;
+import java.io.IOException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.ipc.protobuf.TestRpcProtos;
+import org.apache.hadoop.hdfs.ipc.protobuf.TestRpcProtos.EchoRequestProto;
+import org.apache.hadoop.hdfs.ipc.protobuf.TestRpcProtos.EchoResponseProto;
+import org.apache.hadoop.ipc.RPC;
+
+public class TestServer implements TestRpcProtocolPB {
+
+ private final RPC.Server server;
+
+ public TestServer(Configuration conf) throws IOException {
+ server = new RPC.Builder(conf).setProtocol(TestRpcProtocolPB.class)
+ .setInstance(
+ TestRpcProtos.TestRpcService.newReflectiveBlockingService(this))
+ .setNumHandlers(10).build();
+ }
+
+ public void start() {
+ server.start();
+ }
+
+ public void stop() {
+ server.stop();
+ }
+
+ public int port() {
+ return server.getPort();
+ }
+
+ @Override
+ public EchoResponseProto echo(RpcController controller,
+ EchoRequestProto request) throws ServiceException {
+ return EchoResponseProto.newBuilder().setMessage(request.getMessage())
+ .build();
+ }
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/test/proto/test_rpc.proto b/hadoop-hdfs-project/hadoop-hdfs-client/src/test/proto/test_rpc.proto
new file mode 100644
index 0000000..0997f56
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/test/proto/test_rpc.proto
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// Generated Java classes are nested in the outer class
+// org.apache.hadoop.hdfs.ipc.protobuf.TestRpcProtos.
+option java_package = "org.apache.hadoop.hdfs.ipc.protobuf";
+option java_outer_classname = "TestRpcProtos";
+// Also generate the Java service classes for TestRpcService below.
+option java_generic_services = true;
+option java_generate_equals_and_hash = true;
+package hadoop.hdfs;
+
+// Request of the echo call: carries the text to be echoed.
+message EchoRequestProto {
+ required string message = 1;
+}
+
+// Response of the echo call.
+message EchoResponseProto {
+ required string message = 1;
+}
+
+// Service with a single echo method, used by the RPC tests.
+service TestRpcService {
+ rpc echo(EchoRequestProto) returns (EchoResponseProto);
+}
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org