You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by pr...@apache.org on 2016/05/31 20:55:07 UTC

hive git commit: HIVE-13751: LlapOutputFormatService should have a configurable send buffer size (Prasanth Jayachandran reviewed by Jason Dere)

Repository: hive
Updated Branches:
  refs/heads/master a55f4a3a4 -> 57d1f3d85


HIVE-13751: LlapOutputFormatService should have a configurable send buffer size (Prasanth Jayachandran reviewed by Jason Dere)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/57d1f3d8
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/57d1f3d8
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/57d1f3d8

Branch: refs/heads/master
Commit: 57d1f3d85e0fe04e2c84c907d713593958c79a1f
Parents: a55f4a3
Author: Prasanth Jayachandran <pr...@apache.org>
Authored: Tue May 31 13:54:55 2016 -0700
Committer: Prasanth Jayachandran <pr...@apache.org>
Committed: Tue May 31 13:54:55 2016 -0700

----------------------------------------------------------------------
 .../org/apache/hadoop/hive/conf/HiveConf.java   |  2 ++
 .../hive/llap/LlapOutputFormatService.java      | 24 +++++++++++++++-----
 2 files changed, 20 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/57d1f3d8/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 8706665..b2e6b6f 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -2874,6 +2874,8 @@ public class HiveConf extends Configuration {
         "protocol or ZK paths), similar to how ssh refuses a key with bad access permissions."),
     LLAP_DAEMON_OUTPUT_SERVICE_PORT("hive.llap.daemon.output.service.port", 15003,
         "LLAP daemon output service port"),
+    LLAP_DAEMON_OUTPUT_SERVICE_SEND_BUFFER_SIZE("hive.llap.daemon.output.service.send.buffer.size",
+        128 * 1024, "Send buffer size to be used by LLAP daemon output service"),
     LLAP_ENABLE_GRACE_JOIN_IN_LLAP("hive.llap.enable.grace.join.in.llap", false,
         "Override if grace join should be allowed to run in llap."),
 

http://git-wip-us.apache.org/repos/asf/hive/blob/57d1f3d8/ql/src/java/org/apache/hadoop/hive/llap/LlapOutputFormatService.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/llap/LlapOutputFormatService.java b/ql/src/java/org/apache/hadoop/hive/llap/LlapOutputFormatService.java
index f852041..06660b3 100644
--- a/ql/src/java/org/apache/hadoop/hive/llap/LlapOutputFormatService.java
+++ b/ql/src/java/org/apache/hadoop/hive/llap/LlapOutputFormatService.java
@@ -109,15 +109,18 @@ public class LlapOutputFormatService {
     LOG.info("Starting LlapOutputFormatService");
 
     int portFromConf = HiveConf.getIntVar(conf, HiveConf.ConfVars.LLAP_DAEMON_OUTPUT_SERVICE_PORT);
+    int sendBufferSize = HiveConf.getIntVar(conf,
+        HiveConf.ConfVars.LLAP_DAEMON_OUTPUT_SERVICE_SEND_BUFFER_SIZE);
     eventLoopGroup = new NioEventLoopGroup(1);
     serverBootstrap = new ServerBootstrap();
     serverBootstrap.group(eventLoopGroup);
     serverBootstrap.channel(NioServerSocketChannel.class);
-    serverBootstrap.childHandler(new LlapOutputFormatServiceChannelHandler());
+    serverBootstrap.childHandler(new LlapOutputFormatServiceChannelHandler(sendBufferSize));
     try {
       listeningChannelFuture = serverBootstrap.bind(portFromConf).sync();
       this.port = ((InetSocketAddress) listeningChannelFuture.channel().localAddress()).getPort();
-      LOG.info("LlapOutputFormatService: Binding to port " + this.port);
+      LOG.info("LlapOutputFormatService: Binding to port: {} with send buffer size: {} ", this.port,
+          sendBufferSize);
     } catch (InterruptedException err) {
       throw new IOException("LlapOutputFormatService: Error binding to port " + portFromConf, err);
     }
@@ -154,6 +157,11 @@ public class LlapOutputFormatService {
   }
 
   protected class LlapOutputFormatServiceHandler extends SimpleChannelInboundHandler<String> {
+    private final int sendBufferSize;
+    public LlapOutputFormatServiceHandler(final int sendBufferSize) {
+      this.sendBufferSize = sendBufferSize;
+    }
+
     @Override
     public void channelRead0(ChannelHandlerContext ctx, String msg) {
       String id = msg;
@@ -162,9 +170,8 @@ public class LlapOutputFormatService {
 
     private void registerReader(ChannelHandlerContext ctx, String id) {
       synchronized(INSTANCE) {
-        LOG.debug("registering socket for: "+id);
-        int bufSize = 128 * 1024; // configable?
-        OutputStream stream = new ChannelOutputStream(ctx, id, bufSize);
+        LOG.debug("registering socket for: " + id);
+        OutputStream stream = new ChannelOutputStream(ctx, id, sendBufferSize);
         LlapRecordWriter writer = new LlapRecordWriter(stream);
         writers.put(id, writer);
 
@@ -198,13 +205,18 @@ public class LlapOutputFormatService {
   }
 
   protected class LlapOutputFormatServiceChannelHandler extends ChannelInitializer<SocketChannel> {
+    private final int sendBufferSize;
+    public LlapOutputFormatServiceChannelHandler(final int sendBufferSize) {
+      this.sendBufferSize = sendBufferSize;
+    }
+
     @Override
     public void initChannel(SocketChannel ch) throws Exception {
       ch.pipeline().addLast(
           new DelimiterBasedFrameDecoder(MAX_QUERY_ID_LENGTH, Delimiters.nulDelimiter()),
           new StringDecoder(),
           new StringEncoder(),
-          new LlapOutputFormatServiceHandler());
+          new LlapOutputFormatServiceHandler(sendBufferSize));
     }
   }
 }