You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by ss...@apache.org on 2017/05/17 17:57:47 UTC

hive git commit: HIVE-16692. LLAP: Keep alive connection in shuffle handler should not be closed until entire data is flushed out. (Rajesh Balamohan, reviewed by Siddharth Seth)

Repository: hive
Updated Branches:
  refs/heads/master c99549fba -> eead46c4c


HIVE-16692. LLAP: Keep alive connection in shuffle handler should not be closed until entire data is flushed out. (Rajesh Balamohan, reviewed by Siddharth Seth)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/eead46c4
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/eead46c4
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/eead46c4

Branch: refs/heads/master
Commit: eead46c4cdcff7b01e20d9abcc905e1520cec40f
Parents: c99549f
Author: Siddharth Seth <ss...@apache.org>
Authored: Wed May 17 10:57:19 2017 -0700
Committer: Siddharth Seth <ss...@apache.org>
Committed: Wed May 17 10:57:19 2017 -0700

----------------------------------------------------------------------
 .../llap/shufflehandler/ShuffleHandler.java     | 22 ++++++++++++++++++--
 1 file changed, 20 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/eead46c4/llap-server/src/java/org/apache/hadoop/hive/llap/shufflehandler/ShuffleHandler.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/shufflehandler/ShuffleHandler.java b/llap-server/src/java/org/apache/hadoop/hive/llap/shufflehandler/ShuffleHandler.java
index 0705225..f63375c 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/shufflehandler/ShuffleHandler.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/shufflehandler/ShuffleHandler.java
@@ -209,6 +209,7 @@ public class ShuffleHandler implements AttemptRegistrationListener {
   private static final AtomicBoolean started = new AtomicBoolean(false);
   private static final AtomicBoolean initing = new AtomicBoolean(false);
   private static ShuffleHandler INSTANCE;
+  private static final String TIMEOUT_HANDLER = "timeout";
 
 
   final boolean connectionKeepAliveEnabled;
@@ -520,9 +521,16 @@ public class ShuffleHandler implements AttemptRegistrationListener {
   }
 
   private static class TimeoutHandler extends IdleStateAwareChannelHandler {
+
+    private boolean enabledTimeout;
+
+    void setEnabledTimeout(boolean enabledTimeout) {
+      this.enabledTimeout = enabledTimeout;
+    }
+
     @Override
     public void channelIdle(ChannelHandlerContext ctx, IdleStateEvent e) {
-      if (e.getState() == IdleState.WRITER_IDLE) {
+      if (e.getState() == IdleState.WRITER_IDLE && enabledTimeout) {
         e.getChannel().close();
       }
     }
@@ -564,7 +572,7 @@ public class ShuffleHandler implements AttemptRegistrationListener {
       pipeline.addLast("chunking", new ChunkedWriteHandler());
       pipeline.addLast("shuffle", SHUFFLE);
       pipeline.addLast("idle", idleStateHandler);
-      pipeline.addLast("timeout", new TimeoutHandler());
+      pipeline.addLast(TIMEOUT_HANDLER, new TimeoutHandler());
       return pipeline;
       // TODO factor security manager into pipeline
       // TODO factor out encode/decode to permit binary shuffle
@@ -741,6 +749,13 @@ public class ShuffleHandler implements AttemptRegistrationListener {
       Map<String, MapOutputInfo> mapOutputInfoMap =
           new HashMap<String, MapOutputInfo>();
       Channel ch = evt.getChannel();
+
+      // In case of KeepAlive, ensure that timeout handler does not close connection until entire
+      // response is written (i.e, response headers + mapOutput).
+      ChannelPipeline pipeline = ch.getPipeline();
+      TimeoutHandler timeoutHandler = (TimeoutHandler)pipeline.get(TIMEOUT_HANDLER);
+      timeoutHandler.setEnabledTimeout(false);
+
       String user = userRsrc.get(jobId);
 
       try {
@@ -781,6 +796,9 @@ public class ShuffleHandler implements AttemptRegistrationListener {
       // If Keep alive is enabled, do not close the connection.
       if (!keepAliveParam && !connectionKeepAliveEnabled) {
         lastMap.addListener(ChannelFutureListener.CLOSE);
+      } else {
+        // Entire response is written out. Safe to enable timeout handling.
+        timeoutHandler.setEnabledTimeout(true);
       }
     }