You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by jd...@apache.org on 2020/03/05 09:32:24 UTC

[hive] branch master updated: HIVE-22973: Handle 0 length batches in LlapArrowRowRecordReader (Shubham Chaurasia, reviewed by Jason Dere)

This is an automated email from the ASF dual-hosted git repository.

jdere pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new 20e50f2  HIVE-22973: Handle 0 length batches in LlapArrowRowRecordReader (Shubham Chaurasia, reviewed by Jason Dere)
20e50f2 is described below

commit 20e50f2dc33c6a980d53d220354fd3c2ee0341fe
Author: Shubham Chaurasia <sc...@cloudera.com>
AuthorDate: Thu Mar 5 01:31:34 2020 -0800

    HIVE-22973: Handle 0 length batches in LlapArrowRowRecordReader (Shubham Chaurasia, reviewed by Jason Dere)
---
 .../org/apache/hive/jdbc/BaseJdbcWithMiniLlap.java | 45 ++++++++++++++++++++++
 .../hadoop/hive/llap/LlapArrowRowRecordReader.java | 22 +++++++----
 2 files changed, 59 insertions(+), 8 deletions(-)

diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/BaseJdbcWithMiniLlap.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/BaseJdbcWithMiniLlap.java
index 0fac1e4..3c0532c 100644
--- a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/BaseJdbcWithMiniLlap.java
+++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/BaseJdbcWithMiniLlap.java
@@ -73,6 +73,7 @@ public abstract class BaseJdbcWithMiniLlap {
   private static String dataFileDir;
   private static Path kvDataFilePath;
   private static Path dataTypesFilePath;
+  private static Path over10KFilePath;
 
   protected static MiniHS2 miniHS2 = null;
   protected static HiveConf conf = null;
@@ -86,6 +87,7 @@ public abstract class BaseJdbcWithMiniLlap {
     dataFileDir = conf.get("test.data.files").replace('\\', '/').replace("c:", "");
     kvDataFilePath = new Path(dataFileDir, "kv1.txt");
     dataTypesFilePath = new Path(dataFileDir, "datatypes.txt");
+    over10KFilePath = new Path(dataFileDir, "over10k");
     Map<String, String> confOverlay = new HashMap<String, String>();
     miniHS2.start(confOverlay);
     miniHS2.getDFS().getFileSystem().mkdirs(new Path("/apps_staging_dir/anonymous"));
@@ -185,6 +187,21 @@ public abstract class BaseJdbcWithMiniLlap {
     stmt.close();
   }
 
+  protected void createOver10KTable(String tableName) throws Exception {
+    try (Statement stmt = hs2Conn.createStatement()) {
+      // Recreates tableName from scratch and fills it from the local file at over10KFilePath.
+      String createQuery =
+          "create table " + tableName + " (t tinyint, si smallint, i int, b bigint, f float, d double, bo boolean, "
+              + "s string, ts timestamp, `dec` decimal(4,2), bin binary) row format delimited fields terminated by '|'";
+
+      // drop any existing table with the same name, then create it with the schema above
+      stmt.execute("DROP TABLE IF EXISTS " + tableName);
+      stmt.execute(createQuery);
+      // load the local data file into the new table (declared with '|' as the field separator)
+      stmt.execute("load data local inpath '" + over10KFilePath.toString() + "' into table " + tableName);
+    }
+  }
+
   @Test(timeout = 60000)
   public void testLlapInputFormatEndToEnd() throws Exception {
     createTestTable("testtab1");
@@ -206,6 +223,34 @@ public abstract class BaseJdbcWithMiniLlap {
     assertEquals(0, rowCount);
   }
 
+  @Test(timeout = 300000)
+  public void testLlapInputFormatEndToEndWithMultipleBatches() throws Exception {
+    String tableName = "over10k_table";
+    // Runs three queries against the freshly loaded table and checks the row count of each.
+    createOver10KTable(tableName);
+
+    int rowCount;
+
+    // Full table scan: the result is expected to span more than one batch
+    RowCollector rowCollector = new RowCollector();
+    String query = "select * from " + tableName;
+    rowCount = processQuery(query, 1, rowCollector);
+    assertEquals(9999, rowCount);
+
+    // Selective filter: the result is expected to be smaller than one batch
+    rowCollector.rows.clear();
+    query = "select * from " + tableName + " where s = 'rachel brown'";
+    rowCount = processQuery(query, 1, rowCollector);
+    assertEquals(17, rowCount);
+
+    // Always-false predicate: the query must return no rows
+    rowCollector.rows.clear();
+    query = "select * from " + tableName + " where false";
+    rowCount = processQuery(query, 1, rowCollector);
+    assertEquals(0, rowCount);
+  }
+
+
   @Test(timeout = 60000)
   public void testNonAsciiStrings() throws Exception {
     createTestTable("testtab_nonascii");
diff --git a/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapArrowRowRecordReader.java b/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapArrowRowRecordReader.java
index d4179d5..24a82c7 100644
--- a/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapArrowRowRecordReader.java
+++ b/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapArrowRowRecordReader.java
@@ -61,16 +61,22 @@ public class LlapArrowRowRecordReader extends LlapRowRecordReader {
       //This is either the first batch or we've used up the current batch buffer
       batchSize = 0;
       rowIndex = 0;
-      hasNext = reader.next(key, data);
-      if(hasNext) {
+
+      // since HIVE-22856, a zero length batch doesn't mean that we won't have any more batches
+      // we can have more batches with data even after a zero length batch
+      // we should keep trying until we get a batch with some data or reader.next() returns false
+      while (batchSize == 0 && (hasNext = reader.next(key, data))) {
+        List<FieldVector> vectors = batchData.getVectorSchemaRoot().getFieldVectors();
+        //hasNext implies there is some column in the batch
+        Preconditions.checkState(vectors.size() > 0);
+        //All the vectors have the same length,
+        //we can get the number of rows from the first vector
+        batchSize = vectors.get(0).getValueCount();
+      }
+
+      if (hasNext) {
         //There is another batch to buffer
         try {
-          List<FieldVector> vectors = batchData.getVectorSchemaRoot().getFieldVectors();
-          //hasNext implies there is some column in the batch
-          Preconditions.checkState(vectors.size() > 0);
-          //All the vectors have the same length,
-          //we can get the number of rows from the first vector
-          batchSize = vectors.get(0).getValueCount();
           ArrowWrapperWritable wrapper = new ArrowWrapperWritable(batchData.getVectorSchemaRoot());
           currentBatch = (Object[][]) serde.deserialize(wrapper);
           StructObjectInspector rowOI = (StructObjectInspector) serde.getObjectInspector();