You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by zh...@apache.org on 2019/06/25 03:25:53 UTC

[hadoop] 01/01: HDFS-13643. Implement basic async rpc client

This is an automated email from the ASF dual-hosted git repository.

zhangduo pushed a commit to branch HDFS-13572
in repository https://gitbox.apache.org/repos/asf/hadoop.git

commit edd741d7a7e043268bd8a559d1de73019e58a839
Author: zhangduo <zh...@apache.org>
AuthorDate: Mon Jun 4 21:54:45 2018 +0800

    HDFS-13643. Implement basic async rpc client
---
 .../hadoop-client-minicluster/pom.xml              |   4 +
 hadoop-hdfs-project/hadoop-hdfs-client/pom.xml     |  29 +++-
 .../hdfs/ipc/BufferCallBeforeInitHandler.java      | 100 ++++++++++++
 .../main/java/org/apache/hadoop/hdfs/ipc/Call.java | 132 ++++++++++++++++
 .../org/apache/hadoop/hdfs/ipc/ConnectionId.java   |  71 +++++++++
 .../apache/hadoop/hdfs/ipc/HdfsRpcController.java  |  74 +++++++++
 .../java/org/apache/hadoop/hdfs/ipc/IPCUtil.java   |  34 ++++
 .../java/org/apache/hadoop/hdfs/ipc/RpcClient.java | 128 +++++++++++++++
 .../org/apache/hadoop/hdfs/ipc/RpcConnection.java  | 153 ++++++++++++++++++
 .../apache/hadoop/hdfs/ipc/RpcDuplexHandler.java   | 175 +++++++++++++++++++++
 .../org/apache/hadoop/hdfs/ipc/TestAsyncIPC.java   |  88 +++++++++++
 .../apache/hadoop/hdfs/ipc/TestRpcProtocolPB.java  |  27 ++++
 .../org/apache/hadoop/hdfs/ipc/TestServer.java     |  58 +++++++
 .../src/test/proto/test_rpc.proto                  |  35 +++++
 14 files changed, 1103 insertions(+), 5 deletions(-)

diff --git a/hadoop-client-modules/hadoop-client-minicluster/pom.xml b/hadoop-client-modules/hadoop-client-minicluster/pom.xml
index 918b374..594e28c 100644
--- a/hadoop-client-modules/hadoop-client-minicluster/pom.xml
+++ b/hadoop-client-modules/hadoop-client-minicluster/pom.xml
@@ -115,6 +115,10 @@
           <artifactId>netty</artifactId>
         </exclusion>
         <exclusion>
+          <groupId>io.netty</groupId>
+          <artifactId>netty-all</artifactId>
+        </exclusion>
+        <exclusion>
           <groupId>javax.servlet</groupId>
           <artifactId>javax.servlet-api</artifactId>
         </exclusion>
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/pom.xml b/hadoop-hdfs-project/hadoop-hdfs-client/pom.xml
index 8769bef..863f700 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/pom.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/pom.xml
@@ -39,6 +39,11 @@ https://maven.apache.org/xsd/maven-4.0.0.xsd">
       <artifactId>okhttp</artifactId>
     </dependency>
     <dependency>
+      <groupId>io.netty</groupId>
+      <artifactId>netty-all</artifactId>
+      <scope>compile</scope>
+    </dependency>
+    <dependency>
       <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-common</artifactId>
       <scope>provided</scope>
@@ -64,11 +69,6 @@ https://maven.apache.org/xsd/maven-4.0.0.xsd">
       <scope>test</scope>
     </dependency>
     <dependency>
-      <groupId>io.netty</groupId>
-      <artifactId>netty-all</artifactId>
-      <scope>test</scope>
-    </dependency>
-    <dependency>
       <groupId>org.mock-server</groupId>
       <artifactId>mockserver-netty</artifactId>
       <scope>test</scope>
@@ -163,6 +163,25 @@ https://maven.apache.org/xsd/maven-4.0.0.xsd">
               </source>
             </configuration>
           </execution>
+          <execution>
+            <id>compile-test-protoc</id>
+            <goals>
+              <goal>test-protoc</goal>
+            </goals>
+            <configuration>
+              <protocVersion>${protobuf.version}</protocVersion>
+              <protocCommand>${protoc.path}</protocCommand>
+              <imports>
+                <param>${basedir}/src/test/proto</param>
+              </imports>
+              <source>
+                <directory>${basedir}/src/test/proto</directory>
+                <includes>
+                  <include>test_rpc.proto</include>
+                </includes>
+              </source>
+            </configuration>
+          </execution>
         </executions>
       </plugin>
       <plugin>
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ipc/BufferCallBeforeInitHandler.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ipc/BufferCallBeforeInitHandler.java
new file mode 100644
index 0000000..89433e9
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ipc/BufferCallBeforeInitHandler.java
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.ipc;
+
+import io.netty.channel.ChannelDuplexHandler;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelPromise;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.hadoop.classification.InterfaceAudience;
+
+@InterfaceAudience.Private
+public class BufferCallBeforeInitHandler extends ChannelDuplexHandler {
+
+  private enum BufferCallAction {
+    FLUSH, FAIL
+  }
+
+  public static final class BufferCallEvent {
+
+    public final BufferCallAction action;
+
+    public final IOException error;
+
+    private BufferCallEvent(BufferCallBeforeInitHandler.BufferCallAction action,
+        IOException error) {
+      this.action = action;
+      this.error = error;
+    }
+
+    public static BufferCallBeforeInitHandler.BufferCallEvent success() {
+      return SUCCESS_EVENT;
+    }
+
+    public static BufferCallBeforeInitHandler.BufferCallEvent fail(
+        IOException error) {
+      return new BufferCallEvent(BufferCallAction.FAIL, error);
+    }
+  }
+
+  private static final BufferCallEvent SUCCESS_EVENT =
+      new BufferCallEvent(BufferCallAction.FLUSH, null);
+
+  private final Map<Integer, Call> id2Call = new HashMap<>();
+
+  @Override
+  public void write(ChannelHandlerContext ctx, Object msg,
+      ChannelPromise promise) {
+    if (msg instanceof Call) {
+      Call call = (Call) msg;
+      id2Call.put(call.getId(), call);
+      // The call is already in track so here we set the write operation as
+      // success.
+      // We will fail the call directly if we can not write it out.
+      promise.trySuccess();
+    } else {
+      ctx.write(msg, promise);
+    }
+  }
+
+  @Override
+  public void userEventTriggered(ChannelHandlerContext ctx, Object evt)
+      throws Exception {
+    if (evt instanceof BufferCallEvent) {
+      BufferCallEvent bcEvt = (BufferCallBeforeInitHandler.BufferCallEvent) evt;
+      switch (bcEvt.action) {
+      case FLUSH:
+        for (Call call : id2Call.values()) {
+          ctx.write(call);
+        }
+        break;
+      case FAIL:
+        for (Call call : id2Call.values()) {
+          call.setException(bcEvt.error);
+        }
+        break;
+      }
+      ctx.flush();
+      ctx.pipeline().remove(this);
+    } else {
+      ctx.fireUserEventTriggered(evt);
+    }
+  }
+}
\ No newline at end of file
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ipc/Call.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ipc/Call.java
new file mode 100644
index 0000000..14a35af
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ipc/Call.java
@@ -0,0 +1,132 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.ipc;
+
+import com.google.protobuf.Message;
+import com.google.protobuf.RpcCallback;
+import java.io.IOException;
+import org.apache.hadoop.classification.InterfaceAudience;
+
+@InterfaceAudience.Private
+class Call {
+  private final int id;
+
+  private final String protocolName;
+
+  private final long protocolVersion;
+
+  private final String methodName;
+
+  private final Message param;
+
+  private final Message responseDefaultType;
+
+  private volatile Message response;
+
+  private volatile IOException error;
+
+  private boolean done;
+
+  private final RpcCallback<Call> callback;
+
+  Call(int id, String protocolName, long protocolVersion, String methodName,
+      Message param, Message responseDefaultType, RpcCallback<Call> callback) {
+    this.id = id;
+    this.protocolName = protocolName;
+    this.protocolVersion = protocolVersion;
+    this.methodName = methodName;
+    this.param = param;
+    this.responseDefaultType = responseDefaultType;
+    this.callback = callback;
+  }
+
+  private void callComplete() {
+    callback.run(this);
+  }
+
+  /**
+   * Set the exception when there is an error. Notify the caller the call is
+   * done.
+   *
+   * @param error exception thrown by the call; either local or remote
+   */
+  void setException(IOException error) {
+    synchronized (this) {
+      if (done) {
+        return;
+      }
+      this.done = true;
+      this.error = error;
+    }
+    callComplete();
+  }
+
+  /**
+   * Set the return value when there is no error. Notify the caller the call is
+   * done.
+   *
+   * @param response return value of the call.
+   * @param cells Can be null
+   */
+  void setResponse(Message response) {
+    synchronized (this) {
+      if (done) {
+        return;
+      }
+      this.done = true;
+      this.response = response;
+    }
+    callComplete();
+  }
+
+  int getId() {
+    return id;
+  }
+
+  String getProtocolName() {
+    return protocolName;
+  }
+
+  long getProtocolVersion() {
+    return protocolVersion;
+  }
+
+  String getMethodName() {
+    return methodName;
+  }
+
+  Message getParam() {
+    return param;
+  }
+
+  Message getResponseDefaultType() {
+    return responseDefaultType;
+  }
+
+  Message getResponse() {
+    return response;
+  }
+
+  IOException getError() {
+    return error;
+  }
+
+  boolean isDone() {
+    return done;
+  }
+}
\ No newline at end of file
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ipc/ConnectionId.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ipc/ConnectionId.java
new file mode 100644
index 0000000..111b925
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ipc/ConnectionId.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.ipc;
+
+import java.net.InetSocketAddress;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.security.UserGroupInformation;
+
+@InterfaceAudience.Private
+class ConnectionId {
+
+  private static final int PRIME = 16777619;
+
+  private final UserGroupInformation ticket;
+  private final String protocolName;
+  private final InetSocketAddress address;
+
+  public ConnectionId(UserGroupInformation ticket, String protocolName,
+      InetSocketAddress address) {
+    this.ticket = ticket;
+    this.protocolName = protocolName;
+    this.address = address;
+  }
+
+  UserGroupInformation getTicket() {
+    return ticket;
+  }
+
+  String getProtocolName() {
+    return protocolName;
+  }
+
+  InetSocketAddress getAddress() {
+    return address;
+  }
+
+  @Override
+  public int hashCode() {
+    int h = ticket == null ? 0 : ticket.hashCode();
+    h = PRIME * h + protocolName.hashCode();
+    h = PRIME * h + address.hashCode();
+    return h;
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (obj instanceof ConnectionId) {
+      ConnectionId id = (ConnectionId) obj;
+      return address.equals(id.address) &&
+          ((ticket != null && ticket.equals(id.ticket)) ||
+              (ticket == id.ticket)) &&
+          protocolName.equals(id.protocolName);
+    }
+    return false;
+  }
+}
\ No newline at end of file
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ipc/HdfsRpcController.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ipc/HdfsRpcController.java
new file mode 100644
index 0000000..71ac3ef
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ipc/HdfsRpcController.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.ipc;
+
+import com.google.protobuf.RpcCallback;
+import com.google.protobuf.RpcController;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+
+@InterfaceAudience.Private
+public class HdfsRpcController implements RpcController {
+
+  private IOException error;
+
+  @Override
+  public void reset() {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public boolean failed() {
+    return error != null;
+  }
+
+  @Override
+  public String errorText() {
+    return error != null ? error.getMessage() : null;
+  }
+
+  @Override
+  public void startCancel() {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void setFailed(String reason) {
+    this.error = new IOException(reason);
+  }
+
+  public void setException(IOException error) {
+    this.error = error;
+  }
+
+  public IOException getException() {
+    return error;
+  }
+
+  @Override
+  public boolean isCanceled() {
+    return false;
+  }
+
+  @Override
+  public void notifyOnCancel(RpcCallback<Object> callback) {
+    throw new UnsupportedOperationException();
+  }
+}
\ No newline at end of file
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ipc/IPCUtil.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ipc/IPCUtil.java
new file mode 100644
index 0000000..db46bdb
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ipc/IPCUtil.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.ipc;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+
+@InterfaceAudience.Private
+class IPCUtil {
+
+  static IOException toIOE(Throwable t) {
+    if (t instanceof IOException) {
+      return (IOException) t;
+    } else {
+      return new IOException(t);
+    }
+  }
+}
\ No newline at end of file
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ipc/RpcClient.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ipc/RpcClient.java
new file mode 100644
index 0000000..4792173
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ipc/RpcClient.java
@@ -0,0 +1,128 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.ipc;
+
+import com.google.protobuf.Descriptors;
+import com.google.protobuf.Message;
+import com.google.protobuf.RpcCallback;
+import com.google.protobuf.RpcChannel;
+
+import io.netty.channel.Channel;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.nio.NioSocketChannel;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.ipc.ClientId;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.ipc.RemoteException;
+import org.apache.hadoop.ipc.Server.AuthProtocol;
+import org.apache.hadoop.security.UserGroupInformation;
+
+/**
+ * The protobuf based rpc client.
+ */
+@InterfaceAudience.Private
+public class RpcClient implements Closeable {
+
+  private final byte[] clientId;
+
+  private final EventLoopGroup group = new NioEventLoopGroup();
+
+  private final Class<? extends Channel> channelClass = NioSocketChannel.class;
+
+  private final AtomicInteger callIdCnt = new AtomicInteger(0);
+
+  private final ConcurrentMap<ConnectionId, RpcConnection> connections =
+      new ConcurrentHashMap<>();
+
+  public RpcClient() {
+    this.clientId = ClientId.getClientId();
+  }
+
+  private int nextCallId() {
+    int id, next;
+    do {
+      id = callIdCnt.get();
+      next = id < Integer.MAX_VALUE ? id + 1 : 0;
+    } while (!callIdCnt.compareAndSet(id, next));
+    return id;
+  }
+
+  private void onCallFinished(Call call, HdfsRpcController hrc,
+      InetSocketAddress addr, RpcCallback<Message> callback) {
+    IOException error = call.getError();
+    if (error != null) {
+      if (error instanceof RemoteException) {
+        error.fillInStackTrace();
+      }
+      hrc.setException(error);
+      callback.run(null);
+    } else {
+      callback.run(call.getResponse());
+    }
+  }
+
+  private void callMethod(String protocolName, long protocolVersion,
+      Descriptors.MethodDescriptor md, HdfsRpcController hrc, Message param,
+      Message returnType, UserGroupInformation ugi, InetSocketAddress addr,
+      RpcCallback<Message> callback) {
+    Call call =
+        new Call(nextCallId(), protocolName, protocolVersion, md.getName(),
+            param, returnType, c -> onCallFinished(c, hrc, addr, callback));
+    ConnectionId remoteId = new ConnectionId(ugi, protocolName, addr);
+    connections
+        .computeIfAbsent(remoteId,
+            k -> new RpcConnection(this, k, AuthProtocol.NONE))
+        .sendRequest(call);
+  }
+
+  public RpcChannel createRpcChannel(Class<?> protocol, InetSocketAddress addr,
+      UserGroupInformation ugi) {
+    String protocolName = RPC.getProtocolName(protocol);
+    long protocolVersion = RPC.getProtocolVersion(protocol);
+    return (method, controller, request, responsePrototype, done) -> callMethod(
+        protocolName, protocolVersion, method, (HdfsRpcController) controller,
+        request, responsePrototype, ugi, addr, done);
+  }
+
+  byte[] getClientId() {
+    return clientId;
+  }
+
+  EventLoopGroup getGroup() {
+    return group;
+  }
+
+  Class<? extends Channel> getChannelClass() {
+    return channelClass;
+  }
+
+  @Override
+  public void close() throws IOException {
+    connections.values().forEach(c -> c.shutdown());
+    connections.clear();
+  }
+}
\ No newline at end of file
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ipc/RpcConnection.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ipc/RpcConnection.java
new file mode 100644
index 0000000..5e7b482
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ipc/RpcConnection.java
@@ -0,0 +1,153 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.ipc;
+
+import static org.apache.hadoop.ipc.RpcConstants.CONNECTION_CONTEXT_CALL_ID;
+
+import com.google.protobuf.CodedOutputStream;
+import io.netty.bootstrap.Bootstrap;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufOutputStream;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.ChannelPipeline;
+import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
+import java.io.IOException;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hdfs.ipc.BufferCallBeforeInitHandler.BufferCallEvent;
+import org.apache.hadoop.ipc.RPC.RpcKind;
+import org.apache.hadoop.ipc.RpcConstants;
+import org.apache.hadoop.ipc.Server.AuthProtocol;
+import org.apache.hadoop.ipc.protobuf.IpcConnectionContextProtos.IpcConnectionContextProto;
+import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcRequestHeaderProto;
+import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcRequestHeaderProto.OperationProto;
+import org.apache.hadoop.security.SaslRpcServer.AuthMethod;
+import org.apache.hadoop.util.ProtoUtil;
+
+/**
+ * The connection to remote server.
+ */
+@InterfaceAudience.Private
+class RpcConnection {
+
+  final RpcClient rpcClient;
+
+  final ConnectionId remoteId;
+
+  private final AuthProtocol authProtocol;
+
+  private Channel channel;
+
+  public RpcConnection(RpcClient rpcClient, ConnectionId remoteId,
+      AuthProtocol authProtocol) {
+    this.rpcClient = rpcClient;
+    this.remoteId = remoteId;
+    this.authProtocol = authProtocol;
+  }
+
+  private void writeConnectionHeader(Channel ch) {
+    ByteBuf header = ch.alloc().buffer(7);
+    header.writeBytes(RpcConstants.HEADER.duplicate());
+    header.writeByte(RpcConstants.CURRENT_VERSION);
+    header.writeByte(0); // service class
+    header.writeByte(authProtocol.callId);
+    ch.writeAndFlush(header);
+  }
+
+  private void writeConnectionContext(Channel ch) throws IOException {
+    RpcRequestHeaderProto connectionContextHeader =
+        ProtoUtil.makeRpcRequestHeader(RpcKind.RPC_PROTOCOL_BUFFER,
+            OperationProto.RPC_FINAL_PACKET, CONNECTION_CONTEXT_CALL_ID,
+            RpcConstants.INVALID_RETRY_COUNT, rpcClient.getClientId());
+    int headerSize = connectionContextHeader.getSerializedSize();
+    IpcConnectionContextProto message = ProtoUtil.makeIpcConnectionContext(
+        remoteId.getProtocolName(), remoteId.getTicket(), AuthMethod.SIMPLE);
+    int messageSize = message.getSerializedSize();
+
+    int totalSize =
+        CodedOutputStream.computeRawVarint32Size(headerSize) + headerSize +
+            CodedOutputStream.computeRawVarint32Size(messageSize) + messageSize;
+    ByteBuf buf = ch.alloc().buffer(totalSize + 4);
+    buf.writeInt(totalSize);
+    ByteBufOutputStream out = new ByteBufOutputStream(buf);
+    connectionContextHeader.writeDelimitedTo(out);
+    message.writeDelimitedTo(out);
+    ch.writeAndFlush(buf);
+  }
+
+  private void established(Channel ch) throws IOException {
+    ChannelPipeline p = ch.pipeline();
+    String addBeforeHandler =
+        p.context(BufferCallBeforeInitHandler.class).name();
+    p.addBefore(addBeforeHandler, "frameDecoder",
+        new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4));
+    p.addBefore(addBeforeHandler, "rpcHandler", new RpcDuplexHandler(this));
+    p.fireUserEventTriggered(BufferCallEvent.success());
+  }
+
+  private Channel connect() {
+    if (channel != null) {
+      return channel;
+    }
+    channel = new Bootstrap().group(rpcClient.getGroup())
+        .channel(rpcClient.getChannelClass())
+        .option(ChannelOption.TCP_NODELAY, true)
+        .option(ChannelOption.SO_KEEPALIVE, true)
+        .handler(new BufferCallBeforeInitHandler())
+        .remoteAddress(remoteId.getAddress()).connect()
+        .addListener(new ChannelFutureListener() {
+
+          @Override
+          public void operationComplete(ChannelFuture future) throws Exception {
+            Channel ch = future.channel();
+            if (!future.isSuccess()) {
+              failInit(ch, IPCUtil.toIOE(future.cause()));
+              return;
+            }
+            writeConnectionHeader(ch);
+            writeConnectionContext(ch);
+            established(ch);
+          }
+        }).channel();
+    return channel;
+  }
+
+  private synchronized void failInit(Channel ch, IOException e) {
+    // fail all pending calls
+    ch.pipeline().fireUserEventTriggered(BufferCallEvent.fail(e));
+    shutdown0();
+  }
+
+  private void shutdown0() {
+    if (channel != null) {
+      channel.close();
+      channel = null;
+    }
+  }
+
+  public synchronized void shutdown() {
+    shutdown0();
+  }
+
+  public synchronized void sendRequest(Call call) {
+    Channel channel = connect();
+    channel.eventLoop().execute(() -> channel.writeAndFlush(call));
+  }
+}
\ No newline at end of file
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ipc/RpcDuplexHandler.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ipc/RpcDuplexHandler.java
new file mode 100644
index 0000000..3cc5659
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ipc/RpcDuplexHandler.java
@@ -0,0 +1,175 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.ipc;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.ipc.RPC.RpcKind;
+import org.apache.hadoop.ipc.RemoteException;
+import org.apache.hadoop.ipc.protobuf.ProtobufRpcEngineProtos.RequestHeaderProto;
+import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcRequestHeaderProto;
+import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcRequestHeaderProto.OperationProto;
+import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto;
+import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto.RpcErrorCodeProto;
+import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto.RpcStatusProto;
+import org.apache.hadoop.util.ProtoUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.protobuf.CodedOutputStream;
+import com.google.protobuf.Message;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufInputStream;
+import io.netty.buffer.ByteBufOutputStream;
+import io.netty.channel.ChannelDuplexHandler;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelPromise;
+
+@InterfaceAudience.Private
+class RpcDuplexHandler extends ChannelDuplexHandler {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(RpcDuplexHandler.class);
+
+  private final RpcConnection conn;
+
+  private final Map<Integer, Call> id2Call = new HashMap<>();
+
+  public RpcDuplexHandler(RpcConnection conn) {
+    this.conn = conn;
+  }
+
+  private void writeRequest(ChannelHandlerContext ctx, Call call,
+      ChannelPromise promise) throws IOException {
+    id2Call.put(call.getId(), call);
+
+    RpcRequestHeaderProto rpcHeader = ProtoUtil.makeRpcRequestHeader(
+        RpcKind.RPC_PROTOCOL_BUFFER, OperationProto.RPC_FINAL_PACKET,
+        call.getId(), 0, conn.rpcClient.getClientId());
+    int rpcHeaderSize = rpcHeader.getSerializedSize();
+    RequestHeaderProto requestHeader =
+        RequestHeaderProto.newBuilder().setMethodName(call.getMethodName())
+            .setDeclaringClassProtocolName(call.getProtocolName())
+            .setClientProtocolVersion(call.getProtocolVersion()).build();
+    int requestHeaderSize = requestHeader.getSerializedSize();
+    int totalSize = CodedOutputStream.computeRawVarint32Size(rpcHeaderSize) +
+        rpcHeaderSize +
+        CodedOutputStream.computeRawVarint32Size(requestHeaderSize) +
+        requestHeaderSize;
+    Message param = call.getParam();
+    if (param != null) {
+      int paramSize = param.getSerializedSize();
+      totalSize +=
+          CodedOutputStream.computeRawVarint32Size(paramSize) + paramSize;
+    }
+    ByteBufOutputStream out =
+        new ByteBufOutputStream(ctx.alloc().buffer(totalSize + 4));
+    out.writeInt(totalSize);
+    rpcHeader.writeDelimitedTo(out);
+    requestHeader.writeDelimitedTo(out);
+    if (param != null) {
+      param.writeDelimitedTo(out);
+    }
+    ctx.write(out.buffer(), promise);
+  }
+
+  @Override
+  public void write(ChannelHandlerContext ctx, Object msg,
+      ChannelPromise promise) throws Exception {
+    if (msg instanceof Call) {
+      writeRequest(ctx, (Call) msg, promise);
+    } else {
+      ctx.write(msg, promise);
+    }
+  }
+
+  private void readResponse(ChannelHandlerContext ctx, ByteBuf buf)
+      throws Exception {
+    ByteBufInputStream in = new ByteBufInputStream(buf);
+    RpcResponseHeaderProto header =
+        RpcResponseHeaderProto.parseDelimitedFrom(in);
+    int id = header.getCallId();
+    RpcStatusProto status = header.getStatus();
+    if (status != RpcStatusProto.SUCCESS) {
+      String exceptionClassName =
+          header.hasExceptionClassName() ? header.getExceptionClassName()
+              : "ServerDidNotSetExceptionClassName";
+      String errorMsg = header.hasErrorMsg() ? header.getErrorMsg()
+          : "ServerDidNotSetErrorMsg";
+      RpcErrorCodeProto errCode =
+          (header.hasErrorDetail() ? header.getErrorDetail() : null);
+      if (errCode == null) {
+        LOG.warn("Detailed error code not set by server on rpc error");
+      }
+      RemoteException re =
+          new RemoteException(exceptionClassName, errorMsg, errCode);
+      if (status == RpcStatusProto.ERROR) {
+        Call call = id2Call.remove(id);
+        call.setException(re);
+      } else if (status == RpcStatusProto.FATAL) {
+        exceptionCaught(ctx, re);
+      }
+      return;
+    }
+    Call call = id2Call.remove(id);
+    call.setResponse(call.getResponseDefaultType().getParserForType()
+        .parseDelimitedFrom(in));
+  }
+
+  @Override
+  public void channelRead(ChannelHandlerContext ctx, Object msg)
+      throws Exception {
+    if (msg instanceof ByteBuf) {
+      ByteBuf buf = (ByteBuf) msg;
+      try {
+        readResponse(ctx, buf);
+      } finally {
+        buf.release();
+      }
+    }
+  }
+
+  private void cleanupCalls(ChannelHandlerContext ctx, IOException error) {
+    for (Call call : id2Call.values()) {
+      call.setException(error);
+    }
+    id2Call.clear();
+  }
+
+  @Override
+  public void channelInactive(ChannelHandlerContext ctx) throws Exception {
+    if (!id2Call.isEmpty()) {
+      cleanupCalls(ctx, new IOException("Connection closed"));
+    }
+    conn.shutdown();
+    ctx.fireChannelInactive();
+  }
+
+  @Override
+  public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
+      throws Exception {
+    if (!id2Call.isEmpty()) {
+      cleanupCalls(ctx, new IOException("Connection closed"));
+    }
+    conn.shutdown();
+  }
+}
\ No newline at end of file
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/test/java/org/apache/hadoop/hdfs/ipc/TestAsyncIPC.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/test/java/org/apache/hadoop/hdfs/ipc/TestAsyncIPC.java
new file mode 100644
index 0000000..86fde48
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/test/java/org/apache/hadoop/hdfs/ipc/TestAsyncIPC.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.ipc;
+
+import static org.junit.Assert.assertEquals;
+
+import com.google.protobuf.RpcChannel;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.ipc.protobuf.TestRpcProtos;
+import org.apache.hadoop.hdfs.ipc.protobuf.TestRpcProtos.EchoRequestProto;
+import org.apache.hadoop.ipc.ProtobufRpcEngine;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class TestAsyncIPC {
+
+  private static Configuration CONF;
+
+  private static TestServer SERVER;
+
+  private static int PORT;
+
+  @BeforeClass
+  public static void setUp() throws IOException {
+    CONF = new Configuration();
+    RPC.setProtocolEngine(CONF, TestRpcProtocolPB.class,
+        ProtobufRpcEngine.class);
+    SERVER = new TestServer(CONF);
+    SERVER.start();
+    PORT = SERVER.port();
+  }
+
+  @AfterClass
+  public static void tearDown() {
+    SERVER.stop();
+  }
+
+  @Test
+  public void test() throws IOException, InterruptedException {
+    try (RpcClient client = new RpcClient()) {
+      RpcChannel channel = client.createRpcChannel(TestRpcProtocolPB.class,
+          new InetSocketAddress("localhost", PORT),
+          UserGroupInformation.getCurrentUser());
+      TestRpcProtos.TestRpcService.Interface stub =
+          TestRpcProtos.TestRpcService.newStub(channel);
+      Map<Integer, String> results = new HashMap<>();
+      int count = 100;
+      CountDownLatch latch = new CountDownLatch(count);
+      for (int i = 0; i < count; i++) {
+        final int index = i;
+        stub.echo(new HdfsRpcController(),
+            EchoRequestProto.newBuilder().setMessage("Echo-" + index).build(),
+            resp -> {
+              results.put(index, resp.getMessage());
+              latch.countDown();
+            });
+      }
+      latch.await();
+      assertEquals(count, results.size());
+      for (int i = 0; i < count; i++) {
+        assertEquals("Echo-" + i, results.get(i));
+      }
+    }
+  }
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/test/java/org/apache/hadoop/hdfs/ipc/TestRpcProtocolPB.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/test/java/org/apache/hadoop/hdfs/ipc/TestRpcProtocolPB.java
new file mode 100644
index 0000000..c7f7f27
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/test/java/org/apache/hadoop/hdfs/ipc/TestRpcProtocolPB.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.ipc;
+
+import org.apache.hadoop.hdfs.ipc.protobuf.TestRpcProtos;
+import org.apache.hadoop.ipc.ProtocolInfo;
+
+@ProtocolInfo(protocolName = "org.apache.hadoop.hdfs.ipc.TestRpcProtocol",
+    protocolVersion = 1)
+public interface TestRpcProtocolPB
+    extends TestRpcProtos.TestRpcService.BlockingInterface {
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/test/java/org/apache/hadoop/hdfs/ipc/TestServer.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/test/java/org/apache/hadoop/hdfs/ipc/TestServer.java
new file mode 100644
index 0000000..3e06cc8
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/test/java/org/apache/hadoop/hdfs/ipc/TestServer.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.ipc;
+
+import com.google.protobuf.RpcController;
+import com.google.protobuf.ServiceException;
+import java.io.IOException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.ipc.protobuf.TestRpcProtos;
+import org.apache.hadoop.hdfs.ipc.protobuf.TestRpcProtos.EchoRequestProto;
+import org.apache.hadoop.hdfs.ipc.protobuf.TestRpcProtos.EchoResponseProto;
+import org.apache.hadoop.ipc.RPC;
+
+public class TestServer implements TestRpcProtocolPB {
+
+  private final RPC.Server server;
+
+  public TestServer(Configuration conf) throws IOException {
+    server = new RPC.Builder(conf).setProtocol(TestRpcProtocolPB.class)
+        .setInstance(
+            TestRpcProtos.TestRpcService.newReflectiveBlockingService(this))
+        .setNumHandlers(10).build();
+  }
+
+  public void start() {
+    server.start();
+  }
+
+  public void stop() {
+    server.stop();
+  }
+
+  public int port() {
+    return server.getPort();
+  }
+
+  @Override
+  public EchoResponseProto echo(RpcController controller,
+      EchoRequestProto request) throws ServiceException {
+    return EchoResponseProto.newBuilder().setMessage(request.getMessage())
+        .build();
+  }
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/test/proto/test_rpc.proto b/hadoop-hdfs-project/hadoop-hdfs-client/src/test/proto/test_rpc.proto
new file mode 100644
index 0000000..0997f56
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/test/proto/test_rpc.proto
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+option java_package = "org.apache.hadoop.hdfs.ipc.protobuf";
+option java_outer_classname = "TestRpcProtos";
+option java_generic_services = true;
+option java_generate_equals_and_hash = true;
+package hadoop.hdfs;
+
+message EchoRequestProto {
+  required string message = 1;
+}
+
+message EchoResponseProto {
+  required string message = 1;
+}
+
+service TestRpcService {
+  rpc echo(EchoRequestProto) returns (EchoResponseProto);
+}
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org