You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by jd...@apache.org on 2020/02/10 20:36:27 UTC

[hive] branch master updated: HIVE-20312: Allow arrow clients to use their own BufferAllocator with LlapOutputFormatService (Eric Wohlstadter, reviewed by Teddy Choi)

This is an automated email from the ASF dual-hosted git repository.

jdere pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new 46ed5c9  HIVE-20312: Allow arrow clients to use their own BufferAllocator with LlapOutputFormatService (Eric Wohlstadter, reviewed by Teddy Choi)
46ed5c9 is described below

commit 46ed5c9064eb1ff14c18cdbffe8f8338928505c5
Author: Eric Wohlstadter <wo...@gmail.com>
AuthorDate: Mon Feb 10 12:35:23 2020 -0800

    HIVE-20312: Allow arrow clients to use their own BufferAllocator with LlapOutputFormatService (Eric Wohlstadter, reviewed by Teddy Choi)
---
 .../hive/llap/LlapArrowBatchRecordReader.java      | 15 +++++++++++--
 .../hadoop/hive/llap/LlapArrowRowInputFormat.java  | 14 +++++++++++-
 .../hadoop/hive/llap/LlapBaseInputFormat.java      | 25 ++++++++++++++++++----
 3 files changed, 47 insertions(+), 7 deletions(-)

diff --git a/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapArrowBatchRecordReader.java b/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapArrowBatchRecordReader.java
index d9c5666..cb3d9cc 100644
--- a/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapArrowBatchRecordReader.java
+++ b/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapArrowBatchRecordReader.java
@@ -39,13 +39,21 @@ public class LlapArrowBatchRecordReader extends LlapBaseRecordReader<ArrowWrappe
   private BufferAllocator allocator;
   private ArrowStreamReader arrowStreamReader;
 
+  //Allows client to provide and manage their own arrow BufferAllocator
   public LlapArrowBatchRecordReader(InputStream in, Schema schema, Class<ArrowWrapperWritable> clazz,
-      JobConf job, Closeable client, Socket socket, long arrowAllocatorLimit) throws IOException {
+      JobConf job, Closeable client, Socket socket, BufferAllocator allocator) throws IOException {
     super(in, schema, clazz, job, client, socket);
-    allocator = RootAllocatorFactory.INSTANCE.getOrCreateRootAllocator(arrowAllocatorLimit);
+    this.allocator = allocator;
     this.arrowStreamReader = new ArrowStreamReader(socket.getInputStream(), allocator);
   }
 
+  //Use the global arrow BufferAllocator
+  public LlapArrowBatchRecordReader(InputStream in, Schema schema, Class<ArrowWrapperWritable> clazz,
+      JobConf job, Closeable client, Socket socket, long arrowAllocatorLimit) throws IOException {
+    this(in, schema, clazz, job, client, socket,
+        RootAllocatorFactory.INSTANCE.getOrCreateRootAllocator(arrowAllocatorLimit));
+  }
+
   @Override
   public boolean next(NullWritable key, ArrowWrapperWritable value) throws IOException {
     try {
@@ -76,6 +84,9 @@ public class LlapArrowBatchRecordReader extends LlapBaseRecordReader<ArrowWrappe
   @Override
   public void close() throws IOException {
     arrowStreamReader.close();
+    //allocator.close() will throw exception unless all buffers have been released
+    //See org.apache.arrow.memory.BaseAllocator.close()
+    allocator.close();
   }
 
 }
diff --git a/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapArrowRowInputFormat.java b/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapArrowRowInputFormat.java
index fafbdee..46566be 100644
--- a/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapArrowRowInputFormat.java
+++ b/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapArrowRowInputFormat.java
@@ -25,16 +25,28 @@ import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.RecordReader;
 import org.apache.hadoop.mapred.Reporter;
 import java.io.IOException;
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.hadoop.hive.ql.io.arrow.RootAllocatorFactory;
+import java.util.UUID;
 
 /*
  * Adapts an Arrow batch reader to a row reader
+ * Only used for testing
  */
 public class LlapArrowRowInputFormat implements InputFormat<NullWritable, Row> {
 
   private LlapBaseInputFormat baseInputFormat;
 
   public LlapArrowRowInputFormat(long arrowAllocatorLimit) {
-    baseInputFormat = new LlapBaseInputFormat(true, arrowAllocatorLimit);
+    BufferAllocator allocator = RootAllocatorFactory.INSTANCE.getOrCreateRootAllocator(arrowAllocatorLimit).newChildAllocator(
+        //allocator name, use UUID for testing
+        UUID.randomUUID().toString(),
+        //No reservation needed: allocators claim memory from the same pool,
+        //but allocations/releases are tracked per-allocator
+        0,
+        //Limit passed in by client
+        arrowAllocatorLimit);
+    baseInputFormat = new LlapBaseInputFormat(true, allocator);
   }
 
   @Override
diff --git a/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java b/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java
index 5c99655..6bf7f33 100644
--- a/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java
+++ b/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java
@@ -66,6 +66,7 @@ import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.RecordReader;
 import org.apache.hadoop.mapred.Reporter;
+import org.apache.arrow.memory.BufferAllocator;
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.TaskAttemptID;
 import org.apache.hadoop.security.Credentials;
@@ -107,6 +108,7 @@ public class LlapBaseInputFormat<V extends WritableComparable<?>>
   private String query;
   private boolean useArrow;
   private long arrowAllocatorLimit;
+  private BufferAllocator allocator;
   private final Random rand = new Random();
 
   public static final String URL_KEY = "llap.if.hs2.connection";
@@ -129,11 +131,17 @@ public class LlapBaseInputFormat<V extends WritableComparable<?>>
     this.query = query;
   }
 
+  //Exposed only for testing, clients should use LlapBaseInputFormat(boolean, BufferAllocator) instead
   public LlapBaseInputFormat(boolean useArrow, long arrowAllocatorLimit) {
     this.useArrow = useArrow;
     this.arrowAllocatorLimit = arrowAllocatorLimit;
   }
 
+  public LlapBaseInputFormat(boolean useArrow, BufferAllocator allocator) {
+    this.useArrow = useArrow;
+    this.allocator = allocator;
+  }
+
   public LlapBaseInputFormat() {
     this.useArrow = false;
   }
@@ -214,10 +222,19 @@ public class LlapBaseInputFormat<V extends WritableComparable<?>>
     @SuppressWarnings("rawtypes")
     LlapBaseRecordReader recordReader;
     if(useArrow) {
-      recordReader = new LlapArrowBatchRecordReader(
-          socket.getInputStream(), llapSplit.getSchema(),
-          ArrowWrapperWritable.class, job, llapClient, socket,
-          arrowAllocatorLimit);
+      if(allocator != null) {
+        //Client provided their own allocator
+        recordReader = new LlapArrowBatchRecordReader(
+            socket.getInputStream(), llapSplit.getSchema(),
+            ArrowWrapperWritable.class, job, llapClient, socket,
+            allocator);
+      } else {
+        //Client did not provide their own allocator, use constructor for global allocator
+        recordReader = new LlapArrowBatchRecordReader(
+            socket.getInputStream(), llapSplit.getSchema(),
+            ArrowWrapperWritable.class, job, llapClient, socket,
+            arrowAllocatorLimit);
+      }
     } else {
       recordReader = new LlapBaseRecordReader(socket.getInputStream(),
           llapSplit.getSchema(), BytesWritable.class, job, llapClient, (java.io.Closeable)socket);