You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by se...@apache.org on 2017/02/27 20:24:16 UTC

[14/20] hive git commit: HIVE-15859: HoS: Write RPC messages in event loop (Rui reviewed by Xuefu)

HIVE-15859: HoS: Write RPC messages in event loop (Rui reviewed by Xuefu)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/3813f39b
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/3813f39b
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/3813f39b

Branch: refs/heads/hive-14535
Commit: 3813f39b7a066e4a4e15300aa07b1c9ece61a0a0
Parents: 26356be
Author: Rui Li <li...@apache.org>
Authored: Mon Feb 27 13:29:53 2017 +0800
Committer: Rui Li <li...@apache.org>
Committed: Mon Feb 27 13:29:53 2017 +0800

----------------------------------------------------------------------
 .../java/org/apache/hive/spark/client/rpc/Rpc.java | 17 +++++++++--------
 .../hive/spark/client/rpc/RpcDispatcher.java       |  7 +------
 .../org/apache/hive/spark/client/rpc/TestRpc.java  |  3 +++
 3 files changed, 13 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/3813f39b/spark-client/src/main/java/org/apache/hive/spark/client/rpc/Rpc.java
----------------------------------------------------------------------
diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/rpc/Rpc.java b/spark-client/src/main/java/org/apache/hive/spark/client/rpc/Rpc.java
index 0489684..5dde16c 100644
--- a/spark-client/src/main/java/org/apache/hive/spark/client/rpc/Rpc.java
+++ b/spark-client/src/main/java/org/apache/hive/spark/client/rpc/Rpc.java
@@ -221,7 +221,6 @@ public class Rpc implements Closeable {
   private final Channel channel;
   private final Collection<Listener> listeners;
   private final EventExecutorGroup egroup;
-  private final Object channelLock;
   private volatile RpcDispatcher dispatcher;
 
   private Rpc(RpcConfiguration config, Channel channel, EventExecutorGroup egroup) {
@@ -229,7 +228,6 @@ public class Rpc implements Closeable {
     Preconditions.checkArgument(egroup != null);
     this.config = config;
     this.channel = channel;
-    this.channelLock = new Object();
     this.dispatcher = null;
     this.egroup = egroup;
     this.listeners = Lists.newLinkedList();
@@ -271,13 +269,13 @@ public class Rpc implements Closeable {
    * @param retType Type of expected reply.
    * @return A future used to monitor the operation.
    */
-  public <T> Future<T> call(Object msg, Class<T> retType) {
+  public <T> Future<T> call(final Object msg, Class<T> retType) {
     Preconditions.checkArgument(msg != null);
     Preconditions.checkState(channel.isActive(), "RPC channel is closed.");
     try {
       final long id = rpcId.getAndIncrement();
       final Promise<T> promise = createPromise();
-      ChannelFutureListener listener = new ChannelFutureListener() {
+      final ChannelFutureListener listener = new ChannelFutureListener() {
           @Override
           public void operationComplete(ChannelFuture cf) {
             if (!cf.isSuccess() && !promise.isDone()) {
@@ -290,10 +288,13 @@ public class Rpc implements Closeable {
       };
 
       dispatcher.registerRpc(id, promise, msg.getClass().getName());
-      synchronized (channelLock) {
-        channel.write(new MessageHeader(id, Rpc.MessageType.CALL)).addListener(listener);
-        channel.writeAndFlush(msg).addListener(listener);
-      }
+      channel.eventLoop().submit(new Runnable() {
+        @Override
+        public void run() {
+          channel.write(new MessageHeader(id, Rpc.MessageType.CALL)).addListener(listener);
+          channel.writeAndFlush(msg).addListener(listener);
+        }
+      });
       return promise;
     } catch (Exception e) {
       throw Throwables.propagate(e);

http://git-wip-us.apache.org/repos/asf/hive/blob/3813f39b/spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcDispatcher.java
----------------------------------------------------------------------
diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcDispatcher.java b/spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcDispatcher.java
index ebafd13..2b6ab29 100644
--- a/spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcDispatcher.java
+++ b/spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcDispatcher.java
@@ -152,12 +152,7 @@ public abstract class RpcDispatcher extends SimpleChannelInboundHandler<Object>
 
   @Override
   public final void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
-    if (LOG.isDebugEnabled()) {
-      LOG.debug(String.format("[%s] Caught exception in channel pipeline.", name()), cause);
-    } else {
-      LOG.info("[{}] Closing channel due to exception in pipeline ({}).", name(),
-          cause.getMessage());
-    }
+    LOG.error(String.format("[%s] Closing channel due to exception in pipeline.", name()), cause);
 
     if (lastHeader != null) {
       // There's an RPC waiting for a reply. Exception was most probably caught while processing

http://git-wip-us.apache.org/repos/asf/hive/blob/3813f39b/spark-client/src/test/java/org/apache/hive/spark/client/rpc/TestRpc.java
----------------------------------------------------------------------
diff --git a/spark-client/src/test/java/org/apache/hive/spark/client/rpc/TestRpc.java b/spark-client/src/test/java/org/apache/hive/spark/client/rpc/TestRpc.java
index 77c3d02..5a4801c 100644
--- a/spark-client/src/test/java/org/apache/hive/spark/client/rpc/TestRpc.java
+++ b/spark-client/src/test/java/org/apache/hive/spark/client/rpc/TestRpc.java
@@ -282,6 +282,9 @@ public class TestRpc {
     EmbeddedChannel client = (EmbeddedChannel) clientRpc.getChannel();
     EmbeddedChannel server = (EmbeddedChannel) serverRpc.getChannel();
 
+    server.runPendingTasks();
+    client.runPendingTasks();
+
     int count = 0;
     while (!client.outboundMessages().isEmpty()) {
       server.writeInbound(client.readOutbound());