You are viewing a plain-text version of this content. The canonical link to it was a hyperlink that is not preserved in this plain-text copy.
Posted to commits@hive.apache.org by jd...@apache.org on 2020/02/10 20:36:27 UTC
[hive] branch master updated: HIVE-20312: Allow arrow clients to
use their own BufferAllocator with LlapOutputFormatService (Eric
Wohlstadter, reviewed by Teddy Choi)
This is an automated email from the ASF dual-hosted git repository.
jdere pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new 46ed5c9 HIVE-20312: Allow arrow clients to use their own BufferAllocator with LlapOutputFormatService (Eric Wohlstadter, reviewed by Teddy Choi)
46ed5c9 is described below
commit 46ed5c9064eb1ff14c18cdbffe8f8338928505c5
Author: Eric Wohlstadter <wo...@gmail.com>
AuthorDate: Mon Feb 10 12:35:23 2020 -0800
HIVE-20312: Allow arrow clients to use their own BufferAllocator with LlapOutputFormatService (Eric Wohlstadter, reviewed by Teddy Choi)
---
.../hive/llap/LlapArrowBatchRecordReader.java | 15 +++++++++++--
.../hadoop/hive/llap/LlapArrowRowInputFormat.java | 14 +++++++++++-
.../hadoop/hive/llap/LlapBaseInputFormat.java | 25 ++++++++++++++++++----
3 files changed, 47 insertions(+), 7 deletions(-)
diff --git a/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapArrowBatchRecordReader.java b/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapArrowBatchRecordReader.java
index d9c5666..cb3d9cc 100644
--- a/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapArrowBatchRecordReader.java
+++ b/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapArrowBatchRecordReader.java
@@ -39,13 +39,21 @@ public class LlapArrowBatchRecordReader extends LlapBaseRecordReader<ArrowWrappe
private BufferAllocator allocator;
private ArrowStreamReader arrowStreamReader;
+ //Allows client to provide and manage their own arrow BufferAllocator
public LlapArrowBatchRecordReader(InputStream in, Schema schema, Class<ArrowWrapperWritable> clazz,
- JobConf job, Closeable client, Socket socket, long arrowAllocatorLimit) throws IOException {
+ JobConf job, Closeable client, Socket socket, BufferAllocator allocator) throws IOException {
super(in, schema, clazz, job, client, socket);
- allocator = RootAllocatorFactory.INSTANCE.getOrCreateRootAllocator(arrowAllocatorLimit);
+ this.allocator = allocator;
this.arrowStreamReader = new ArrowStreamReader(socket.getInputStream(), allocator);
}
+ //Use the global arrow BufferAllocator
+ public LlapArrowBatchRecordReader(InputStream in, Schema schema, Class<ArrowWrapperWritable> clazz,
+ JobConf job, Closeable client, Socket socket, long arrowAllocatorLimit) throws IOException {
+ this(in, schema, clazz, job, client, socket,
+ RootAllocatorFactory.INSTANCE.getOrCreateRootAllocator(arrowAllocatorLimit));
+ }
+
@Override
public boolean next(NullWritable key, ArrowWrapperWritable value) throws IOException {
try {
@@ -76,6 +84,9 @@ public class LlapArrowBatchRecordReader extends LlapBaseRecordReader<ArrowWrappe
@Override
public void close() throws IOException {
arrowStreamReader.close();
+ //allocator.close() will throw exception unless all buffers have been released
+ //See org.apache.arrow.memory.BaseAllocator.close()
+ allocator.close();
}
}
diff --git a/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapArrowRowInputFormat.java b/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapArrowRowInputFormat.java
index fafbdee..46566be 100644
--- a/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapArrowRowInputFormat.java
+++ b/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapArrowRowInputFormat.java
@@ -25,16 +25,28 @@ import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import java.io.IOException;
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.hadoop.hive.ql.io.arrow.RootAllocatorFactory;
+import java.util.UUID;
/*
* Adapts an Arrow batch reader to a row reader
+ * Only used for testing
*/
public class LlapArrowRowInputFormat implements InputFormat<NullWritable, Row> {
private LlapBaseInputFormat baseInputFormat;
public LlapArrowRowInputFormat(long arrowAllocatorLimit) {
- baseInputFormat = new LlapBaseInputFormat(true, arrowAllocatorLimit);
+ BufferAllocator allocator = RootAllocatorFactory.INSTANCE.getOrCreateRootAllocator(arrowAllocatorLimit).newChildAllocator(
+ //allocator name, use UUID for testing
+ UUID.randomUUID().toString(),
+ //No use for reservation, allocators claim memory from the same pool,
+ //but allocate/releases are tracked per-allocator
+ 0,
+ //Limit passed in by client
+ arrowAllocatorLimit);
+ baseInputFormat = new LlapBaseInputFormat(true, allocator);
}
@Override
diff --git a/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java b/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java
index 5c99655..6bf7f33 100644
--- a/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java
+++ b/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java
@@ -66,6 +66,7 @@ import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
+import org.apache.arrow.memory.BufferAllocator;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.security.Credentials;
@@ -107,6 +108,7 @@ public class LlapBaseInputFormat<V extends WritableComparable<?>>
private String query;
private boolean useArrow;
private long arrowAllocatorLimit;
+ private BufferAllocator allocator;
private final Random rand = new Random();
public static final String URL_KEY = "llap.if.hs2.connection";
@@ -129,11 +131,17 @@ public class LlapBaseInputFormat<V extends WritableComparable<?>>
this.query = query;
}
+ //Exposed only for testing, clients should use LlapBaseInputFormat(boolean, BufferAllocator instead)
public LlapBaseInputFormat(boolean useArrow, long arrowAllocatorLimit) {
this.useArrow = useArrow;
this.arrowAllocatorLimit = arrowAllocatorLimit;
}
+ public LlapBaseInputFormat(boolean useArrow, BufferAllocator allocator) {
+ this.useArrow = useArrow;
+ this.allocator = allocator;
+ }
+
public LlapBaseInputFormat() {
this.useArrow = false;
}
@@ -214,10 +222,19 @@ public class LlapBaseInputFormat<V extends WritableComparable<?>>
@SuppressWarnings("rawtypes")
LlapBaseRecordReader recordReader;
if(useArrow) {
- recordReader = new LlapArrowBatchRecordReader(
- socket.getInputStream(), llapSplit.getSchema(),
- ArrowWrapperWritable.class, job, llapClient, socket,
- arrowAllocatorLimit);
+ if(allocator != null) {
+ //Client provided their own allocator
+ recordReader = new LlapArrowBatchRecordReader(
+ socket.getInputStream(), llapSplit.getSchema(),
+ ArrowWrapperWritable.class, job, llapClient, socket,
+ allocator);
+ } else {
+ //Client did not provide their own allocator, use constructor for global allocator
+ recordReader = new LlapArrowBatchRecordReader(
+ socket.getInputStream(), llapSplit.getSchema(),
+ ArrowWrapperWritable.class, job, llapClient, socket,
+ arrowAllocatorLimit);
+ }
} else {
recordReader = new LlapBaseRecordReader(socket.getInputStream(),
llapSplit.getSchema(), BytesWritable.class, job, llapClient, (java.io.Closeable)socket);