Posted to commits@hive.apache.org by sa...@apache.org on 2019/07/09 03:53:33 UTC

[hive] branch master updated: HIVE-21966: Llap external client - Arrow Serializer throws ArrayIndexOutOfBoundsException in some cases (Shubham Chaurasia, reviewed by Sankar Hariappan)

This is an automated email from the ASF dual-hosted git repository.

sankarh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new f923b74  HIVE-21966: Llap external client - Arrow Serializer throws ArrayIndexOutOfBoundsException in some cases (Shubham Chaurasia, reviewed by Sankar Hariappan)
f923b74 is described below

commit f923b7490a6085292e2d05425dcdc83b994d42e4
Author: Shubham Chaurasia <sc...@cloudera.com>
AuthorDate: Tue Jul 9 09:23:06 2019 +0530

    HIVE-21966: Llap external client - Arrow Serializer throws ArrayIndexOutOfBoundsException in some cases (Shubham Chaurasia, reviewed by Sankar Hariappan)
    
    Signed-off-by: Sankar Hariappan <sa...@apache.org>
---
 .../org/apache/hive/jdbc/BaseJdbcWithMiniLlap.java |   2 +-
 .../hive/jdbc/TestJdbcWithMiniLlapVectorArrow.java | 176 +++++++++++++++++++++
 .../apache/hadoop/hive/ql/io/arrow/Serializer.java |  67 +++++++-
 3 files changed, 237 insertions(+), 8 deletions(-)

diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/BaseJdbcWithMiniLlap.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/BaseJdbcWithMiniLlap.java
index ab79b42..8467cea 100644
--- a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/BaseJdbcWithMiniLlap.java
+++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/BaseJdbcWithMiniLlap.java
@@ -107,7 +107,7 @@ public abstract class BaseJdbcWithMiniLlap {
   private static Path dataTypesFilePath;
 
   protected static HiveConf conf = null;
-  private static Connection hs2Conn = null;
+  protected static Connection hs2Conn = null;
 
   // This method should be called by sub-classes in a @BeforeClass initializer
   public static MiniHS2 beforeTest(HiveConf inputConf) throws Exception {
diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlapVectorArrow.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlapVectorArrow.java
index 55a2df8..35eda6c 100644
--- a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlapVectorArrow.java
+++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlapVectorArrow.java
@@ -18,11 +18,18 @@
 
 package org.apache.hive.jdbc;
 
+import static java.util.Arrays.asList;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertArrayEquals;
+
 import java.math.BigDecimal;
+
+import com.google.common.collect.ImmutableMap;
 import org.apache.hadoop.hive.common.type.Date;
 import org.apache.hadoop.hive.common.type.Timestamp;
+
+import java.sql.Statement;
+import java.util.ArrayList;
 import java.util.List;
 import org.apache.hadoop.hive.llap.FieldDesc;
 import org.apache.hadoop.hive.llap.Row;
@@ -33,6 +40,7 @@ import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 
 import org.apache.hadoop.mapred.InputFormat;
 import org.apache.hadoop.hive.llap.LlapArrowRowInputFormat;
+import org.junit.Test;
 
 /**
  * TestJdbcWithMiniLlap for Arrow format with vectorized output sink
@@ -231,5 +239,173 @@ public class TestJdbcWithMiniLlapVectorArrow extends BaseJdbcWithMiniLlap {
     assertArrayEquals("X'01FF'".getBytes("UTF-8"), (byte[]) rowValues[22]);
   }
 
+
+  @Test
+  public void testTypesNestedInListWithLimitAndFilters() throws Exception {
+    try (Statement statement = hs2Conn.createStatement()) {
+      statement.execute("CREATE TABLE complex_tbl(c1 array<string>, " +
+          "c2 array<struct<f1:string,f2:string>>, " +
+          "c3 array<array<struct<f1:string,f2:string>>>, " +
+          "c4 int) STORED AS ORC");
+
+      statement.executeUpdate("INSERT INTO complex_tbl VALUES " +
+          "(" +
+          "ARRAY('a1', 'a2', 'a3', null), " +
+          "ARRAY(NAMED_STRUCT('f1','a1', 'f2','a2'), NAMED_STRUCT('f1','a3', 'f2','a4')), " +
+          "ARRAY((ARRAY(NAMED_STRUCT('f1','a1', 'f2','a2'), NAMED_STRUCT('f1','a3', 'f2','a4')))), " +
+          "1),      " +
+          "(" +
+          "ARRAY('b1'), " +
+          "ARRAY(NAMED_STRUCT('f1','b1', 'f2','b2'), NAMED_STRUCT('f1','b3', 'f2','b4')), " +
+          "ARRAY((ARRAY(NAMED_STRUCT('f1','b1', 'f2','b2'), NAMED_STRUCT('f1','b3', 'f2','b4'))), " +
+          "(ARRAY(NAMED_STRUCT('f1','b5', 'f2','b6'), NAMED_STRUCT('f1','b7', 'f2','b8')))), " +
+          "2), " +
+          "(" +
+          "ARRAY('c1', 'c2'), ARRAY(NAMED_STRUCT('f1','c1', 'f2','c2'), NAMED_STRUCT('f1','c3', 'f2','c4'), " +
+          "NAMED_STRUCT('f1','c5', 'f2','c6')), ARRAY((ARRAY(NAMED_STRUCT('f1','c1', 'f2','c2'), " +
+          "NAMED_STRUCT('f1','c3', 'f2','c4'))), (ARRAY(NAMED_STRUCT('f1','c5', 'f2','c6'), " +
+          "NAMED_STRUCT('f1','c7', 'f2','c8'))), (ARRAY(NAMED_STRUCT('f1','c9', 'f2','c10'), " +
+          "NAMED_STRUCT('f1','c11', 'f2','c12')))), " +
+          "3), " +
+          "(" +
+          "ARRAY(null), " +
+          "ARRAY(NAMED_STRUCT('f1','d1', 'f2','d2'), NAMED_STRUCT('f1','d3', 'f2','d4'), " +
+          "NAMED_STRUCT('f1','d5', 'f2','d6'), NAMED_STRUCT('f1','d7', 'f2','d8')), " +
+          "ARRAY((ARRAY(NAMED_STRUCT('f1','d1', 'f2', 'd2')))), " +
+          "4)");
+
+    }
+
+    List<Object[]> expected = new ArrayList<>();
+    expected.add(new Object[]{
+        asList("a1", "a2", "a3", null),
+        asList(asList("a1", "a2"), asList("a3", "a4")),
+        asList(asList(asList("a1", "a2"), asList("a3", "a4"))),
+        1
+    });
+    expected.add(new Object[]{
+        asList("b1"),
+        asList(asList("b1", "b2"), asList("b3", "b4")),
+        asList(asList(asList("b1", "b2"), asList("b3", "b4")), asList(asList("b5", "b6"), asList("b7", "b8"))),
+        2
+    });
+    expected.add(new Object[]{
+        asList("c1", "c2"),
+        asList(asList("c1", "c2"), asList("c3", "c4"), asList("c5", "c6")),
+        asList(asList(asList("c1", "c2"), asList("c3", "c4")), asList(asList("c5", "c6"), asList("c7", "c8")),
+            asList(asList("c9", "c10"), asList("c11", "c12"))),
+        3
+    });
+    List<String> nullList = new ArrayList<>();
+    nullList.add(null);
+    expected.add(new Object[]{
+        nullList,
+        asList(asList("d1", "d2"), asList("d3", "d4"), asList("d5", "d6"), asList("d7", "d8")),
+        asList(asList(asList("d1", "d2"))),
+        4
+    });
+
+    // test without limit and filters (i.e. VectorizedRowBatch#selectedInUse=false)
+    RowCollector2 rowCollector = new RowCollector2();
+    String query = "select * from complex_tbl";
+    processQuery(query, 1, rowCollector);
+    verifyResult(rowCollector.rows, expected.get(0),
+        expected.get(1),
+        expected.get(2),
+        expected.get(3));
+
+    // test with filter
+    rowCollector = new RowCollector2();
+    query = "select * from complex_tbl where c4 > 1 ";
+    processQuery(query, 1, rowCollector);
+    verifyResult(rowCollector.rows, expected.get(1), expected.get(2), expected.get(3));
+
+    // test with limit
+    rowCollector = new RowCollector2();
+    query = "select * from complex_tbl limit 3";
+    processQuery(query, 1, rowCollector);
+    verifyResult(rowCollector.rows, expected.get(0), expected.get(1), expected.get(2));
+
+    // test with filters and limit
+    rowCollector = new RowCollector2();
+    query = "select * from complex_tbl where c4 > 1 limit 2";
+    processQuery(query, 1, rowCollector);
+    verifyResult(rowCollector.rows, expected.get(1), expected.get(2));
+
+  }
+
+  @Test
+  public void testTypesNestedInMapWithLimitAndFilters() throws Exception {
+    try (Statement statement = hs2Conn.createStatement()) {
+      statement.execute("CREATE TABLE complex_tbl2(c1 map<int, string>," +
+          " c2 map<int, array<string>>, " +
+          " c3 map<int, struct<f1:string,f2:string>>, c4 int) STORED AS ORC");
+
+      statement.executeUpdate("INSERT INTO complex_tbl2 VALUES " +
+          "(MAP(1, 'a1'), MAP(1, ARRAY('a1', 'a2')), MAP(1, NAMED_STRUCT('f1','a1', 'f2','a2')), " +
+          "1), " +
+          "(MAP(1, 'b1',2, 'b2'), MAP(1, ARRAY('b1', 'b2'), 2, ARRAY('b3') ), " +
+          "MAP(1, NAMED_STRUCT('f1','b1', 'f2','b2')), " +
+          "2), " +
+          "(MAP(1, 'c1',2, 'c2'), MAP(1, ARRAY('c1', 'c2'), 2, ARRAY('c3') ), " +
+          "MAP(1, NAMED_STRUCT('f1','c1', 'f2','c2'), 2, NAMED_STRUCT('f1', 'c3', 'f2', 'c4') ), " +
+          "3)");
+
+    }
+
+    List<Object[]> expected = new ArrayList<>();
+    expected.add(new Object[]{
+        ImmutableMap.of(1, "a1"),
+        ImmutableMap.of(1, asList("a1", "a2")),
+        ImmutableMap.of(1, asList("a1", "a2")),
+        1,
+    });
+    expected.add(new Object[]{
+        ImmutableMap.of(1, "b1", 2, "b2"),
+        ImmutableMap.of(1, asList("b1", "b2"), 2, asList("b3")),
+        ImmutableMap.of(1, asList("b1", "b2")),
+        2,
+    });
+    expected.add(new Object[]{
+        ImmutableMap.of(1, "c1", 2, "c2"),
+        ImmutableMap.of(1, asList("c1", "c2"), 2, asList("c3")),
+        ImmutableMap.of(1, asList("c1", "c2"), 2, asList("c3", "c4")),
+        3,
+    });
+
+
+    // test without limit and filters (i.e. VectorizedRowBatch#selectedInUse=false)
+    RowCollector2 rowCollector = new RowCollector2();
+    String query = "select * from complex_tbl2";
+    processQuery(query, 1, rowCollector);
+    verifyResult(rowCollector.rows, expected.get(0), expected.get(1), expected.get(2));
+
+    // test with filter
+    rowCollector = new RowCollector2();
+    query = "select * from complex_tbl2 where c4 > 1 ";
+    processQuery(query, 1, rowCollector);
+    verifyResult(rowCollector.rows, expected.get(1), expected.get(2));
+
+    // test with limit
+    rowCollector = new RowCollector2();
+    query = "select * from complex_tbl2 limit 2";
+    processQuery(query, 1, rowCollector);
+    verifyResult(rowCollector.rows, expected.get(0), expected.get(1));
+
+    // test with filters and limit
+    rowCollector = new RowCollector2();
+    query = "select * from complex_tbl2 where c4 > 1 limit 1";
+    processQuery(query, 1, rowCollector);
+    verifyResult(rowCollector.rows, expected.get(1));
+
+  }
+
+  private void verifyResult(List<Object[]> actual, Object[]... expected) {
+    assertEquals(expected.length, actual.size());
+    for (int i = 0; i < expected.length; i++) {
+      assertArrayEquals(expected[i], actual.get(i));
+    }
+  }
+
 }
 
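Note on the tests above: each test drives the same data through four query shapes (no filter or limit, filter only, limit only, and filter plus limit), so the Arrow serializer is exercised both with VectorizedRowBatch#selectedInUse=false and with it set to true. To run them in isolation, the usual Surefire filter from the itests/hive-unit module should work, assuming a standard Hive dev checkout:

    cd itests/hive-unit
    mvn test -Dtest=TestJdbcWithMiniLlapVectorArrow
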
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/arrow/Serializer.java b/ql/src/java/org/apache/hadoop/hive/ql/io/arrow/Serializer.java
index 5ec800d..0350977 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/arrow/Serializer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/arrow/Serializer.java
@@ -56,6 +56,7 @@ import org.apache.hadoop.hive.ql.exec.vector.IntervalDayTimeColumnVector;
 import org.apache.hadoop.hive.ql.exec.vector.ListColumnVector;
 import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
 import org.apache.hadoop.hive.ql.exec.vector.MapColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.MultiValuedColumnVector;
 import org.apache.hadoop.hive.ql.exec.vector.StructColumnVector;
 import org.apache.hadoop.hive.ql.exec.vector.TimestampColumnVector;
 import org.apache.hadoop.hive.ql.exec.vector.UnionColumnVector;
@@ -314,7 +315,11 @@ public class Serializer {
 
     final ArrowBuf validityBuffer = arrowVector.getValidityBuffer();
     for (int rowIndex = 0; rowIndex < size; rowIndex++) {
-      if (hiveVector.isNull[rowIndex]) {
+      int selectedIndex = rowIndex;
+      if (vectorizedRowBatch.selectedInUse) {
+        selectedIndex = vectorizedRowBatch.selected[rowIndex];
+      }
+      if (hiveVector.isNull[selectedIndex]) {
         BitVectorHelper.setValidityBit(validityBuffer, rowIndex, 0);
       } else {
         BitVectorHelper.setValidityBitToOne(validityBuffer, rowIndex);
@@ -365,27 +370,75 @@ public class Serializer {
     }
   }
 
+  // selected[] points to the valid/filtered/selected records at row level.
+  // For a MultiValuedColumnVector such as ListColumnVector, one record of the vector points to multiple
+  // nested records. Child vectors hold these records in exploded form, i.e. a child vector can contain
+  // more records than the VectorizedRowBatch itself; consequently selected[] also needs to be readjusted.
+  // This method creates a shallow copy of the VectorizedRowBatch with corrected size and selected[].
+
+  private static VectorizedRowBatch correctSelectedAndSize(VectorizedRowBatch sourceVrb,
+                                                           MultiValuedColumnVector multiValuedColumnVector) {
+
+    VectorizedRowBatch vrb = new VectorizedRowBatch(sourceVrb.numCols, sourceVrb.size);
+    vrb.cols = sourceVrb.cols;
+    vrb.endOfFile = sourceVrb.endOfFile;
+    vrb.projectedColumns = sourceVrb.projectedColumns;
+    vrb.projectionSize = sourceVrb.projectionSize;
+    vrb.selectedInUse = sourceVrb.selectedInUse;
+    vrb.setPartitionInfo(sourceVrb.getDataColumnCount(), sourceVrb.getPartitionColumnCount());
+
+    int correctedSize = 0;
+    final int[] srcVrbSelected = sourceVrb.selected;
+    for (int i = 0; i < sourceVrb.size; i++) {
+      correctedSize += multiValuedColumnVector.lengths[srcVrbSelected[i]];
+    }
+
+    int newIndex = 0;
+    final int[] selectedOffsetsCorrected = new int[correctedSize];
+    for (int i = 0; i < sourceVrb.size; i++) {
+      long elementIndex = multiValuedColumnVector.offsets[srcVrbSelected[i]];
+      long elementSize = multiValuedColumnVector.lengths[srcVrbSelected[i]];
+      for (int j = 0; j < elementSize; j++) {
+        selectedOffsetsCorrected[newIndex++] = (int) (elementIndex + j);
+      }
+    }
+    vrb.selected = selectedOffsetsCorrected;
+    vrb.size = correctedSize;
+    return vrb;
+  }
+
   private void writeList(ListVector arrowVector, ListColumnVector hiveVector, ListTypeInfo typeInfo, int size,
-      VectorizedRowBatch vectorizedRowBatch, boolean isNative) {
+                         VectorizedRowBatch vectorizedRowBatch, boolean isNative) {
     final int OFFSET_WIDTH = 4;
     final TypeInfo elementTypeInfo = typeInfo.getListElementTypeInfo();
     final ColumnVector hiveElementVector = hiveVector.child;
     final FieldVector arrowElementVector =
-        (FieldVector) arrowVector.addOrGetVector(toFieldType(elementTypeInfo)).getVector();
-    arrowElementVector.setInitialCapacity(hiveVector.childCount);
+            (FieldVector) arrowVector.addOrGetVector(toFieldType(elementTypeInfo)).getVector();
+
+    VectorizedRowBatch correctedVrb = vectorizedRowBatch;
+    int correctedSize = hiveVector.childCount;
+    if (vectorizedRowBatch.selectedInUse) {
+      correctedVrb = correctSelectedAndSize(vectorizedRowBatch, hiveVector);
+      correctedSize = correctedVrb.size;
+    }
+    arrowElementVector.setInitialCapacity(correctedSize);
     arrowElementVector.allocateNew();
 
-    write(arrowElementVector, hiveElementVector, elementTypeInfo, hiveVector.childCount, vectorizedRowBatch, isNative);
+    write(arrowElementVector, hiveElementVector, elementTypeInfo, correctedSize, correctedVrb, isNative);
 
     final ArrowBuf offsetBuffer = arrowVector.getOffsetBuffer();
     int nextOffset = 0;
 
     for (int rowIndex = 0; rowIndex < size; rowIndex++) {
-      if (hiveVector.isNull[rowIndex]) {
+      int selectedIndex = rowIndex;
+      if (vectorizedRowBatch.selectedInUse) {
+        selectedIndex = vectorizedRowBatch.selected[rowIndex];
+      }
+      if (hiveVector.isNull[selectedIndex]) {
         offsetBuffer.setInt(rowIndex * OFFSET_WIDTH, nextOffset);
       } else {
         offsetBuffer.setInt(rowIndex * OFFSET_WIDTH, nextOffset);
-        nextOffset += (int) hiveVector.lengths[rowIndex];
+        nextOffset += (int) hiveVector.lengths[selectedIndex];
         arrowVector.setNotNull(rowIndex);
       }
     }
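
For readers following the Serializer change: when VectorizedRowBatch#selectedInUse is true, the batch's
column arrays still hold all rows, and selected[] lists the positions of the rows that survived a filter
or limit. The patch therefore (a) maps every row-level read of isNull/lengths through selected[], while
the Arrow buffers keep being written at the dense row position, and (b) for list children, explodes
selected[] from row granularity to element granularity via correctSelectedAndSize. Below is a minimal,
self-contained sketch of that explosion using plain arrays and hypothetical offsets/lengths; it mirrors
the two passes in correctSelectedAndSize but is not code from the patch.

// Sketch only (not from the patch): the selected[] explosion that
// correctSelectedAndSize() performs, reduced to plain arrays.
import java.util.Arrays;

public class SelectedExplosionSketch {
  public static void main(String[] args) {
    // Parent batch of 4 list rows; a filter kept only rows 1 and 3.
    int[] selected = {1, 3};
    int size = 2;  // number of selected parent rows

    // ListColumnVector bookkeeping (hypothetical values): row i covers
    // child elements [offsets[i], offsets[i] + lengths[i]).
    long[] offsets = {0, 2, 5, 6};
    long[] lengths = {2, 3, 1, 4};

    // Pass 1: total child elements covered by the selected rows.
    int correctedSize = 0;
    for (int i = 0; i < size; i++) {
      correctedSize += lengths[selected[i]];
    }

    // Pass 2: explode row-level selected[] into element-level indices.
    int[] selectedCorrected = new int[correctedSize];
    int newIndex = 0;
    for (int i = 0; i < size; i++) {
      long start = offsets[selected[i]];
      long len = lengths[selected[i]];
      for (int j = 0; j < len; j++) {
        selectedCorrected[newIndex++] = (int) (start + j);
      }
    }

    System.out.println(Arrays.toString(selectedCorrected));  // [2, 3, 4, 6, 7, 8, 9]
  }
}

Running it prints [2, 3, 4, 6, 7, 8, 9]: row 1 covers child elements 2..4 and row 3 covers 6..9, which is
exactly the element-level selected[] the recursive write() call on the child vector needs. Without this
remapping, the child writer would index its isNull/lengths arrays with row-level positions, which is the
kind of mismatch that surfaced as the ArrayIndexOutOfBoundsException reported in HIVE-21966.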