You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by ss...@apache.org on 2017/05/17 17:57:47 UTC
hive git commit: HIVE-16692. LLAP: Keep alive connection in shuffle
handler should not be closed until entire data is flushed out. (Rajesh
Balamohan, reviewed by Siddharth Seth)
Repository: hive
Updated Branches:
refs/heads/master c99549fba -> eead46c4c
HIVE-16692. LLAP: Keep alive connection in shuffle handler should not be closed until entire data is flushed out. (Rajesh Balamohan, reviewed by Siddharth Seth)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/eead46c4
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/eead46c4
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/eead46c4
Branch: refs/heads/master
Commit: eead46c4cdcff7b01e20d9abcc905e1520cec40f
Parents: c99549f
Author: Siddharth Seth <ss...@apache.org>
Authored: Wed May 17 10:57:19 2017 -0700
Committer: Siddharth Seth <ss...@apache.org>
Committed: Wed May 17 10:57:19 2017 -0700
----------------------------------------------------------------------
.../llap/shufflehandler/ShuffleHandler.java | 22 ++++++++++++++++++--
1 file changed, 20 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/eead46c4/llap-server/src/java/org/apache/hadoop/hive/llap/shufflehandler/ShuffleHandler.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/shufflehandler/ShuffleHandler.java b/llap-server/src/java/org/apache/hadoop/hive/llap/shufflehandler/ShuffleHandler.java
index 0705225..f63375c 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/shufflehandler/ShuffleHandler.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/shufflehandler/ShuffleHandler.java
@@ -209,6 +209,7 @@ public class ShuffleHandler implements AttemptRegistrationListener {
private static final AtomicBoolean started = new AtomicBoolean(false);
private static final AtomicBoolean initing = new AtomicBoolean(false);
private static ShuffleHandler INSTANCE;
+ private static final String TIMEOUT_HANDLER = "timeout";
final boolean connectionKeepAliveEnabled;
@@ -520,9 +521,16 @@ public class ShuffleHandler implements AttemptRegistrationListener {
}
private static class TimeoutHandler extends IdleStateAwareChannelHandler {
+
+ private boolean enabledTimeout;
+
+ void setEnabledTimeout(boolean enabledTimeout) {
+ this.enabledTimeout = enabledTimeout;
+ }
+
@Override
public void channelIdle(ChannelHandlerContext ctx, IdleStateEvent e) {
- if (e.getState() == IdleState.WRITER_IDLE) {
+ if (e.getState() == IdleState.WRITER_IDLE && enabledTimeout) {
e.getChannel().close();
}
}
@@ -564,7 +572,7 @@ public class ShuffleHandler implements AttemptRegistrationListener {
pipeline.addLast("chunking", new ChunkedWriteHandler());
pipeline.addLast("shuffle", SHUFFLE);
pipeline.addLast("idle", idleStateHandler);
- pipeline.addLast("timeout", new TimeoutHandler());
+ pipeline.addLast(TIMEOUT_HANDLER, new TimeoutHandler());
return pipeline;
// TODO factor security manager into pipeline
// TODO factor out encode/decode to permit binary shuffle
@@ -741,6 +749,13 @@ public class ShuffleHandler implements AttemptRegistrationListener {
Map<String, MapOutputInfo> mapOutputInfoMap =
new HashMap<String, MapOutputInfo>();
Channel ch = evt.getChannel();
+
+ // In case of KeepAlive, ensure that timeout handler does not close connection until entire
+ // response is written (i.e., response headers + mapOutput).
+ ChannelPipeline pipeline = ch.getPipeline();
+ TimeoutHandler timeoutHandler = (TimeoutHandler)pipeline.get(TIMEOUT_HANDLER);
+ timeoutHandler.setEnabledTimeout(false);
+
String user = userRsrc.get(jobId);
try {
@@ -781,6 +796,9 @@ public class ShuffleHandler implements AttemptRegistrationListener {
// If Keep alive is enabled, do not close the connection.
if (!keepAliveParam && !connectionKeepAliveEnabled) {
lastMap.addListener(ChannelFutureListener.CLOSE);
+ } else {
+ // Entire response is written out. Safe to enable timeout handling.
+ timeoutHandler.setEnabledTimeout(true);
}
}