Posted to commits@hive.apache.org by sa...@apache.org on 2019/07/09 03:53:33 UTC
[hive] branch master updated: HIVE-21966: Llap external client - Arrow Serializer throws ArrayIndexOutOfBoundsException in some cases (Shubham Chaurasia, reviewed by Sankar Hariappan)
This is an automated email from the ASF dual-hosted git repository.
sankarh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new f923b74 HIVE-21966: Llap external client - Arrow Serializer throws ArrayIndexOutOfBoundsException in some cases (Shubham Chaurasia, reviewed by Sankar Hariappan)
f923b74 is described below
commit f923b7490a6085292e2d05425dcdc83b994d42e4
Author: Shubham Chaurasia <sc...@cloudera.com>
AuthorDate: Tue Jul 9 09:23:06 2019 +0530
HIVE-21966: Llap external client - Arrow Serializer throws ArrayIndexOutOfBoundsException in some cases (Shubham Chaurasia, reviewed by Sankar Hariappan)
Signed-off-by: Sankar Hariappan <sa...@apache.org>
---
.../org/apache/hive/jdbc/BaseJdbcWithMiniLlap.java | 2 +-
.../hive/jdbc/TestJdbcWithMiniLlapVectorArrow.java | 176 +++++++++++++++++++++
.../apache/hadoop/hive/ql/io/arrow/Serializer.java | 67 +++++++-
3 files changed, 237 insertions(+), 8 deletions(-)
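
[Editor's note] For context, the root cause: when a query includes a filter or limit, upstream operators can hand the Serializer a VectorizedRowBatch with selectedInUse=true, in which case only the physical slots named in selected[0..size) are valid. Before this patch the Serializer indexed column vectors by logical row position regardless, reading slots that were never populated. A minimal sketch of that indirection (not part of this commit; the class name is just for illustration), using the same Hive classes the Serializer works with:

import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;

public class SelectedInUseSketch {
  public static void main(String[] args) {
    // One long column, capacity for 5 rows.
    VectorizedRowBatch batch = new VectorizedRowBatch(1, 5);
    LongColumnVector col = new LongColumnVector(5);
    batch.cols[0] = col;
    for (int i = 0; i < 5; i++) {
      col.vector[i] = i * 10;
    }
    // A filter kept only physical rows 1 and 3.
    batch.selectedInUse = true;
    batch.selected[0] = 1;
    batch.selected[1] = 3;
    batch.size = 2;
    // Correct consumers read through selected[]; indexing by the logical
    // row number (col.vector[i]) reads the wrong physical slot, which is
    // what the Serializer did before this patch.
    for (int i = 0; i < batch.size; i++) {
      System.out.println(col.vector[batch.selected[i]]); // prints 10, then 30
    }
  }
}
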
diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/BaseJdbcWithMiniLlap.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/BaseJdbcWithMiniLlap.java
index ab79b42..8467cea 100644
--- a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/BaseJdbcWithMiniLlap.java
+++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/BaseJdbcWithMiniLlap.java
@@ -107,7 +107,7 @@ public abstract class BaseJdbcWithMiniLlap {
private static Path dataTypesFilePath;
protected static HiveConf conf = null;
- private static Connection hs2Conn = null;
+ protected static Connection hs2Conn = null;
// This method should be called by sub-classes in a @BeforeClass initializer
public static MiniHS2 beforeTest(HiveConf inputConf) throws Exception {
diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlapVectorArrow.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlapVectorArrow.java
index 55a2df8..35eda6c 100644
--- a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlapVectorArrow.java
+++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlapVectorArrow.java
@@ -18,11 +18,18 @@
package org.apache.hive.jdbc;
+import static java.util.Arrays.asList;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertArrayEquals;
+
import java.math.BigDecimal;
+
+import com.google.common.collect.ImmutableMap;
import org.apache.hadoop.hive.common.type.Date;
import org.apache.hadoop.hive.common.type.Timestamp;
+
+import java.sql.Statement;
+import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.hive.llap.FieldDesc;
import org.apache.hadoop.hive.llap.Row;
@@ -33,6 +40,7 @@ import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.hive.llap.LlapArrowRowInputFormat;
+import org.junit.Test;
/**
* TestJdbcWithMiniLlap for Arrow format with vectorized output sink
@@ -231,5 +239,173 @@ public class TestJdbcWithMiniLlapVectorArrow extends BaseJdbcWithMiniLlap {
assertArrayEquals("X'01FF'".getBytes("UTF-8"), (byte[]) rowValues[22]);
}
+
+ @Test
+ public void testTypesNestedInListWithLimitAndFilters() throws Exception {
+ try (Statement statement = hs2Conn.createStatement()) {
+ statement.execute("CREATE TABLE complex_tbl(c1 array<string>, " +
+ "c2 array<struct<f1:string,f2:string>>, " +
+ "c3 array<array<struct<f1:string,f2:string>>>, " +
+ "c4 int) STORED AS ORC");
+
+ statement.executeUpdate("INSERT INTO complex_tbl VALUES " +
+ "(" +
+ "ARRAY('a1', 'a2', 'a3', null), " +
+ "ARRAY(NAMED_STRUCT('f1','a1', 'f2','a2'), NAMED_STRUCT('f1','a3', 'f2','a4')), " +
+ "ARRAY((ARRAY(NAMED_STRUCT('f1','a1', 'f2','a2'), NAMED_STRUCT('f1','a3', 'f2','a4')))), " +
+ "1), " +
+ "(" +
+ "ARRAY('b1'), " +
+ "ARRAY(NAMED_STRUCT('f1','b1', 'f2','b2'), NAMED_STRUCT('f1','b3', 'f2','b4')), " +
+ "ARRAY((ARRAY(NAMED_STRUCT('f1','b1', 'f2','b2'), NAMED_STRUCT('f1','b3', 'f2','b4'))), " +
+ "(ARRAY(NAMED_STRUCT('f1','b5', 'f2','b6'), NAMED_STRUCT('f1','b7', 'f2','b8')))), " +
+ "2), " +
+ "(" +
+ "ARRAY('c1', 'c2'), ARRAY(NAMED_STRUCT('f1','c1', 'f2','c2'), NAMED_STRUCT('f1','c3', 'f2','c4'), " +
+ "NAMED_STRUCT('f1','c5', 'f2','c6')), ARRAY((ARRAY(NAMED_STRUCT('f1','c1', 'f2','c2'), " +
+ "NAMED_STRUCT('f1','c3', 'f2','c4'))), (ARRAY(NAMED_STRUCT('f1','c5', 'f2','c6'), " +
+ "NAMED_STRUCT('f1','c7', 'f2','c8'))), (ARRAY(NAMED_STRUCT('f1','c9', 'f2','c10'), " +
+ "NAMED_STRUCT('f1','c11', 'f2','c12')))), " +
+ "3), " +
+ "(" +
+ "ARRAY(null), " +
+ "ARRAY(NAMED_STRUCT('f1','d1', 'f2','d2'), NAMED_STRUCT('f1','d3', 'f2','d4'), " +
+ "NAMED_STRUCT('f1','d5', 'f2','d6'), NAMED_STRUCT('f1','d7', 'f2','d8')), " +
+ "ARRAY((ARRAY(NAMED_STRUCT('f1','d1', 'f2', 'd2')))), " +
+ "4)");
+
+ }
+
+ List<Object[]> expected = new ArrayList<>();
+ expected.add(new Object[]{
+ asList("a1", "a2", "a3", null),
+ asList(asList("a1", "a2"), asList("a3", "a4")),
+ asList(asList(asList("a1", "a2"), asList("a3", "a4"))),
+ 1
+ });
+ expected.add(new Object[]{
+ asList("b1"),
+ asList(asList("b1", "b2"), asList("b3", "b4")),
+ asList(asList(asList("b1", "b2"), asList("b3", "b4")), asList(asList("b5", "b6"), asList("b7", "b8"))),
+ 2
+ });
+ expected.add(new Object[]{
+ asList("c1", "c2"),
+ asList(asList("c1", "c2"), asList("c3", "c4"), asList("c5", "c6")),
+ asList(asList(asList("c1", "c2"), asList("c3", "c4")), asList(asList("c5", "c6"), asList("c7", "c8")),
+ asList(asList("c9", "c10"), asList("c11", "c12"))),
+ 3
+ });
+ List<String> nullList = new ArrayList<>();
+ nullList.add(null);
+ expected.add(new Object[]{
+ nullList,
+ asList(asList("d1", "d2"), asList("d3", "d4"), asList("d5", "d6"), asList("d7", "d8")),
+ asList(asList(asList("d1", "d2"))),
+ 4
+ });
+
+ // test without limit and filters (i.e. VectorizedRowBatch#selectedInUse=false)
+ RowCollector2 rowCollector = new RowCollector2();
+ String query = "select * from complex_tbl";
+ processQuery(query, 1, rowCollector);
+ verifyResult(rowCollector.rows, expected.get(0),
+ expected.get(1),
+ expected.get(2),
+ expected.get(3));
+
+ // test with filter
+ rowCollector = new RowCollector2();
+ query = "select * from complex_tbl where c4 > 1 ";
+ processQuery(query, 1, rowCollector);
+ verifyResult(rowCollector.rows, expected.get(1), expected.get(2), expected.get(3));
+
+ // test with limit
+ rowCollector = new RowCollector2();
+ query = "select * from complex_tbl limit 3";
+ processQuery(query, 1, rowCollector);
+ verifyResult(rowCollector.rows, expected.get(0), expected.get(1), expected.get(2));
+
+ // test with filters and limit
+ rowCollector = new RowCollector2();
+ query = "select * from complex_tbl where c4 > 1 limit 2";
+ processQuery(query, 1, rowCollector);
+ verifyResult(rowCollector.rows, expected.get(1), expected.get(2));
+
+ }
+
+ @Test
+ public void testTypesNestedInMapWithLimitAndFilters() throws Exception {
+ try (Statement statement = hs2Conn.createStatement()) {
+ statement.execute("CREATE TABLE complex_tbl2(c1 map<int, string>," +
+ " c2 map<int, array<string>>, " +
+ " c3 map<int, struct<f1:string,f2:string>>, c4 int) STORED AS ORC");
+
+ statement.executeUpdate("INSERT INTO complex_tbl2 VALUES " +
+ "(MAP(1, 'a1'), MAP(1, ARRAY('a1', 'a2')), MAP(1, NAMED_STRUCT('f1','a1', 'f2','a2')), " +
+ "1), " +
+ "(MAP(1, 'b1',2, 'b2'), MAP(1, ARRAY('b1', 'b2'), 2, ARRAY('b3') ), " +
+ "MAP(1, NAMED_STRUCT('f1','b1', 'f2','b2')), " +
+ "2), " +
+ "(MAP(1, 'c1',2, 'c2'), MAP(1, ARRAY('c1', 'c2'), 2, ARRAY('c3') ), " +
+ "MAP(1, NAMED_STRUCT('f1','c1', 'f2','c2'), 2, NAMED_STRUCT('f1', 'c3', 'f2', 'c4') ), " +
+ "3)");
+
+ }
+
+ List<Object[]> expected = new ArrayList<>();
+ expected.add(new Object[]{
+ ImmutableMap.of(1, "a1"),
+ ImmutableMap.of(1, asList("a1", "a2")),
+ ImmutableMap.of(1, asList("a1", "a2")),
+ 1,
+ });
+ expected.add(new Object[]{
+ ImmutableMap.of(1, "b1", 2, "b2"),
+ ImmutableMap.of(1, asList("b1", "b2"), 2, asList("b3")),
+ ImmutableMap.of(1, asList("b1", "b2")),
+ 2,
+ });
+ expected.add(new Object[]{
+ ImmutableMap.of(1, "c1", 2, "c2"),
+ ImmutableMap.of(1, asList("c1", "c2"), 2, asList("c3")),
+ ImmutableMap.of(1, asList("c1", "c2"), 2, asList("c3", "c4")),
+ 3,
+ });
+
+
+ // test without limit and filters (i.e. VectorizedRowBatch#selectedInUse=false)
+ RowCollector2 rowCollector = new RowCollector2();
+ String query = "select * from complex_tbl2";
+ processQuery(query, 1, rowCollector);
+ verifyResult(rowCollector.rows, expected.get(0), expected.get(1), expected.get(2));
+
+ // test with filter
+ rowCollector = new RowCollector2();
+ query = "select * from complex_tbl2 where c4 > 1 ";
+ processQuery(query, 1, rowCollector);
+ verifyResult(rowCollector.rows, expected.get(1), expected.get(2));
+
+ // test with limit
+ rowCollector = new RowCollector2();
+ query = "select * from complex_tbl2 limit 2";
+ processQuery(query, 1, rowCollector);
+ verifyResult(rowCollector.rows, expected.get(0), expected.get(1));
+
+ // test with filters and limit
+ rowCollector = new RowCollector2();
+ query = "select * from complex_tbl2 where c4 > 1 limit 1";
+ processQuery(query, 1, rowCollector);
+ verifyResult(rowCollector.rows, expected.get(1));
+
+ }
+
+ private void verifyResult(List<Object[]> actual, Object[]... expected) {
+ assertEquals(expected.length, actual.size());
+ for (int i = 0; i < expected.length; i++) {
+ assertArrayEquals(expected[i], actual.get(i));
+ }
+ }
+
}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/arrow/Serializer.java b/ql/src/java/org/apache/hadoop/hive/ql/io/arrow/Serializer.java
index 5ec800d..0350977 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/arrow/Serializer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/arrow/Serializer.java
@@ -56,6 +56,7 @@ import org.apache.hadoop.hive.ql.exec.vector.IntervalDayTimeColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.ListColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.MapColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.MultiValuedColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.StructColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.TimestampColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.UnionColumnVector;
@@ -314,7 +315,11 @@ public class Serializer {
final ArrowBuf validityBuffer = arrowVector.getValidityBuffer();
for (int rowIndex = 0; rowIndex < size; rowIndex++) {
- if (hiveVector.isNull[rowIndex]) {
+ int selectedIndex = rowIndex;
+ if (vectorizedRowBatch.selectedInUse) {
+ selectedIndex = vectorizedRowBatch.selected[rowIndex];
+ }
+ if (hiveVector.isNull[selectedIndex]) {
BitVectorHelper.setValidityBit(validityBuffer, rowIndex, 0);
} else {
BitVectorHelper.setValidityBitToOne(validityBuffer, rowIndex);
@@ -365,27 +370,75 @@ public class Serializer {
}
}
+ // selected[] points to the valid/filtered/selected records at row level.
+ // For a MultiValuedColumnVector such as ListColumnVector, one record of the vector points to multiple nested records.
+ // In child vectors these records appear in exploded form, i.e. a child vector can hold more records than the
+ // VectorizedRowBatch itself has rows; consequently selected[] also needs to be readjusted.
+ // This method creates a shallow copy of the VectorizedRowBatch with corrected size and selected[].
+
+ private static VectorizedRowBatch correctSelectedAndSize(VectorizedRowBatch sourceVrb,
+ MultiValuedColumnVector multiValuedColumnVector) {
+
+ VectorizedRowBatch vrb = new VectorizedRowBatch(sourceVrb.numCols, sourceVrb.size);
+ vrb.cols = sourceVrb.cols;
+ vrb.endOfFile = sourceVrb.endOfFile;
+ vrb.projectedColumns = sourceVrb.projectedColumns;
+ vrb.projectionSize = sourceVrb.projectionSize;
+ vrb.selectedInUse = sourceVrb.selectedInUse;
+ vrb.setPartitionInfo(sourceVrb.getDataColumnCount(), sourceVrb.getPartitionColumnCount());
+
+ int correctedSize = 0;
+ final int[] srcVrbSelected = sourceVrb.selected;
+ for (int i = 0; i < sourceVrb.size; i++) {
+ correctedSize += multiValuedColumnVector.lengths[srcVrbSelected[i]];
+ }
+
+ int newIndex = 0;
+ final int[] selectedOffsetsCorrected = new int[correctedSize];
+ for (int i = 0; i < sourceVrb.size; i++) {
+ long elementIndex = multiValuedColumnVector.offsets[srcVrbSelected[i]];
+ long elementSize = multiValuedColumnVector.lengths[srcVrbSelected[i]];
+ for (int j = 0; j < elementSize; j++) {
+ selectedOffsetsCorrected[newIndex++] = (int) (elementIndex + j);
+ }
+ }
+ vrb.selected = selectedOffsetsCorrected;
+ vrb.size = correctedSize;
+ return vrb;
+ }
+
private void writeList(ListVector arrowVector, ListColumnVector hiveVector, ListTypeInfo typeInfo, int size,
- VectorizedRowBatch vectorizedRowBatch, boolean isNative) {
+ VectorizedRowBatch vectorizedRowBatch, boolean isNative) {
final int OFFSET_WIDTH = 4;
final TypeInfo elementTypeInfo = typeInfo.getListElementTypeInfo();
final ColumnVector hiveElementVector = hiveVector.child;
final FieldVector arrowElementVector =
- (FieldVector) arrowVector.addOrGetVector(toFieldType(elementTypeInfo)).getVector();
- arrowElementVector.setInitialCapacity(hiveVector.childCount);
+ (FieldVector) arrowVector.addOrGetVector(toFieldType(elementTypeInfo)).getVector();
+
+ VectorizedRowBatch correctedVrb = vectorizedRowBatch;
+ int correctedSize = hiveVector.childCount;
+ if (vectorizedRowBatch.selectedInUse) {
+ correctedVrb = correctSelectedAndSize(vectorizedRowBatch, hiveVector);
+ correctedSize = correctedVrb.size;
+ }
+ arrowElementVector.setInitialCapacity(correctedSize);
arrowElementVector.allocateNew();
- write(arrowElementVector, hiveElementVector, elementTypeInfo, hiveVector.childCount, vectorizedRowBatch, isNative);
+ write(arrowElementVector, hiveElementVector, elementTypeInfo, correctedSize, correctedVrb, isNative);
final ArrowBuf offsetBuffer = arrowVector.getOffsetBuffer();
int nextOffset = 0;
for (int rowIndex = 0; rowIndex < size; rowIndex++) {
- if (hiveVector.isNull[rowIndex]) {
+ int selectedIndex = rowIndex;
+ if (vectorizedRowBatch.selectedInUse) {
+ selectedIndex = vectorizedRowBatch.selected[rowIndex];
+ }
+ if (hiveVector.isNull[selectedIndex]) {
offsetBuffer.setInt(rowIndex * OFFSET_WIDTH, nextOffset);
} else {
offsetBuffer.setInt(rowIndex * OFFSET_WIDTH, nextOffset);
- nextOffset += (int) hiveVector.lengths[rowIndex];
+ nextOffset += (int) hiveVector.lengths[selectedIndex];
arrowVector.setNotNull(rowIndex);
}
}
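
[Editor's note] To make the correctSelectedAndSize logic above concrete, here is a standalone illustration (not part of the commit; plain arrays stand in for MultiValuedColumnVector's offsets/lengths) of the index expansion it performs:

public class SelectedExpansionSketch {
  public static void main(String[] args) {
    int[] selected = {0, 2};    // row-level selection from the batch (row 1 filtered out)
    long[] offsets = {0, 2, 3}; // start of each list in the child vector
    long[] lengths = {2, 1, 3}; // element count of each list

    // First pass: total number of child elements covered by the selection.
    int correctedSize = 0;
    for (int i = 0; i < selected.length; i++) {
      correctedSize += lengths[selected[i]];
    }

    // Second pass: explode each selected list row into child-level indices.
    int[] childSelected = new int[correctedSize];
    int newIndex = 0;
    for (int i = 0; i < selected.length; i++) {
      long start = offsets[selected[i]];
      for (long j = 0; j < lengths[selected[i]]; j++) {
        childSelected[newIndex++] = (int) (start + j);
      }
    }

    // Prints 5 and [0, 1, 3, 4, 5]: the two elements of row 0 plus the
    // three elements of row 2, skipping the single element of filtered row 1.
    System.out.println(correctedSize);
    System.out.println(java.util.Arrays.toString(childSelected));
  }
}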