You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by pr...@apache.org on 2016/05/31 20:55:07 UTC
hive git commit: HIVE-13751: LlapOutputFormatService should have a configurable send buffer size (Prasanth Jayachandran reviewed by Jason Dere)
Repository: hive
Updated Branches:
refs/heads/master a55f4a3a4 -> 57d1f3d85
HIVE-13751: LlapOutputFormatService should have a configurable send buffer size (Prasanth Jayachandran reviewed by Jason Dere)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/57d1f3d8
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/57d1f3d8
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/57d1f3d8
Branch: refs/heads/master
Commit: 57d1f3d85e0fe04e2c84c907d713593958c79a1f
Parents: a55f4a3
Author: Prasanth Jayachandran <pr...@apache.org>
Authored: Tue May 31 13:54:55 2016 -0700
Committer: Prasanth Jayachandran <pr...@apache.org>
Committed: Tue May 31 13:54:55 2016 -0700
----------------------------------------------------------------------
.../org/apache/hadoop/hive/conf/HiveConf.java | 2 ++
.../hive/llap/LlapOutputFormatService.java | 24 +++++++++++++++-----
2 files changed, 20 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/57d1f3d8/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 8706665..b2e6b6f 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -2874,6 +2874,8 @@ public class HiveConf extends Configuration {
"protocol or ZK paths), similar to how ssh refuses a key with bad access permissions."),
LLAP_DAEMON_OUTPUT_SERVICE_PORT("hive.llap.daemon.output.service.port", 15003,
"LLAP daemon output service port"),
+ LLAP_DAEMON_OUTPUT_SERVICE_SEND_BUFFER_SIZE("hive.llap.daemon.output.service.send.buffer.size",
+ 128 * 1024, "Send buffer size to be used by LLAP daemon output service"),
LLAP_ENABLE_GRACE_JOIN_IN_LLAP("hive.llap.enable.grace.join.in.llap", false,
"Override if grace join should be allowed to run in llap."),
http://git-wip-us.apache.org/repos/asf/hive/blob/57d1f3d8/ql/src/java/org/apache/hadoop/hive/llap/LlapOutputFormatService.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/llap/LlapOutputFormatService.java b/ql/src/java/org/apache/hadoop/hive/llap/LlapOutputFormatService.java
index f852041..06660b3 100644
--- a/ql/src/java/org/apache/hadoop/hive/llap/LlapOutputFormatService.java
+++ b/ql/src/java/org/apache/hadoop/hive/llap/LlapOutputFormatService.java
@@ -109,15 +109,18 @@ public class LlapOutputFormatService {
LOG.info("Starting LlapOutputFormatService");
int portFromConf = HiveConf.getIntVar(conf, HiveConf.ConfVars.LLAP_DAEMON_OUTPUT_SERVICE_PORT);
+ int sendBufferSize = HiveConf.getIntVar(conf,
+ HiveConf.ConfVars.LLAP_DAEMON_OUTPUT_SERVICE_SEND_BUFFER_SIZE);
eventLoopGroup = new NioEventLoopGroup(1);
serverBootstrap = new ServerBootstrap();
serverBootstrap.group(eventLoopGroup);
serverBootstrap.channel(NioServerSocketChannel.class);
- serverBootstrap.childHandler(new LlapOutputFormatServiceChannelHandler());
+ serverBootstrap.childHandler(new LlapOutputFormatServiceChannelHandler(sendBufferSize));
try {
listeningChannelFuture = serverBootstrap.bind(portFromConf).sync();
this.port = ((InetSocketAddress) listeningChannelFuture.channel().localAddress()).getPort();
- LOG.info("LlapOutputFormatService: Binding to port " + this.port);
+ LOG.info("LlapOutputFormatService: Binding to port: {} with send buffer size: {} ", this.port,
+ sendBufferSize);
} catch (InterruptedException err) {
throw new IOException("LlapOutputFormatService: Error binding to port " + portFromConf, err);
}
@@ -154,6 +157,11 @@ public class LlapOutputFormatService {
}
protected class LlapOutputFormatServiceHandler extends SimpleChannelInboundHandler<String> {
+ private final int sendBufferSize;
+ public LlapOutputFormatServiceHandler(final int sendBufferSize) {
+ this.sendBufferSize = sendBufferSize;
+ }
+
@Override
public void channelRead0(ChannelHandlerContext ctx, String msg) {
String id = msg;
@@ -162,9 +170,8 @@ public class LlapOutputFormatService {
private void registerReader(ChannelHandlerContext ctx, String id) {
synchronized(INSTANCE) {
- LOG.debug("registering socket for: "+id);
- int bufSize = 128 * 1024; // configable?
- OutputStream stream = new ChannelOutputStream(ctx, id, bufSize);
+ LOG.debug("registering socket for: " + id);
+ OutputStream stream = new ChannelOutputStream(ctx, id, sendBufferSize);
LlapRecordWriter writer = new LlapRecordWriter(stream);
writers.put(id, writer);
@@ -198,13 +205,18 @@ public class LlapOutputFormatService {
}
protected class LlapOutputFormatServiceChannelHandler extends ChannelInitializer<SocketChannel> {
+ private final int sendBufferSize;
+ public LlapOutputFormatServiceChannelHandler(final int sendBufferSize) {
+ this.sendBufferSize = sendBufferSize;
+ }
+
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(
new DelimiterBasedFrameDecoder(MAX_QUERY_ID_LENGTH, Delimiters.nulDelimiter()),
new StringDecoder(),
new StringEncoder(),
- new LlapOutputFormatServiceHandler());
+ new LlapOutputFormatServiceHandler(sendBufferSize));
}
}
}