You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by jd...@apache.org on 2018/07/11 17:47:30 UTC
hive git commit: HIVE-20093: LlapOutputFomatService: Use ArrowBuf
with Netty for Accounting (Eric Wohlstadter, reviewed by Jason Dere)
Repository: hive
Updated Branches:
refs/heads/master 40635f7e3 -> 385a26ad8
HIVE-20093: LlapOutputFomatService: Use ArrowBuf with Netty for Accounting (Eric Wohlstadter, reviewed by Jason Dere)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/385a26ad
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/385a26ad
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/385a26ad
Branch: refs/heads/master
Commit: 385a26ad89a29492ff55bcba32ec104af54bf139
Parents: 40635f7
Author: Jason Dere <jd...@hortonworks.com>
Authored: Wed Jul 11 10:47:01 2018 -0700
Committer: Jason Dere <jd...@hortonworks.com>
Committed: Wed Jul 11 10:47:01 2018 -0700
----------------------------------------------------------------------
.../apache/hadoop/hive/llap/LlapOutputFormatService.java | 4 +++-
.../hadoop/hive/llap/WritableByteChannelAdapter.java | 11 ++++++++---
2 files changed, 11 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/385a26ad/ql/src/java/org/apache/hadoop/hive/llap/LlapOutputFormatService.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/llap/LlapOutputFormatService.java b/ql/src/java/org/apache/hadoop/hive/llap/LlapOutputFormatService.java
index c71c637..996f8b3 100644
--- a/ql/src/java/org/apache/hadoop/hive/llap/LlapOutputFormatService.java
+++ b/ql/src/java/org/apache/hadoop/hive/llap/LlapOutputFormatService.java
@@ -199,10 +199,12 @@ public class LlapOutputFormatService {
int maxPendingWrites = HiveConf.getIntVar(conf,
HiveConf.ConfVars.LLAP_DAEMON_OUTPUT_SERVICE_MAX_PENDING_WRITES);
boolean useArrow = HiveConf.getBoolVar(conf, HiveConf.ConfVars.LLAP_OUTPUT_FORMAT_ARROW);
+ long allocatorMax = HiveConf.getLongVar(conf,
+ HiveConf.ConfVars.HIVE_ARROW_ROOT_ALLOCATOR_LIMIT);
@SuppressWarnings("rawtypes")
RecordWriter writer = null;
if(useArrow) {
- writer = new LlapArrowRecordWriter(new WritableByteChannelAdapter(ctx, maxPendingWrites, id));
+ writer = new LlapArrowRecordWriter(new WritableByteChannelAdapter(ctx, maxPendingWrites, id, allocatorMax));
} else {
writer = new LlapRecordWriter(id,
new ChunkedOutputStream(
http://git-wip-us.apache.org/repos/asf/hive/blob/385a26ad/ql/src/java/org/apache/hadoop/hive/llap/WritableByteChannelAdapter.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/llap/WritableByteChannelAdapter.java b/ql/src/java/org/apache/hadoop/hive/llap/WritableByteChannelAdapter.java
index 57da1d9..753da22 100644
--- a/ql/src/java/org/apache/hadoop/hive/llap/WritableByteChannelAdapter.java
+++ b/ql/src/java/org/apache/hadoop/hive/llap/WritableByteChannelAdapter.java
@@ -18,7 +18,8 @@
package org.apache.hadoop.hive.llap;
-import io.netty.buffer.Unpooled;
+import org.apache.hadoop.hive.ql.io.arrow.RootAllocatorFactory;
+import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import java.io.IOException;
@@ -48,6 +49,7 @@ public class WritableByteChannelAdapter implements WritableByteChannel {
private final Semaphore writeResources;
private boolean closed = false;
private final String id;
+ private long allocatorMax;
private ChannelFutureListener writeListener = new ChannelFutureListener() {
@Override
@@ -75,11 +77,12 @@ public class WritableByteChannelAdapter implements WritableByteChannel {
}
};
- public WritableByteChannelAdapter(ChannelHandlerContext chc, int maxPendingWrites, String id) {
+ public WritableByteChannelAdapter(ChannelHandlerContext chc, int maxPendingWrites, String id, long allocatorMax) {
this.chc = chc;
this.maxPendingWrites = maxPendingWrites;
this.writeResources = new Semaphore(maxPendingWrites);
this.id = id;
+ this.allocatorMax = allocatorMax;
}
@Override
@@ -87,7 +90,9 @@ public class WritableByteChannelAdapter implements WritableByteChannel {
int size = src.remaining();
//Down the semaphore or block until available
takeWriteResources(1);
- chc.writeAndFlush(Unpooled.wrappedBuffer(src)).addListener(writeListener);
+ ByteBuf buf = RootAllocatorFactory.INSTANCE.getOrCreateRootAllocator(allocatorMax).buffer(size);
+ buf.writeBytes(src);
+ chc.writeAndFlush(buf).addListener(writeListener);
return size;
}