You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by se...@apache.org on 2017/02/27 20:24:16 UTC
[14/20] hive git commit: HIVE-15859: HoS: Write RPC messages in event
loop (Rui reviewed by Xuefu)
HIVE-15859: HoS: Write RPC messages in event loop (Rui reviewed by Xuefu)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/3813f39b
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/3813f39b
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/3813f39b
Branch: refs/heads/hive-14535
Commit: 3813f39b7a066e4a4e15300aa07b1c9ece61a0a0
Parents: 26356be
Author: Rui Li <li...@apache.org>
Authored: Mon Feb 27 13:29:53 2017 +0800
Committer: Rui Li <li...@apache.org>
Committed: Mon Feb 27 13:29:53 2017 +0800
----------------------------------------------------------------------
.../java/org/apache/hive/spark/client/rpc/Rpc.java | 17 +++++++++--------
.../hive/spark/client/rpc/RpcDispatcher.java | 7 +------
.../org/apache/hive/spark/client/rpc/TestRpc.java | 3 +++
3 files changed, 13 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/3813f39b/spark-client/src/main/java/org/apache/hive/spark/client/rpc/Rpc.java
----------------------------------------------------------------------
diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/rpc/Rpc.java b/spark-client/src/main/java/org/apache/hive/spark/client/rpc/Rpc.java
index 0489684..5dde16c 100644
--- a/spark-client/src/main/java/org/apache/hive/spark/client/rpc/Rpc.java
+++ b/spark-client/src/main/java/org/apache/hive/spark/client/rpc/Rpc.java
@@ -221,7 +221,6 @@ public class Rpc implements Closeable {
private final Channel channel;
private final Collection<Listener> listeners;
private final EventExecutorGroup egroup;
- private final Object channelLock;
private volatile RpcDispatcher dispatcher;
private Rpc(RpcConfiguration config, Channel channel, EventExecutorGroup egroup) {
@@ -229,7 +228,6 @@ public class Rpc implements Closeable {
Preconditions.checkArgument(egroup != null);
this.config = config;
this.channel = channel;
- this.channelLock = new Object();
this.dispatcher = null;
this.egroup = egroup;
this.listeners = Lists.newLinkedList();
@@ -271,13 +269,13 @@ public class Rpc implements Closeable {
* @param retType Type of expected reply.
* @return A future used to monitor the operation.
*/
- public <T> Future<T> call(Object msg, Class<T> retType) {
+ public <T> Future<T> call(final Object msg, Class<T> retType) {
Preconditions.checkArgument(msg != null);
Preconditions.checkState(channel.isActive(), "RPC channel is closed.");
try {
final long id = rpcId.getAndIncrement();
final Promise<T> promise = createPromise();
- ChannelFutureListener listener = new ChannelFutureListener() {
+ final ChannelFutureListener listener = new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture cf) {
if (!cf.isSuccess() && !promise.isDone()) {
@@ -290,10 +288,13 @@ public class Rpc implements Closeable {
};
dispatcher.registerRpc(id, promise, msg.getClass().getName());
- synchronized (channelLock) {
- channel.write(new MessageHeader(id, Rpc.MessageType.CALL)).addListener(listener);
- channel.writeAndFlush(msg).addListener(listener);
- }
+ channel.eventLoop().submit(new Runnable() {
+ @Override
+ public void run() {
+ channel.write(new MessageHeader(id, Rpc.MessageType.CALL)).addListener(listener);
+ channel.writeAndFlush(msg).addListener(listener);
+ }
+ });
return promise;
} catch (Exception e) {
throw Throwables.propagate(e);
http://git-wip-us.apache.org/repos/asf/hive/blob/3813f39b/spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcDispatcher.java
----------------------------------------------------------------------
diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcDispatcher.java b/spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcDispatcher.java
index ebafd13..2b6ab29 100644
--- a/spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcDispatcher.java
+++ b/spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcDispatcher.java
@@ -152,12 +152,7 @@ public abstract class RpcDispatcher extends SimpleChannelInboundHandler<Object>
@Override
public final void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
- if (LOG.isDebugEnabled()) {
- LOG.debug(String.format("[%s] Caught exception in channel pipeline.", name()), cause);
- } else {
- LOG.info("[{}] Closing channel due to exception in pipeline ({}).", name(),
- cause.getMessage());
- }
+ LOG.error(String.format("[%s] Closing channel due to exception in pipeline.", name()), cause);
if (lastHeader != null) {
// There's an RPC waiting for a reply. Exception was most probably caught while processing
http://git-wip-us.apache.org/repos/asf/hive/blob/3813f39b/spark-client/src/test/java/org/apache/hive/spark/client/rpc/TestRpc.java
----------------------------------------------------------------------
diff --git a/spark-client/src/test/java/org/apache/hive/spark/client/rpc/TestRpc.java b/spark-client/src/test/java/org/apache/hive/spark/client/rpc/TestRpc.java
index 77c3d02..5a4801c 100644
--- a/spark-client/src/test/java/org/apache/hive/spark/client/rpc/TestRpc.java
+++ b/spark-client/src/test/java/org/apache/hive/spark/client/rpc/TestRpc.java
@@ -282,6 +282,9 @@ public class TestRpc {
EmbeddedChannel client = (EmbeddedChannel) clientRpc.getChannel();
EmbeddedChannel server = (EmbeddedChannel) serverRpc.getChannel();
+ server.runPendingTasks();
+ client.runPendingTasks();
+
int count = 0;
while (!client.outboundMessages().isEmpty()) {
server.writeInbound(client.readOutbound());