You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2018/01/31 13:45:02 UTC
[1/2] flink git commit: [FLINK-8230] [orc] Fix NPEs when reading
nested columns.
Repository: flink
Updated Branches:
refs/heads/master 3cfc5ae9f -> bcead3be3
http://git-wip-us.apache.org/repos/asf/flink/blob/bcead3be/flink-connectors/flink-orc/src/main/java/org/apache/flink/orc/OrcUtils.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-orc/src/main/java/org/apache/flink/orc/OrcUtils.java b/flink-connectors/flink-orc/src/main/java/org/apache/flink/orc/OrcUtils.java
deleted file mode 100644
index cfb4e0e..0000000
--- a/flink-connectors/flink-orc/src/main/java/org/apache/flink/orc/OrcUtils.java
+++ /dev/null
@@ -1,1508 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.orc;
-
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
-import org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.typeutils.MapTypeInfo;
-import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
-import org.apache.flink.api.java.typeutils.RowTypeInfo;
-import org.apache.flink.types.Row;
-
-import org.apache.hadoop.hive.common.type.HiveDecimal;
-import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.ListColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.MapColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.StructColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.TimestampColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
-import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
-import org.apache.orc.TypeDescription;
-
-import java.lang.reflect.Array;
-import java.math.BigDecimal;
-import java.nio.charset.StandardCharsets;
-import java.sql.Date;
-import java.sql.Timestamp;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.TimeZone;
-import java.util.function.DoubleFunction;
-import java.util.function.IntFunction;
-import java.util.function.LongFunction;
-
-/**
- * A class that provides utility methods for orc file reading.
- */
-class OrcUtils {
-
- private static final long MILLIS_PER_DAY = 86400000; // = 24 * 60 * 60 * 1000
- private static final TimeZone LOCAL_TZ = TimeZone.getDefault();
-
- /**
- * Converts an ORC schema to a Flink TypeInformation.
- *
- * @param schema The ORC schema.
- * @return The TypeInformation that corresponds to the ORC schema.
- */
- static TypeInformation schemaToTypeInfo(TypeDescription schema) {
- switch (schema.getCategory()) {
- case BOOLEAN:
- return BasicTypeInfo.BOOLEAN_TYPE_INFO;
- case BYTE:
- return BasicTypeInfo.BYTE_TYPE_INFO;
- case SHORT:
- return BasicTypeInfo.SHORT_TYPE_INFO;
- case INT:
- return BasicTypeInfo.INT_TYPE_INFO;
- case LONG:
- return BasicTypeInfo.LONG_TYPE_INFO;
- case FLOAT:
- return BasicTypeInfo.FLOAT_TYPE_INFO;
- case DOUBLE:
- return BasicTypeInfo.DOUBLE_TYPE_INFO;
- case DECIMAL:
- return BasicTypeInfo.BIG_DEC_TYPE_INFO;
- case STRING:
- case CHAR:
- case VARCHAR:
- return BasicTypeInfo.STRING_TYPE_INFO;
- case DATE:
- return SqlTimeTypeInfo.DATE;
- case TIMESTAMP:
- return SqlTimeTypeInfo.TIMESTAMP;
- case BINARY:
- return PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO;
- case STRUCT:
- List<TypeDescription> fieldSchemas = schema.getChildren();
- TypeInformation[] fieldTypes = new TypeInformation[fieldSchemas.size()];
- for (int i = 0; i < fieldSchemas.size(); i++) {
- fieldTypes[i] = schemaToTypeInfo(fieldSchemas.get(i));
- }
- String[] fieldNames = schema.getFieldNames().toArray(new String[]{});
- return new RowTypeInfo(fieldTypes, fieldNames);
- case LIST:
- TypeDescription elementSchema = schema.getChildren().get(0);
- TypeInformation<?> elementType = schemaToTypeInfo(elementSchema);
- // arrays of primitive types are handled as object arrays to support null values
- return ObjectArrayTypeInfo.getInfoFor(elementType);
- case MAP:
- TypeDescription keySchema = schema.getChildren().get(0);
- TypeDescription valSchema = schema.getChildren().get(1);
- TypeInformation<?> keyType = schemaToTypeInfo(keySchema);
- TypeInformation<?> valType = schemaToTypeInfo(valSchema);
- return new MapTypeInfo<>(keyType, valType);
- case UNION:
- throw new UnsupportedOperationException("UNION type is not supported yet.");
- default:
- throw new IllegalArgumentException("Unknown type " + schema);
- }
- }
-
- /**
- * Fills an ORC batch into an array of Row.
- *
- * @param rows The batch of rows need to be filled.
- * @param schema The schema of the ORC data.
- * @param batch The ORC data.
- * @param selectedFields The list of selected ORC fields.
- * @return The number of rows that were filled.
- */
- static int fillRows(Row[] rows, TypeDescription schema, VectorizedRowBatch batch, int[] selectedFields) {
-
- int rowsToRead = Math.min((int) batch.count(), rows.length);
-
- List<TypeDescription> fieldTypes = schema.getChildren();
- // read each selected field
- for (int rowIdx = 0; rowIdx < selectedFields.length; rowIdx++) {
- int orcIdx = selectedFields[rowIdx];
- readField(rows, rowIdx, fieldTypes.get(orcIdx), batch.cols[orcIdx], null, rowsToRead);
- }
- return rowsToRead;
- }
-
- /**
- * Reads a vector of data into an array of objects.
- *
- * @param vals The array that needs to be filled.
- * @param fieldIdx If the vals array is an array of Row, the index of the field that needs to be filled.
- * Otherwise a -1 must be passed and the data is directly filled into the array.
- * @param schema The schema of the vector to read.
- * @param vector The vector to read.
- * @param lengthVector If the vector is of type List or Map, the number of sub-elements to read for each field. Otherwise, it must be null.
- * @param childCount The number of vector entries to read.
- */
- private static void readField(Object[] vals, int fieldIdx, TypeDescription schema, ColumnVector vector, long[] lengthVector, int childCount) {
-
- // check the type of the vector to decide how to read it.
- switch (schema.getCategory()) {
- case BOOLEAN:
- if (vector.noNulls) {
- readNonNullLongColumn(vals, fieldIdx, (LongColumnVector) vector, lengthVector, childCount, OrcUtils::readBoolean, OrcUtils::boolArray);
- } else {
- readLongColumn(vals, fieldIdx, (LongColumnVector) vector, lengthVector, childCount, OrcUtils::readBoolean, OrcUtils::boolArray);
- }
- break;
- case BYTE:
- if (vector.noNulls) {
- readNonNullLongColumn(vals, fieldIdx, (LongColumnVector) vector, lengthVector, childCount, OrcUtils::readByte, OrcUtils::byteArray);
- } else {
- readLongColumn(vals, fieldIdx, (LongColumnVector) vector, lengthVector, childCount, OrcUtils::readByte, OrcUtils::byteArray);
- }
- break;
- case SHORT:
- if (vector.noNulls) {
- readNonNullLongColumn(vals, fieldIdx, (LongColumnVector) vector, lengthVector, childCount, OrcUtils::readShort, OrcUtils::shortArray);
- } else {
- readLongColumn(vals, fieldIdx, (LongColumnVector) vector, lengthVector, childCount, OrcUtils::readShort, OrcUtils::shortArray);
- }
- break;
- case INT:
- if (vector.noNulls) {
- readNonNullLongColumn(vals, fieldIdx, (LongColumnVector) vector, lengthVector, childCount, OrcUtils::readInt, OrcUtils::intArray);
- } else {
- readLongColumn(vals, fieldIdx, (LongColumnVector) vector, lengthVector, childCount, OrcUtils::readInt, OrcUtils::intArray);
- }
- break;
- case LONG:
- if (vector.noNulls) {
- readNonNullLongColumn(vals, fieldIdx, (LongColumnVector) vector, lengthVector, childCount, OrcUtils::readLong, OrcUtils::longArray);
- } else {
- readLongColumn(vals, fieldIdx, (LongColumnVector) vector, lengthVector, childCount, OrcUtils::readLong, OrcUtils::longArray);
- }
- break;
- case FLOAT:
- if (vector.noNulls) {
- readNonNullDoubleColumn(vals, fieldIdx, (DoubleColumnVector) vector, lengthVector, childCount, OrcUtils::readFloat, OrcUtils::floatArray);
- } else {
- readDoubleColumn(vals, fieldIdx, (DoubleColumnVector) vector, lengthVector, childCount, OrcUtils::readFloat, OrcUtils::floatArray);
- }
- break;
- case DOUBLE:
- if (vector.noNulls) {
- readNonNullDoubleColumn(vals, fieldIdx, (DoubleColumnVector) vector, lengthVector, childCount, OrcUtils::readDouble, OrcUtils::doubleArray);
- } else {
- readDoubleColumn(vals, fieldIdx, (DoubleColumnVector) vector, lengthVector, childCount, OrcUtils::readDouble, OrcUtils::doubleArray);
- }
- break;
- case CHAR:
- case VARCHAR:
- case STRING:
- if (vector.noNulls) {
- readNonNullBytesColumnAsString(vals, fieldIdx, (BytesColumnVector) vector, lengthVector, childCount);
- } else {
- readBytesColumnAsString(vals, fieldIdx, (BytesColumnVector) vector, lengthVector, childCount);
- }
- break;
- case DATE:
- if (vector.noNulls) {
- readNonNullLongColumnAsDate(vals, fieldIdx, (LongColumnVector) vector, lengthVector, childCount);
- } else {
- readLongColumnAsDate(vals, fieldIdx, (LongColumnVector) vector, lengthVector, childCount);
- }
- break;
- case TIMESTAMP:
- if (vector.noNulls) {
- readNonNullTimestampColumn(vals, fieldIdx, (TimestampColumnVector) vector, lengthVector, childCount);
- } else {
- readTimestampColumn(vals, fieldIdx, (TimestampColumnVector) vector, lengthVector, childCount);
- }
- break;
- case BINARY:
- if (vector.noNulls) {
- readNonNullBytesColumnAsBinary(vals, fieldIdx, (BytesColumnVector) vector, lengthVector, childCount);
- } else {
- readBytesColumnAsBinary(vals, fieldIdx, (BytesColumnVector) vector, lengthVector, childCount);
- }
- break;
- case DECIMAL:
- if (vector.noNulls) {
- readNonNullDecimalColumn(vals, fieldIdx, (DecimalColumnVector) vector, lengthVector, childCount);
- } else {
- readDecimalColumn(vals, fieldIdx, (DecimalColumnVector) vector, lengthVector, childCount);
- }
- break;
- case STRUCT:
- if (vector.noNulls) {
- readNonNullStructColumn(vals, fieldIdx, (StructColumnVector) vector, schema, lengthVector, childCount);
- } else {
- readStructColumn(vals, fieldIdx, (StructColumnVector) vector, schema, lengthVector, childCount);
- }
- break;
- case LIST:
- if (vector.noNulls) {
- readNonNullListColumn(vals, fieldIdx, (ListColumnVector) vector, schema, lengthVector, childCount);
- } else {
- readListColumn(vals, fieldIdx, (ListColumnVector) vector, schema, lengthVector, childCount);
- }
- break;
- case MAP:
- if (vector.noNulls) {
- readNonNullMapColumn(vals, fieldIdx, (MapColumnVector) vector, schema, lengthVector, childCount);
- } else {
- readMapColumn(vals, fieldIdx, (MapColumnVector) vector, schema, lengthVector, childCount);
- }
- break;
- case UNION:
- throw new UnsupportedOperationException("UNION type not supported yet");
- default:
- throw new IllegalArgumentException("Unknown type " + schema);
- }
- }
-
- private static <T> void readNonNullLongColumn(Object[] vals, int fieldIdx, LongColumnVector vector, long[] lengthVector, int childCount,
- LongFunction<T> reader, IntFunction<T[]> array) {
-
- // check if the values need to be read into lists or as single values
- if (lengthVector == null) {
- if (vector.isRepeating) { // fill complete column with first value
- T repeatingValue = reader.apply(vector.vector[0]);
- fillColumnWithRepeatingValue(vals, fieldIdx, repeatingValue, childCount);
- } else {
- if (fieldIdx == -1) { // set as an object
- for (int i = 0; i < childCount; i++) {
- vals[i] = reader.apply(vector.vector[i]);
- }
- } else { // set as a field of Row
- Row[] rows = (Row[]) vals;
- for (int i = 0; i < childCount; i++) {
- rows[i].setField(fieldIdx, reader.apply(vector.vector[i]));
- }
- }
- }
- } else { // in a list
- T[] temp;
- int offset = 0;
- if (vector.isRepeating) { // fill complete list with first value
- T repeatingValue = reader.apply(vector.vector[0]);
- for (int i = 0; offset < childCount; i++) {
- temp = array.apply((int) lengthVector[i]);
- Arrays.fill(temp, repeatingValue);
- offset += temp.length;
- if (fieldIdx == -1) {
- vals[i] = temp;
- } else {
- ((Row) vals[i]).setField(fieldIdx, temp);
- }
- }
- } else {
- for (int i = 0; offset < childCount; i++) {
- temp = array.apply((int) lengthVector[i]);
- for (int j = 0; j < temp.length; j++) {
- temp[j] = reader.apply(vector.vector[offset++]);
- }
- if (fieldIdx == -1) {
- vals[i] = temp;
- } else {
- ((Row) vals[i]).setField(fieldIdx, temp);
- }
- }
- }
- }
- }
-
- private static <T> void readNonNullDoubleColumn(Object[] vals, int fieldIdx, DoubleColumnVector vector, long[] lengthVector, int childCount,
- DoubleFunction<T> reader, IntFunction<T[]> array) {
-
- // check if the values need to be read into lists or as single values
- if (lengthVector == null) {
- if (vector.isRepeating) { // fill complete column with first value
- T repeatingValue = reader.apply(vector.vector[0]);
- fillColumnWithRepeatingValue(vals, fieldIdx, repeatingValue, childCount);
- } else {
- if (fieldIdx == -1) { // set as an object
- for (int i = 0; i < childCount; i++) {
- vals[i] = reader.apply(vector.vector[i]);
- }
- } else { // set as a field of Row
- Row[] rows = (Row[]) vals;
- for (int i = 0; i < childCount; i++) {
- rows[i].setField(fieldIdx, reader.apply(vector.vector[i]));
- }
- }
- }
- } else { // in a list
- T[] temp;
- int offset = 0;
- if (vector.isRepeating) { // fill complete list with first value
- T repeatingValue = reader.apply(vector.vector[0]);
- for (int i = 0; offset < childCount; i++) {
- temp = array.apply((int) lengthVector[i]);
- Arrays.fill(temp, repeatingValue);
- offset += temp.length;
- if (fieldIdx == -1) {
- vals[i] = temp;
- } else {
- ((Row) vals[i]).setField(fieldIdx, temp);
- }
- }
- } else {
- for (int i = 0; offset < childCount; i++) {
- temp = array.apply((int) lengthVector[i]);
- for (int j = 0; j < temp.length; j++) {
- temp[j] = reader.apply(vector.vector[offset++]);
- }
- if (fieldIdx == -1) {
- vals[i] = temp;
- } else {
- ((Row) vals[i]).setField(fieldIdx, temp);
- }
- }
- }
- }
- }
-
- private static void readNonNullBytesColumnAsString(Object[] vals, int fieldIdx, BytesColumnVector bytes, long[] lengthVector, int childCount) {
- // check if the values need to be read into lists or as single values
- if (lengthVector == null) {
- if (bytes.isRepeating) { // fill complete column with first value
- String repeatingValue = new String(bytes.vector[0], bytes.start[0], bytes.length[0]);
- fillColumnWithRepeatingValue(vals, fieldIdx, repeatingValue, childCount);
- } else {
- if (fieldIdx == -1) { // set as an object
- for (int i = 0; i < childCount; i++) {
- vals[i] = new String(bytes.vector[i], bytes.start[i], bytes.length[i], StandardCharsets.UTF_8);
- }
- } else { // set as a field of Row
- Row[] rows = (Row[]) vals;
- for (int i = 0; i < childCount; i++) {
- rows[i].setField(fieldIdx, new String(bytes.vector[i], bytes.start[i], bytes.length[i], StandardCharsets.UTF_8));
- }
- }
- }
- } else {
- String[] temp;
- int offset = 0;
- if (bytes.isRepeating) { // fill complete list with first value
- String repeatingValue = new String(bytes.vector[0], bytes.start[0], bytes.length[0], StandardCharsets.UTF_8);
- for (int i = 0; offset < childCount; i++) {
- temp = new String[(int) lengthVector[i]];
- Arrays.fill(temp, repeatingValue);
- offset += temp.length;
- if (fieldIdx == -1) {
- vals[i] = temp;
- } else {
- ((Row) vals[i]).setField(fieldIdx, temp);
- }
- }
- } else {
- for (int i = 0; offset < childCount; i++) {
- temp = new String[(int) lengthVector[i]];
- for (int j = 0; j < temp.length; j++) {
- temp[j] = new String(bytes.vector[offset], bytes.start[offset], bytes.length[offset], StandardCharsets.UTF_8);
- offset++;
- }
- if (fieldIdx == -1) {
- vals[i] = temp;
- } else {
- ((Row) vals[i]).setField(fieldIdx, temp);
- }
- }
- }
- }
- }
-
- private static void readNonNullBytesColumnAsBinary(Object[] vals, int fieldIdx, BytesColumnVector bytes, long[] lengthVector, int childCount) {
- // check if the values need to be read into lists or as single values
- if (lengthVector == null) {
- if (bytes.isRepeating) { // fill complete column with first value
- if (fieldIdx == -1) { // set as an object
- for (int i = 0; i < childCount; i++) {
- // don't reuse repeating val to avoid object mutation
- vals[i] = readBinary(bytes.vector[0], bytes.start[0], bytes.length[0]);
- }
- } else { // set as a field of Row
- Row[] rows = (Row[]) vals;
- for (int i = 0; i < childCount; i++) {
- // don't reuse repeating val to avoid object mutation
- rows[i].setField(fieldIdx, readBinary(bytes.vector[0], bytes.start[0], bytes.length[0]));
- }
- }
- } else {
- if (fieldIdx == -1) { // set as an object
- for (int i = 0; i < childCount; i++) {
- vals[i] = readBinary(bytes.vector[i], bytes.start[i], bytes.length[i]);
- }
- } else { // set as a field of Row
- Row[] rows = (Row[]) vals;
- for (int i = 0; i < childCount; i++) {
- rows[i].setField(fieldIdx, readBinary(bytes.vector[i], bytes.start[i], bytes.length[i]));
- }
- }
- }
- } else {
- byte[][] temp;
- int offset = 0;
- if (bytes.isRepeating) { // fill complete list with first value
- for (int i = 0; offset < childCount; i++) {
- temp = new byte[(int) lengthVector[i]][];
- for (int j = 0; j < temp.length; j++) {
- temp[j] = readBinary(bytes.vector[0], bytes.start[0], bytes.length[0]);
- }
- offset += temp.length;
- if (fieldIdx == -1) {
- vals[i] = temp;
- } else {
- ((Row) vals[i]).setField(fieldIdx, temp);
- }
- }
- } else {
- for (int i = 0; offset < childCount; i++) {
- temp = new byte[(int) lengthVector[i]][];
- for (int j = 0; j < temp.length; j++) {
- temp[j] = readBinary(bytes.vector[offset], bytes.start[offset], bytes.length[offset]);
- offset++;
- }
- if (fieldIdx == -1) {
- vals[i] = temp;
- } else {
- ((Row) vals[i]).setField(fieldIdx, temp);
- }
- }
- }
- }
- }
-
- private static void readNonNullLongColumnAsDate(Object[] vals, int fieldIdx, LongColumnVector vector, long[] lengthVector, int childCount) {
-
- // check if the values need to be read into lists or as single values
- if (lengthVector == null) {
- if (vector.isRepeating) { // fill complete column with first value
- if (fieldIdx == -1) { // set as an object
- for (int i = 0; i < childCount; i++) {
- // do not reuse repeated value due to mutability of Date
- vals[i] = readDate(vector.vector[0]);
- }
- } else { // set as a field of Row
- Row[] rows = (Row[]) vals;
- for (int i = 0; i < childCount; i++) {
- // do not reuse repeated value due to mutability of Date
- rows[i].setField(fieldIdx, readDate(vector.vector[0]));
- }
- }
- } else {
- if (fieldIdx == -1) { // set as an object
- for (int i = 0; i < childCount; i++) {
- vals[i] = readDate(vector.vector[i]);
- }
- } else { // set as a field of Row
- Row[] rows = (Row[]) vals;
- for (int i = 0; i < childCount; i++) {
- rows[i].setField(fieldIdx, readDate(vector.vector[i]));
- }
- }
- }
- } else { // in a list
- Date[] temp;
- int offset = 0;
- if (vector.isRepeating) { // fill complete list with first value
- for (int i = 0; offset < childCount; i++) {
- temp = new Date[(int) lengthVector[i]];
- for (int j = 0; j < temp.length; j++) {
- temp[j] = readDate(vector.vector[0]);
- }
- offset += temp.length;
- if (fieldIdx == -1) {
- vals[i] = temp;
- } else {
- ((Row) vals[i]).setField(fieldIdx, temp);
- }
- }
- } else {
- for (int i = 0; offset < childCount; i++) {
- temp = new Date[(int) lengthVector[i]];
- for (int j = 0; j < temp.length; j++) {
- temp[j] = readDate(vector.vector[offset++]);
- }
- if (fieldIdx == -1) {
- vals[i] = temp;
- } else {
- ((Row) vals[i]).setField(fieldIdx, temp);
- }
- }
- }
- }
- }
-
- private static void readNonNullTimestampColumn(Object[] vals, int fieldIdx, TimestampColumnVector vector, long[] lengthVector, int childCount) {
-
- // check if the timestamps need to be read into lists or as single values
- if (lengthVector == null) {
- if (vector.isRepeating) { // fill complete column with first value
- if (fieldIdx == -1) { // set as an object
- for (int i = 0; i < childCount; i++) {
- // do not reuse value to prevent object mutation
- vals[i] = readTimestamp(vector.time[0], vector.nanos[0]);
- }
- } else { // set as a field of Row
- Row[] rows = (Row[]) vals;
- for (int i = 0; i < childCount; i++) {
- // do not reuse value to prevent object mutation
- rows[i].setField(fieldIdx, readTimestamp(vector.time[0], vector.nanos[0]));
- }
- }
- } else {
- if (fieldIdx == -1) { // set as an object
- for (int i = 0; i < childCount; i++) {
- vals[i] = readTimestamp(vector.time[i], vector.nanos[i]);
- }
- } else { // set as a field of Row
- Row[] rows = (Row[]) vals;
- for (int i = 0; i < childCount; i++) {
- rows[i].setField(fieldIdx, readTimestamp(vector.time[i], vector.nanos[i]));
- }
- }
- }
- } else {
- Timestamp[] temp;
- int offset = 0;
- if (vector.isRepeating) { // fill complete list with first value
- for (int i = 0; offset < childCount; i++) {
- temp = new Timestamp[(int) lengthVector[i]];
- for (int j = 0; j < temp.length; j++) {
- // do not reuse value to prevent object mutation
- temp[j] = readTimestamp(vector.time[0], vector.nanos[0]);
- }
- offset += temp.length;
- if (fieldIdx == -1) {
- vals[i] = temp;
- } else {
- ((Row) vals[i]).setField(fieldIdx, temp);
- }
- }
- } else {
- for (int i = 0; offset < childCount; i++) {
- temp = new Timestamp[(int) lengthVector[i]];
- for (int j = 0; j < temp.length; j++) {
- temp[j] = readTimestamp(vector.time[offset], vector.nanos[offset]);
- offset++;
- }
- if (fieldIdx == -1) {
- vals[i] = temp;
- } else {
- ((Row) vals[i]).setField(fieldIdx, temp);
- }
- }
- }
- }
- }
-
- private static void readNonNullDecimalColumn(Object[] vals, int fieldIdx, DecimalColumnVector vector, long[] lengthVector, int childCount) {
-
- // check if the decimals need to be read into lists or as single values
- if (lengthVector == null) {
- if (vector.isRepeating) { // fill complete column with first value
- fillColumnWithRepeatingValue(vals, fieldIdx, readBigDecimal(vector.vector[0]), childCount);
- } else {
- if (fieldIdx == -1) { // set as an object
- for (int i = 0; i < childCount; i++) {
- vals[i] = readBigDecimal(vector.vector[i]);
- }
- } else { // set as a field of Row
- Row[] rows = (Row[]) vals;
- for (int i = 0; i < childCount; i++) {
- rows[i].setField(fieldIdx, readBigDecimal(vector.vector[i]));
- }
- }
- }
- } else {
- BigDecimal[] temp;
- int offset = 0;
- if (vector.isRepeating) { // fill complete list with first value
- BigDecimal repeatingValue = readBigDecimal(vector.vector[0]);
- for (int i = 0; offset < childCount; i++) {
- temp = new BigDecimal[(int) lengthVector[i]];
- Arrays.fill(temp, repeatingValue);
- offset += temp.length;
- if (fieldIdx == -1) {
- vals[i] = temp;
- } else {
- ((Row) vals[i]).setField(fieldIdx, temp);
- }
- }
- } else {
- for (int i = 0; offset < childCount; i++) {
- temp = new BigDecimal[(int) lengthVector[i]];
- for (int j = 0; j < temp.length; j++) {
- temp[j] = readBigDecimal(vector.vector[offset++]);
- }
- if (fieldIdx == -1) {
- vals[i] = temp;
- } else {
- ((Row) vals[i]).setField(fieldIdx, temp);
- }
- }
- }
- }
-
- }
-
- private static void readNonNullStructColumn(Object[] vals, int fieldIdx, StructColumnVector structVector, TypeDescription schema, long[] lengthVector, int childCount) {
-
- List<TypeDescription> childrenTypes = schema.getChildren();
-
- int numFields = childrenTypes.size();
- // create a batch of Rows to read the structs
- Row[] structs = new Row[childCount];
- // TODO: possible improvement: reuse existing Row objects
- for (int i = 0; i < childCount; i++) {
- structs[i] = new Row(numFields);
- }
-
- // read struct fields
- for (int i = 0; i < numFields; i++) {
- readField(structs, i, childrenTypes.get(i), structVector.fields[i], null, childCount);
- }
-
- // check if the structs need to be read into lists or as single values
- if (lengthVector == null) {
- if (fieldIdx == -1) { // set struct as an object
- System.arraycopy(structs, 0, vals, 0, childCount);
- } else { // set struct as a field of Row
- Row[] rows = (Row[]) vals;
- for (int i = 0; i < childCount; i++) {
- rows[i].setField(fieldIdx, structs[i]);
- }
- }
- } else { // struct in a list
- int offset = 0;
- Row[] temp;
- for (int i = 0; offset < childCount; i++) {
- temp = new Row[(int) lengthVector[i]];
- System.arraycopy(structs, offset, temp, 0, temp.length);
- offset = offset + temp.length;
- if (fieldIdx == -1) {
- vals[i] = temp;
- } else {
- ((Row) vals[i]).setField(fieldIdx, temp);
- }
- }
- }
- }
-
- private static void readNonNullListColumn(Object[] vals, int fieldIdx, ListColumnVector list, TypeDescription schema, long[] lengthVector, int childCount) {
-
- TypeDescription fieldType = schema.getChildren().get(0);
- // check if the list need to be read into lists or as single values
- if (lengthVector == null) {
- long[] lengthVectorNested = list.lengths;
- readField(vals, fieldIdx, fieldType, list.child, lengthVectorNested, list.childCount);
- } else { // list in a list
- Object[] nestedLists = new Object[childCount];
- // length vector for nested list
- long[] lengthVectorNested = list.lengths;
- // read nested list
- readField(nestedLists, -1, fieldType, list.child, lengthVectorNested, list.childCount);
- // get type of nestedList
- Class<?> classType = nestedLists[0].getClass();
-
- // fill outer list with nested list
- int offset = 0;
- int length;
- for (int i = 0; offset < childCount; i++) {
- length = (int) lengthVector[i];
- Object[] temp = (Object[]) Array.newInstance(classType, length);
- System.arraycopy(nestedLists, offset, temp, 0, length);
- offset = offset + length;
- if (fieldIdx == -1) {
- vals[i] = temp;
- } else {
- ((Row) vals[i]).setField(fieldIdx, temp);
- }
- }
- }
- }
-
- private static void readNonNullMapColumn(Object[] vals, int fieldIdx, MapColumnVector mapsVector, TypeDescription schema, long[] lengthVector, int childCount) {
-
- List<TypeDescription> fieldType = schema.getChildren();
- TypeDescription keyType = fieldType.get(0);
- TypeDescription valueType = fieldType.get(1);
-
- ColumnVector keys = mapsVector.keys;
- ColumnVector values = mapsVector.values;
- Object[] keyRows = new Object[mapsVector.childCount];
- Object[] valueRows = new Object[mapsVector.childCount];
-
- // read map keys and values
- readField(keyRows, -1, keyType, keys, null, keyRows.length);
- readField(valueRows, -1, valueType, values, null, valueRows.length);
-
- // check if the maps need to be read into lists or as single values
- if (lengthVector == null) {
- long[] lengthVectorMap = mapsVector.lengths;
- int offset = 0;
-
- for (int i = 0; i < childCount; i++) {
- long numMapEntries = lengthVectorMap[i];
- HashMap map = readHashMap(keyRows, valueRows, offset, numMapEntries);
- offset += numMapEntries;
-
- if (fieldIdx == -1) {
- vals[i] = map;
- } else {
- ((Row) vals[i]).setField(fieldIdx, map);
- }
- }
- } else { // list of map
-
- long[] lengthVectorMap = mapsVector.lengths;
- int mapOffset = 0; // offset of map element
- int offset = 0; // offset of map
- HashMap[] temp;
-
- for (int i = 0; offset < childCount; i++) {
- temp = new HashMap[(int) lengthVector[i]];
- for (int j = 0; j < temp.length; j++) {
- long numMapEntries = lengthVectorMap[offset];
- temp[j] = readHashMap(keyRows, valueRows, mapOffset, numMapEntries);
- mapOffset += numMapEntries;
- offset++;
- }
- if (fieldIdx == 1) {
- vals[i] = temp;
- } else {
- ((Row) vals[i]).setField(fieldIdx, temp);
- }
- }
- }
- }
-
- private static <T> void readLongColumn(Object[] vals, int fieldIdx, LongColumnVector vector, long[] lengthVector, int childCount,
- LongFunction<T> reader, IntFunction<T[]> array) {
-
- // check if the values need to be read into lists or as single values
- if (lengthVector == null) {
- if (vector.isRepeating) { // fill complete column with first value
- // since the column contains null values and has just one distinct value, the repeated value is null
- fillColumnWithRepeatingValue(vals, fieldIdx, null, childCount);
- } else {
- boolean[] isNullVector = vector.isNull;
- if (fieldIdx == -1) { // set as an object
- for (int i = 0; i < childCount; i++) {
- if (isNullVector[i]) {
- vals[i] = null;
- } else {
- vals[i] = reader.apply(vector.vector[i]);
- }
- }
- } else { // set as a field of Row
- Row[] rows = (Row[]) vals;
- for (int i = 0; i < childCount; i++) {
- if (isNullVector[i]) {
- rows[i].setField(fieldIdx, null);
- } else {
- rows[i].setField(fieldIdx, reader.apply(vector.vector[i]));
- }
- }
- }
- }
- } else { // in a list
- if (vector.isRepeating) { // // fill complete list with first value
- // since the column contains null values and has just one distinct value, the repeated value is null
- fillListWithRepeatingNull(vals, fieldIdx, lengthVector, childCount, array);
- } else {
- // column contain null values
- int offset = 0;
- T[] temp;
- boolean[] isNullVector = vector.isNull;
- for (int i = 0; offset < childCount; i++) {
- temp = array.apply((int) lengthVector[i]);
- for (int j = 0; j < temp.length; j++) {
- if (isNullVector[offset]) {
- offset++;
- } else {
- temp[j] = reader.apply(vector.vector[offset++]);
- }
- }
- if (fieldIdx == -1) {
- vals[i] = temp;
- } else {
- ((Row) vals[i]).setField(fieldIdx, temp);
- }
- }
- }
- }
- }
-
- private static <T> void readDoubleColumn(Object[] vals, int fieldIdx, DoubleColumnVector vector, long[] lengthVector, int childCount,
- DoubleFunction<T> reader, IntFunction<T[]> array) {
-
- // check if the values need to be read into lists or as single values
- if (lengthVector == null) {
- if (vector.isRepeating) { // fill complete column with first value
- // since the column contains null values and has just one distinct value, the repeated value is null
- fillColumnWithRepeatingValue(vals, fieldIdx, null, childCount);
- } else {
- boolean[] isNullVector = vector.isNull;
- if (fieldIdx == -1) { // set as an object
- for (int i = 0; i < childCount; i++) {
- if (isNullVector[i]) {
- vals[i] = null;
- } else {
- vals[i] = reader.apply(vector.vector[i]);
- }
- }
- } else { // set as a field of Row
- Row[] rows = (Row[]) vals;
- for (int i = 0; i < childCount; i++) {
- if (isNullVector[i]) {
- rows[i].setField(fieldIdx, null);
- } else {
- rows[i].setField(fieldIdx, reader.apply(vector.vector[i]));
- }
- }
- }
- }
- } else { // in a list
- if (vector.isRepeating) { // // fill complete list with first value
- // since the column contains null values and has just one distinct value, the repeated value is null
- fillListWithRepeatingNull(vals, fieldIdx, lengthVector, childCount, array);
- } else {
- // column contain null values
- int offset = 0;
- T[] temp;
- boolean[] isNullVector = vector.isNull;
- for (int i = 0; offset < childCount; i++) {
- temp = array.apply((int) lengthVector[i]);
- for (int j = 0; j < temp.length; j++) {
- if (isNullVector[offset]) {
- offset++;
- } else {
- temp[j] = reader.apply(vector.vector[offset++]);
- }
- }
- if (fieldIdx == -1) {
- vals[i] = temp;
- } else {
- ((Row) vals[i]).setField(fieldIdx, temp);
- }
- }
- }
- }
- }
-
- private static void readBytesColumnAsString(Object[] vals, int fieldIdx, BytesColumnVector bytes, long[] lengthVector, int childCount) {
-
- // check if the values need to be read into lists or as single values
- if (lengthVector == null) {
- if (bytes.isRepeating) { // fill complete column with first value
- // since the column contains null values and has just one distinct value, the repeated value is null
- fillColumnWithRepeatingValue(vals, fieldIdx, null, childCount);
- } else {
- boolean[] isNullVector = bytes.isNull;
- if (fieldIdx == -1) { // set as an object
- for (int i = 0; i < childCount; i++) {
- if (isNullVector[i]) {
- vals[i] = null;
- } else {
- vals[i] = new String(bytes.vector[i], bytes.start[i], bytes.length[i]);
- }
- }
- } else { // set as a field of Row
- Row[] rows = (Row[]) vals;
- for (int i = 0; i < childCount; i++) {
- if (isNullVector[i]) {
- rows[i].setField(fieldIdx, null);
- } else {
- rows[i].setField(fieldIdx, new String(bytes.vector[i], bytes.start[i], bytes.length[i]));
- }
- }
- }
- }
- } else { // in a list
- if (bytes.isRepeating) { // fill list with first value
- // since the column contains null values and has just one distinct value, the repeated value is null
- fillListWithRepeatingNull(vals, fieldIdx, lengthVector, childCount, OrcUtils::stringArray);
- } else {
- int offset = 0;
- String[] temp;
- boolean[] isNullVector = bytes.isNull;
- for (int i = 0; offset < childCount; i++) {
- temp = new String[(int) lengthVector[i]];
- for (int j = 0; j < temp.length; j++) {
- if (isNullVector[offset]) {
- offset++;
- } else {
- temp[j] = new String(bytes.vector[offset], bytes.start[offset], bytes.length[offset]);
- offset++;
- }
- }
- if (fieldIdx == -1) {
- vals[i] = temp;
- } else {
- ((Row) vals[i]).setField(fieldIdx, temp);
- }
- }
- }
- }
- }
-
- private static void readBytesColumnAsBinary(Object[] vals, int fieldIdx, BytesColumnVector bytes, long[] lengthVector, int childCount) {
-
- // check if the binary need to be read into lists or as single values
- if (lengthVector == null) {
- if (bytes.isRepeating) { // fill complete column with first value
- // since the column contains null values and has just one distinct value, the repeated value is null
- fillColumnWithRepeatingValue(vals, fieldIdx, null, childCount);
- } else {
- boolean[] isNullVector = bytes.isNull;
- if (fieldIdx == -1) { // set as an object
- for (int i = 0; i < childCount; i++) {
- if (isNullVector[i]) {
- vals[i] = null;
- } else {
- vals[i] = readBinary(bytes.vector[i], bytes.start[i], bytes.length[i]);
- }
- }
- } else { // set as a field of Row
- Row[] rows = (Row[]) vals;
- for (int i = 0; i < childCount; i++) {
- if (isNullVector[i]) {
- rows[i].setField(fieldIdx, null);
- } else {
- rows[i].setField(fieldIdx, readBinary(bytes.vector[i], bytes.start[i], bytes.length[i]));
- }
- }
- }
- }
- } else {
- if (bytes.isRepeating) { // fill complete list with first value
- // since the column contains null values and has just one distinct value, the repeated value is null
- fillListWithRepeatingNull(vals, fieldIdx, lengthVector, childCount, OrcUtils::binaryArray);
- } else {
- int offset = 0;
- byte[][] temp;
- boolean[] isNullVector = bytes.isNull;
- for (int i = 0; offset < childCount; i++) {
- temp = new byte[(int) lengthVector[i]][];
- for (int j = 0; j < temp.length; j++) {
- if (isNullVector[offset]) {
- offset++;
- } else {
- temp[j] = readBinary(bytes.vector[offset], bytes.start[offset], bytes.length[offset]);
- offset++;
- }
- }
- if (fieldIdx == -1) {
- vals[i] = temp;
- } else {
- ((Row) vals[i]).setField(fieldIdx, temp);
- }
- }
- }
- }
- }
-
- private static void readLongColumnAsDate(Object[] vals, int fieldIdx, LongColumnVector vector, long[] lengthVector, int childCount) {
-
- // check if the values need to be read into lists or as single values
- if (lengthVector == null) {
- if (vector.isRepeating) { // fill complete column with first value
- // since the column contains null values and has just one distinct value, the repeated value is null
- fillColumnWithRepeatingValue(vals, fieldIdx, null, childCount);
- } else {
- boolean[] isNullVector = vector.isNull;
- if (fieldIdx == -1) { // set as an object
- for (int i = 0; i < childCount; i++) {
- if (isNullVector[i]) {
- vals[i] = null;
- } else {
- vals[i] = readDate(vector.vector[i]);
- }
- }
- } else { // set as a field of Row
- Row[] rows = (Row[]) vals;
- for (int i = 0; i < childCount; i++) {
- if (isNullVector[i]) {
- rows[i].setField(fieldIdx, null);
- } else {
- rows[i].setField(fieldIdx, readDate(vector.vector[i]));
- }
- }
- }
- }
- } else { // in a list
- if (vector.isRepeating) { // // fill complete list with first value
- // since the column contains null values and has just one distinct value, the repeated value is null
- fillListWithRepeatingNull(vals, fieldIdx, lengthVector, childCount, OrcUtils::dateArray);
- } else {
- // column contain null values
- int offset = 0;
- Date[] temp;
- boolean[] isNullVector = vector.isNull;
- for (int i = 0; offset < childCount; i++) {
- temp = new Date[(int) lengthVector[i]];
- for (int j = 0; j < temp.length; j++) {
- if (isNullVector[offset]) {
- offset++;
- } else {
- temp[j] = readDate(vector.vector[offset++]);
- }
- }
- if (fieldIdx == -1) {
- vals[i] = temp;
- } else {
- ((Row) vals[i]).setField(fieldIdx, temp);
- }
- }
- }
- }
- }
-
- private static void readTimestampColumn(Object[] vals, int fieldIdx, TimestampColumnVector vector, long[] lengthVector, int childCount) {
-
- // check if the timestamps need to be read into lists or as single values
- if (lengthVector == null) {
- if (vector.isRepeating) { // fill complete column with first value
- // since the column contains null values and has just one distinct value, the repeated value is null
- fillColumnWithRepeatingValue(vals, fieldIdx, null, childCount);
- } else {
- boolean[] isNullVector = vector.isNull;
- if (fieldIdx == -1) { // set as an object
- for (int i = 0; i < childCount; i++) {
- if (isNullVector[i]) {
- vals[i] = null;
- } else {
- Timestamp ts = readTimestamp(vector.time[i], vector.nanos[i]);
- vals[i] = ts;
- }
- }
- } else { // set as a field of Row
- Row[] rows = (Row[]) vals;
- for (int i = 0; i < childCount; i++) {
- if (isNullVector[i]) {
- rows[i].setField(fieldIdx, null);
- } else {
- Timestamp ts = readTimestamp(vector.time[i], vector.nanos[i]);
- rows[i].setField(fieldIdx, ts);
- }
- }
- }
- }
- } else {
- if (vector.isRepeating) { // fill complete list with first value
- // since the column contains null values and has just one distinct value, the repeated value is null
- fillListWithRepeatingNull(vals, fieldIdx, lengthVector, childCount, OrcUtils::timestampArray);
- } else {
- int offset = 0;
- Timestamp[] temp;
- boolean[] isNullVector = vector.isNull;
- for (int i = 0; offset < childCount; i++) {
- temp = new Timestamp[(int) lengthVector[i]];
- for (int j = 0; j < temp.length; j++) {
- if (isNullVector[offset]) {
- offset++;
- } else {
- temp[j] = readTimestamp(vector.time[offset], vector.nanos[offset]);
- offset++;
- }
- }
- if (fieldIdx == -1) {
- vals[i] = temp;
- } else {
- ((Row) vals[i]).setField(fieldIdx, temp);
- }
- }
- }
- }
- }
-
- private static void readDecimalColumn(Object[] vals, int fieldIdx, DecimalColumnVector vector, long[] lengthVector, int childCount) {
-
- // check if the decimals need to be read into lists or as single values
- if (lengthVector == null) {
- if (vector.isRepeating) { // fill complete column with first value
- // since the column contains null values and has just one distinct value, the repeated value is null
- fillColumnWithRepeatingValue(vals, fieldIdx, null, childCount);
- } else {
- boolean[] isNullVector = vector.isNull;
- if (fieldIdx == -1) { // set as an object
- for (int i = 0; i < childCount; i++) {
- if (isNullVector[i]) {
- vals[i] = null;
- } else {
- vals[i] = readBigDecimal(vector.vector[i]);
- }
- }
- } else { // set as a field of Row
- Row[] rows = (Row[]) vals;
- for (int i = 0; i < childCount; i++) {
- if (isNullVector[i]) {
- rows[i].setField(fieldIdx, null);
- } else {
- rows[i].setField(fieldIdx, readBigDecimal(vector.vector[i]));
- }
- }
- }
- }
- } else {
- if (vector.isRepeating) { // fill complete list with first value
- // since the column contains null values and has just one distinct value, the repeated value is null
- fillListWithRepeatingNull(vals, fieldIdx, lengthVector, childCount, OrcUtils::decimalArray);
- } else {
- int offset = 0;
- BigDecimal[] temp;
- boolean[] isNullVector = vector.isNull;
- for (int i = 0; offset < childCount; i++) {
- temp = new BigDecimal[(int) lengthVector[i]];
- for (int j = 0; j < temp.length; j++) {
- if (isNullVector[offset]) {
- offset++;
- } else {
- temp[j] = readBigDecimal(vector.vector[offset++]);
- }
- }
- if (fieldIdx == -1) {
- vals[i] = temp;
- } else {
- ((Row) vals[i]).setField(fieldIdx, temp);
- }
- }
- }
- }
- }
-
- private static void readStructColumn(Object[] vals, int fieldIdx, StructColumnVector structVector, TypeDescription schema, long[] lengthVector, int childCount) {
-
- List<TypeDescription> childrenTypes = schema.getChildren();
-
- int numFields = childrenTypes.size();
- // create a batch of Rows to read the structs
- Row[] structs = new Row[childCount];
- // TODO: possible improvement: reuse existing Row objects
- for (int i = 0; i < childCount; i++) {
- structs[i] = new Row(numFields);
- }
-
- // read struct fields
- for (int i = 0; i < numFields; i++) {
- readField(structs, i, childrenTypes.get(i), structVector.fields[i], null, childCount);
- }
-
- boolean[] isNullVector = structVector.isNull;
-
- // check if the structs need to be read into lists or as single values
- if (lengthVector == null) {
- if (fieldIdx == -1) { // set struct as an object
- for (int i = 0; i < childCount; i++) {
- if (isNullVector[i]) {
- vals[i] = null;
- } else {
- vals[i] = structs[i];
- }
- }
- } else { // set struct as a field of Row
- Row[] rows = (Row[]) vals;
- for (int i = 0; i < childCount; i++) {
- if (isNullVector[i]) {
- rows[i].setField(fieldIdx, null);
- } else {
- rows[i].setField(fieldIdx, structs[i]);
- }
- }
- }
- } else { // struct in a list
- int offset = 0;
- Row[] temp;
- for (int i = 0; offset < childCount; i++) {
- temp = new Row[(int) lengthVector[i]];
- for (int j = 0; j < temp.length; j++) {
- if (isNullVector[offset]) {
- temp[j] = null;
- } else {
- temp[j] = structs[offset++];
- }
- }
- if (fieldIdx == -1) { // set list of structs as an object
- vals[i] = temp;
- } else { // set list of structs as field of row
- ((Row) vals[i]).setField(fieldIdx, temp);
- }
- }
- }
- }
-
- private static void readListColumn(Object[] vals, int fieldIdx, ListColumnVector list, TypeDescription schema, long[] lengthVector, int childCount) {
-
- TypeDescription fieldType = schema.getChildren().get(0);
- // check if the lists need to be read into lists or as single values
- if (lengthVector == null) {
- long[] lengthVectorNested = list.lengths;
- readField(vals, fieldIdx, fieldType, list.child, lengthVectorNested, list.childCount);
- } else { // list in a list
- Object[] nestedList = new Object[childCount];
- // length vector for nested list
- long[] lengthVectorNested = list.lengths;
- // read nested list
- readField(nestedList, -1, fieldType, list.child, lengthVectorNested, list.childCount);
-
- // fill outer list with nested list
- int offset = 0;
- int length;
- // get type of nestedList
- Class<?> classType = nestedList[0].getClass();
- for (int i = 0; offset < childCount; i++) {
- length = (int) lengthVector[i];
- Object[] temp = (Object[]) Array.newInstance(classType, length);
- System.arraycopy(nestedList, offset, temp, 0, length);
- offset = offset + length;
- if (fieldIdx == -1) { // set list of list as an object
- vals[i] = temp;
- } else { // set list of list as field of row
- ((Row) vals[i]).setField(fieldIdx, temp);
- }
- }
- }
- }
-
- private static void readMapColumn(Object[] vals, int fieldIdx, MapColumnVector map, TypeDescription schema, long[] lengthVector, int childCount) {
-
- List<TypeDescription> fieldType = schema.getChildren();
- TypeDescription keyType = fieldType.get(0);
- TypeDescription valueType = fieldType.get(1);
-
- ColumnVector keys = map.keys;
- ColumnVector values = map.values;
- Object[] keyRows = new Object[map.childCount];
- Object[] valueRows = new Object[map.childCount];
-
- // read map kes and values
- readField(keyRows, -1, keyType, keys, null, keyRows.length);
- readField(valueRows, -1, valueType, values, null, valueRows.length);
-
- boolean[] isNullVector = map.isNull;
-
- // check if the maps need to be read into lists or as single values
- if (lengthVector == null) {
- long[] lengthVectorMap = map.lengths;
- int offset = 0;
- if (fieldIdx == -1) { // set map as an object
- for (int i = 0; i < childCount; i++) {
- if (isNullVector[i]) {
- vals[i] = null;
- } else {
- vals[i] = readHashMap(keyRows, valueRows, offset, lengthVectorMap[i]);
- offset += lengthVectorMap[i];
- }
- }
- } else { // set map as a field of Row
- Row[] rows = (Row[]) vals;
- for (int i = 0; i < childCount; i++) {
- if (isNullVector[i]) {
- rows[i].setField(fieldIdx, null);
- } else {
- rows[i].setField(fieldIdx, readHashMap(keyRows, valueRows, offset, lengthVectorMap[i]));
- offset += lengthVectorMap[i];
- }
- }
- }
- } else { // list of map
- long[] lengthVectorMap = map.lengths;
- int mapOffset = 0; // offset of map element
- int offset = 0; // offset of map
- HashMap[] temp;
-
- for (int i = 0; offset < childCount; i++) {
- temp = new HashMap[(int) lengthVector[i]];
- for (int j = 0; j < temp.length; j++) {
- if (isNullVector[offset]) {
- temp[j] = null;
- } else {
- temp[j] = readHashMap(keyRows, valueRows, mapOffset, lengthVectorMap[offset]);
- mapOffset += lengthVectorMap[offset];
- offset++;
- }
- }
- if (fieldIdx == -1) {
- vals[i] = temp;
- } else {
- ((Row) vals[i]).setField(fieldIdx, temp);
- }
- }
- }
- }
-
- /**
- * Sets a repeating value to all objects or row fields of the passed vals array.
- *
- * @param vals The array of objects or Rows.
- * @param fieldIdx If the objs array is an array of Row, the index of the field that needs to be filled.
- * Otherwise a -1 must be passed and the data is directly filled into the array.
- * @param repeatingValue The value that is set.
- * @param childCount The number of times the value is set.
- */
- private static void fillColumnWithRepeatingValue(Object[] vals, int fieldIdx, Object repeatingValue, int childCount) {
-
- if (fieldIdx == -1) {
- // set value as an object
- Arrays.fill(vals, 0, childCount, repeatingValue);
- } else {
- // set value as a field of Row
- Row[] rows = (Row[]) vals;
- for (int i = 0; i < childCount; i++) {
- rows[i].setField(fieldIdx, repeatingValue);
- }
- }
- }
-
- /**
- * Sets arrays containing only null values to all objects or row fields of the passed vals array.
- *
- * @param vals The array of objects or Rows to which the empty arrays are set.
- * @param fieldIdx If the objs array is an array of Row, the index of the field that needs to be filled.
- * Otherwise a -1 must be passed and the data is directly filled into the array.
- * @param lengthVector The vector containing the lengths of the individual empty arrays.
- * @param childCount The number of objects or Rows to fill.
- * @param array A method to create arrays of the appropriate type.
- * @param <T> The type of the arrays to create.
- */
- private static <T> void fillListWithRepeatingNull(Object[] vals, int fieldIdx, long[] lengthVector, int childCount, IntFunction<T[]> array) {
-
- if (fieldIdx == -1) {
- // set empty array as object
- for (int i = 0; i < childCount; i++) {
- vals[i] = array.apply((int) lengthVector[i]);
- }
- } else {
- // set empty array as field in Row
- Row[] rows = (Row[]) vals;
- for (int i = 0; i < childCount; i++) {
- rows[i].setField(fieldIdx, array.apply((int) lengthVector[i]));
- }
- }
- }
-
- private static Boolean readBoolean(long l) {
- return l != 0;
- }
-
- private static Byte readByte(long l) {
- return (byte) l;
- }
-
- private static Short readShort(long l) {
- return (short) l;
- }
-
- private static Integer readInt(long l) {
- return (int) l;
- }
-
- private static Long readLong(long l) {
- return l;
- }
-
- private static Float readFloat(double d) {
- return (float) d;
- }
-
- private static Double readDouble(double d) {
- return d;
- }
-
- private static Date readDate(long l) {
- // day to milliseconds
- final long t = l * MILLIS_PER_DAY;
- // adjust by local timezone
- return new java.sql.Date(t - LOCAL_TZ.getOffset(t));
- }
-
- private static byte[] readBinary(byte[] src, int srcPos, int length) {
- byte[] result = new byte[length];
- System.arraycopy(src, srcPos, result, 0, length);
- return result;
- }
-
- private static BigDecimal readBigDecimal(HiveDecimalWritable hiveDecimalWritable) {
- HiveDecimal hiveDecimal = hiveDecimalWritable.getHiveDecimal();
- return hiveDecimal.bigDecimalValue();
- }
-
- private static Timestamp readTimestamp(long time, int nanos) {
- Timestamp ts = new Timestamp(time);
- ts.setNanos(nanos);
- return ts;
- }
-
- private static HashMap readHashMap(Object[] keyRows, Object[] valueRows, int offset, long length) {
- HashMap<Object, Object> resultMap = new HashMap<>();
- for (int j = 0; j < length; j++) {
- resultMap.put(keyRows[offset], valueRows[offset]);
- offset++;
- }
- return resultMap;
- }
-
- private static Boolean[] boolArray(int len) {
- return new Boolean[len];
- }
-
- private static Byte[] byteArray(int len) {
- return new Byte[len];
- }
-
- private static Short[] shortArray(int len) {
- return new Short[len];
- }
-
- private static Integer[] intArray(int len) {
- return new Integer[len];
- }
-
- private static Long[] longArray(int len) {
- return new Long[len];
- }
-
- private static Float[] floatArray(int len) {
- return new Float[len];
- }
-
- private static Double[] doubleArray(int len) {
- return new Double[len];
- }
-
- private static Date[] dateArray(int len) {
- return new Date[len];
- }
-
- private static byte[][] binaryArray(int len) {
- return new byte[len][];
- }
-
- private static String[] stringArray(int len) {
- return new String[len];
- }
-
- private static BigDecimal[] decimalArray(int len) {
- return new BigDecimal[len];
- }
-
- private static Timestamp[] timestampArray(int len) {
- return new Timestamp[len];
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/bcead3be/flink-connectors/flink-orc/src/test/java/org/apache/flink/orc/OrcBatchReaderTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-orc/src/test/java/org/apache/flink/orc/OrcBatchReaderTest.java b/flink-connectors/flink-orc/src/test/java/org/apache/flink/orc/OrcBatchReaderTest.java
new file mode 100644
index 0000000..b90313e
--- /dev/null
+++ b/flink-connectors/flink-orc/src/test/java/org/apache/flink/orc/OrcBatchReaderTest.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.orc;
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.typeutils.MapTypeInfo;
+import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+
+import org.apache.orc.TypeDescription;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Unit tests for {@link OrcBatchReader}.
+ *
+ */
+public class OrcBatchReaderTest {
+
+ @Test
+ public void testFlatSchemaToTypeInfo1() {
+
+ String schema =
+ "struct<" +
+ "boolean1:boolean," +
+ "byte1:tinyint," +
+ "short1:smallint," +
+ "int1:int," +
+ "long1:bigint," +
+ "float1:float," +
+ "double1:double," +
+ "bytes1:binary," +
+ "string1:string," +
+ "date1:date," +
+ "timestamp1:timestamp," +
+ "decimal1:decimal(5,2)" +
+ ">";
+ TypeInformation typeInfo = OrcBatchReader.schemaToTypeInfo(TypeDescription.fromString(schema));
+
+ Assert.assertNotNull(typeInfo);
+ Assert.assertTrue(typeInfo instanceof RowTypeInfo);
+ RowTypeInfo rowTypeInfo = (RowTypeInfo) typeInfo;
+
+ // validate field types
+ Assert.assertArrayEquals(
+ new TypeInformation[]{
+ Types.BOOLEAN, Types.BYTE, Types.SHORT, Types.INT, Types.LONG, Types.FLOAT, Types.DOUBLE,
+ PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO, Types.STRING,
+ Types.SQL_DATE, Types.SQL_TIMESTAMP, BasicTypeInfo.BIG_DEC_TYPE_INFO
+ },
+ rowTypeInfo.getFieldTypes());
+
+ // validate field names
+ Assert.assertArrayEquals(
+ new String[] {
+ "boolean1", "byte1", "short1", "int1", "long1", "float1", "double1",
+ "bytes1", "string1", "date1", "timestamp1", "decimal1"
+ },
+ rowTypeInfo.getFieldNames());
+
+ }
+
+ @Test
+ public void testNestedSchemaToTypeInfo1() {
+
+ String schema =
+ "struct<" +
+ "middle:struct<" +
+ "list:array<" +
+ "struct<" +
+ "int1:int," +
+ "string1:string" +
+ ">" +
+ ">" +
+ ">," +
+ "list:array<" +
+ "struct<" +
+ "int1:int," +
+ "string1:string" +
+ ">" +
+ ">," +
+ "map:map<" +
+ "string," +
+ "struct<" +
+ "int1:int," +
+ "string1:string" +
+ ">" +
+ ">" +
+ ">";
+ TypeInformation typeInfo = OrcBatchReader.schemaToTypeInfo(TypeDescription.fromString(schema));
+
+ Assert.assertNotNull(typeInfo);
+ Assert.assertTrue(typeInfo instanceof RowTypeInfo);
+ RowTypeInfo rowTypeInfo = (RowTypeInfo) typeInfo;
+
+ // validate field types
+ Assert.assertArrayEquals(
+ new TypeInformation[]{
+ Types.ROW_NAMED(
+ new String[]{"list"},
+ ObjectArrayTypeInfo.getInfoFor(
+ Types.ROW_NAMED(
+ new String[]{"int1", "string1"},
+ Types.INT, Types.STRING
+ )
+ )
+ ),
+ ObjectArrayTypeInfo.getInfoFor(
+ Types.ROW_NAMED(
+ new String[]{"int1", "string1"},
+ Types.INT, Types.STRING
+ )
+ ),
+ new MapTypeInfo<>(
+ Types.STRING,
+ Types.ROW_NAMED(
+ new String[]{"int1", "string1"},
+ Types.INT, Types.STRING
+ )
+ )
+ },
+ rowTypeInfo.getFieldTypes());
+
+ // validate field names
+ Assert.assertArrayEquals(
+ new String[] {"middle", "list", "map"},
+ rowTypeInfo.getFieldNames());
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/bcead3be/flink-connectors/flink-orc/src/test/java/org/apache/flink/orc/OrcRowInputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-orc/src/test/java/org/apache/flink/orc/OrcRowInputFormatTest.java b/flink-connectors/flink-orc/src/test/java/org/apache/flink/orc/OrcRowInputFormatTest.java
index 0efe41f..2eb3231 100644
--- a/flink-connectors/flink-orc/src/test/java/org/apache/flink/orc/OrcRowInputFormatTest.java
+++ b/flink-connectors/flink-orc/src/test/java/org/apache/flink/orc/OrcRowInputFormatTest.java
@@ -26,6 +26,7 @@ import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.core.fs.FileInputSplit;
import org.apache.flink.core.fs.Path;
+import org.apache.flink.orc.util.OrcTestFileGenerator;
import org.apache.flink.types.Row;
import org.apache.flink.util.InstantiationUtil;
@@ -50,6 +51,7 @@ import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.any;
@@ -124,6 +126,32 @@ public class OrcRowInputFormatTest {
private static final String TEST_FILE_NESTEDLIST = "test-data-nestedlist.orc";
private static final String TEST_SCHEMA_NESTEDLIST = "struct<mylist1:array<array<struct<mylong1:bigint>>>>";
+ /** Generated by {@link OrcTestFileGenerator#writeCompositeTypesWithNullsFile(String)}. */
+ private static final String TEST_FILE_COMPOSITES_NULLS = "test-data-composites-with-nulls.orc";
+ private static final String TEST_SCHEMA_COMPOSITES_NULLS =
+ "struct<" +
+ "int1:int," +
+ "record1:struct<f1:int,f2:string>," +
+ "list1:array<array<array<struct<f1:string,f2:string>>>>," +
+ "list2:array<map<string,int>>" +
+ ">";
+
+ /** Generated by {@link OrcTestFileGenerator#writeCompositeTypesWithRepeatingFile(String)}. */
+ private static final String TEST_FILE_REPEATING = "test-data-repeating.orc";
+ private static final String TEST_SCHEMA_REPEATING =
+ "struct<" +
+ "int1:int," +
+ "int2:int," +
+ "int3:int," +
+ "record1:struct<f1:int,f2:string>," +
+ "record2:struct<f1:int,f2:string>," +
+ "list1:array<int>," +
+ "list2:array<int>," +
+ "list3:array<int>," +
+ "map1:map<int,string>," +
+ "map2:map<int,string>" +
+ ">";
+
@Test(expected = FileNotFoundException.class)
public void testInvalidPath() throws IOException{
rowOrcInputFormat =
@@ -477,7 +505,7 @@ public class OrcRowInputFormatTest {
}
@Test
- public void testReadNestedFile() throws IOException{
+ public void testReadNestedFile() throws IOException {
rowOrcInputFormat = new OrcRowInputFormat(getPath(TEST_FILE_NESTED), TEST_SCHEMA_NESTED, new Configuration());
FileInputSplit[] splits = rowOrcInputFormat.createInputSplits(1);
@@ -563,7 +591,7 @@ public class OrcRowInputFormatTest {
}
@Test
- public void testReadTimeTypeFile() throws IOException{
+ public void testReadTimeTypeFile() throws IOException {
rowOrcInputFormat = new OrcRowInputFormat(getPath(TEST_FILE_TIMETYPES), TEST_SCHEMA_TIMETYPES, new Configuration());
FileInputSplit[] splits = rowOrcInputFormat.createInputSplits(1);
@@ -590,7 +618,7 @@ public class OrcRowInputFormatTest {
}
@Test
- public void testReadDecimalTypeFile() throws IOException{
+ public void testReadDecimalTypeFile() throws IOException {
rowOrcInputFormat = new OrcRowInputFormat(getPath(TEST_FILE_DECIMAL), TEST_SCHEMA_DECIMAL, new Configuration());
FileInputSplit[] splits = rowOrcInputFormat.createInputSplits(1);
@@ -653,7 +681,183 @@ public class OrcRowInputFormatTest {
}
@Test
- public void testReadWithProjection() throws IOException{
+ public void testReadCompositesNullsFile() throws Exception {
+ rowOrcInputFormat = new OrcRowInputFormat(
+ getPath(TEST_FILE_COMPOSITES_NULLS),
+ TEST_SCHEMA_COMPOSITES_NULLS,
+ new Configuration());
+
+ FileInputSplit[] splits = rowOrcInputFormat.createInputSplits(1);
+ assertEquals(1, splits.length);
+ rowOrcInputFormat.openInputFormat();
+ rowOrcInputFormat.open(splits[0]);
+
+ assertFalse(rowOrcInputFormat.reachedEnd());
+
+ Row row = null;
+ long cnt = 0;
+
+ int structNullCnt = 0;
+ int nestedListNullCnt = 0;
+ int mapListNullCnt = 0;
+
+ // read all rows
+ while (!rowOrcInputFormat.reachedEnd()) {
+
+ row = rowOrcInputFormat.nextRecord(row);
+ assertEquals(4, row.getArity());
+
+ assertTrue(row.getField(0) instanceof Integer);
+
+ if (row.getField(1) == null) {
+ structNullCnt++;
+ } else {
+ Object f = row.getField(1);
+ assertTrue(f instanceof Row);
+ assertEquals(2, ((Row) f).getArity());
+ }
+
+ if (row.getField(2) == null) {
+ nestedListNullCnt++;
+ } else {
+ Object f = row.getField(2);
+ assertTrue(f instanceof Row[][][]);
+ assertEquals(4, ((Row[][][]) f).length);
+ }
+
+ if (row.getField(3) == null) {
+ mapListNullCnt++;
+ } else {
+ Object f = row.getField(3);
+ assertTrue(f instanceof HashMap[]);
+ assertEquals(3, ((HashMap[]) f).length);
+ }
+ cnt++;
+ }
+ // number of rows in file
+ assertEquals(2500, cnt);
+ // check number of null fields
+ assertEquals(1250, structNullCnt);
+ assertEquals(835, nestedListNullCnt);
+ assertEquals(835, mapListNullCnt);
+ }
+
+ @SuppressWarnings("unchecked")
+ @Test
+ public void testReadRepeatingValuesFile() throws IOException {
+ rowOrcInputFormat = new OrcRowInputFormat(
+ getPath(TEST_FILE_REPEATING),
+ TEST_SCHEMA_REPEATING,
+ new Configuration());
+
+ FileInputSplit[] splits = rowOrcInputFormat.createInputSplits(1);
+ assertEquals(1, splits.length);
+ rowOrcInputFormat.openInputFormat();
+ rowOrcInputFormat.open(splits[0]);
+
+ assertFalse(rowOrcInputFormat.reachedEnd());
+
+ Row row = null;
+ long cnt = 0;
+
+ Row firstRow1 = null;
+ Integer[] firstList1 = null;
+ HashMap firstMap1 = null;
+
+ // read all rows
+ while (!rowOrcInputFormat.reachedEnd()) {
+
+ cnt++;
+ row = rowOrcInputFormat.nextRecord(row);
+ assertEquals(10, row.getArity());
+
+ // check first int field (always 42)
+ assertNotNull(row.getField(0));
+ assertTrue(row.getField(0) instanceof Integer);
+ assertEquals(42, ((Integer) row.getField(0)).intValue());
+
+ // check second int field (always null)
+ assertNull(row.getField(1));
+
+ // check third int field (always 99)
+ assertNotNull(row.getField(2));
+ assertTrue(row.getField(2) instanceof Integer);
+ assertEquals(99, ((Integer) row.getField(2)).intValue());
+
+ // check first row field (always (23, null))
+ assertNotNull(row.getField(3));
+ assertTrue(row.getField(3) instanceof Row);
+ Row nestedRow = (Row) row.getField(3);
+ // check first field of nested row
+ assertNotNull(nestedRow.getField(0));
+ assertTrue(nestedRow.getField(0) instanceof Integer);
+ assertEquals(23, ((Integer) nestedRow.getField(0)).intValue());
+ // check second field of nested row
+ assertNull(nestedRow.getField(1));
+ // validate reference
+ if (firstRow1 == null) {
+ firstRow1 = nestedRow;
+ } else {
+ // repeated rows must be different instances
+ assertTrue(firstRow1 != nestedRow);
+ }
+
+ // check second row field (always null)
+ assertNull(row.getField(4));
+
+ // check first list field (always [1, 2, 3])
+ assertNotNull(row.getField(5));
+ assertTrue(row.getField(5) instanceof Integer[]);
+ Integer[] list1 = ((Integer[]) row.getField(5));
+ assertEquals(1, list1[0].intValue());
+ assertEquals(2, list1[1].intValue());
+ assertEquals(3, list1[2].intValue());
+ // validate reference
+ if (firstList1 == null) {
+ firstList1 = list1;
+ } else {
+ // repeated list must be different instances
+ assertTrue(firstList1 != list1);
+ }
+
+ // check second list field (always [7, 7, 7])
+ assertNotNull(row.getField(6));
+ assertTrue(row.getField(6) instanceof Integer[]);
+ Integer[] list2 = ((Integer[]) row.getField(6));
+ assertEquals(7, list2[0].intValue());
+ assertEquals(7, list2[1].intValue());
+ assertEquals(7, list2[2].intValue());
+
+ // check third list field (always null)
+ assertNull(row.getField(7));
+
+ // check first map field (always {2->"Hello", 4->"Hello"})
+ assertNotNull(row.getField(8));
+ assertTrue(row.getField(8) instanceof HashMap);
+ HashMap<Integer, String> map = (HashMap<Integer, String>) row.getField(8);
+ assertEquals(2, map.size());
+ assertEquals("Hello", map.get(2));
+ assertEquals("Hello", map.get(4));
+ // validate reference
+ if (firstMap1 == null) {
+ firstMap1 = map;
+ } else {
+ // repeated maps must be different instances
+ assertTrue(firstMap1 != map);
+ }
+
+ // check second map field (always null)
+ assertNull(row.getField(9));
+ }
+
+ rowOrcInputFormat.close();
+ rowOrcInputFormat.closeInputFormat();
+
+ assertEquals(256, cnt);
+ }
+
+ @Test
+ public void testReadWithProjection() throws IOException {
rowOrcInputFormat = new OrcRowInputFormat(getPath(TEST_FILE_NESTED), TEST_SCHEMA_NESTED, new Configuration());
rowOrcInputFormat.selectFields(7, 0, 10, 8);
@@ -691,7 +895,7 @@ public class OrcRowInputFormatTest {
}
@Test
- public void testReadFileInSplits() throws IOException{
+ public void testReadFileInSplits() throws IOException {
rowOrcInputFormat = new OrcRowInputFormat(getPath(TEST_FILE_FLAT), TEST_SCHEMA_FLAT, new Configuration());
rowOrcInputFormat.selectFields(0, 1);
@@ -717,7 +921,7 @@ public class OrcRowInputFormatTest {
}
@Test
- public void testReadFileWithFilter() throws IOException{
+ public void testReadFileWithFilter() throws IOException {
rowOrcInputFormat = new OrcRowInputFormat(getPath(TEST_FILE_FLAT), TEST_SCHEMA_FLAT, new Configuration());
rowOrcInputFormat.selectFields(0, 1);
@@ -751,7 +955,7 @@ public class OrcRowInputFormatTest {
}
@Test
- public void testReadFileWithEvolvedSchema() throws IOException{
+ public void testReadFileWithEvolvedSchema() throws IOException {
rowOrcInputFormat = new OrcRowInputFormat(
getPath(TEST_FILE_FLAT),
http://git-wip-us.apache.org/repos/asf/flink/blob/bcead3be/flink-connectors/flink-orc/src/test/java/org/apache/flink/orc/OrcUtilsTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-orc/src/test/java/org/apache/flink/orc/OrcUtilsTest.java b/flink-connectors/flink-orc/src/test/java/org/apache/flink/orc/OrcUtilsTest.java
deleted file mode 100644
index 2cb1715..0000000
--- a/flink-connectors/flink-orc/src/test/java/org/apache/flink/orc/OrcUtilsTest.java
+++ /dev/null
@@ -1,148 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.orc;
-
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeinfo.Types;
-import org.apache.flink.api.java.typeutils.MapTypeInfo;
-import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
-import org.apache.flink.api.java.typeutils.RowTypeInfo;
-
-import org.apache.orc.TypeDescription;
-import org.junit.Assert;
-import org.junit.Test;
-
-/**
- * Unit tests for {@link OrcUtils}.
- *
- */
-public class OrcUtilsTest {
-
- @Test
- public void testFlatSchemaToTypeInfo1() {
-
- String schema =
- "struct<" +
- "boolean1:boolean," +
- "byte1:tinyint," +
- "short1:smallint," +
- "int1:int," +
- "long1:bigint," +
- "float1:float," +
- "double1:double," +
- "bytes1:binary," +
- "string1:string," +
- "date1:date," +
- "timestamp1:timestamp," +
- "decimal1:decimal(5,2)" +
- ">";
- TypeInformation typeInfo = OrcUtils.schemaToTypeInfo(TypeDescription.fromString(schema));
-
- Assert.assertNotNull(typeInfo);
- Assert.assertTrue(typeInfo instanceof RowTypeInfo);
- RowTypeInfo rowTypeInfo = (RowTypeInfo) typeInfo;
-
- // validate field types
- Assert.assertArrayEquals(
- new TypeInformation[]{
- Types.BOOLEAN, Types.BYTE, Types.SHORT, Types.INT, Types.LONG, Types.FLOAT, Types.DOUBLE,
- PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO, Types.STRING,
- Types.SQL_DATE, Types.SQL_TIMESTAMP, BasicTypeInfo.BIG_DEC_TYPE_INFO
- },
- rowTypeInfo.getFieldTypes());
-
- // validate field names
- Assert.assertArrayEquals(
- new String[] {
- "boolean1", "byte1", "short1", "int1", "long1", "float1", "double1",
- "bytes1", "string1", "date1", "timestamp1", "decimal1"
- },
- rowTypeInfo.getFieldNames());
-
- }
-
- @Test
- public void testNestedSchemaToTypeInfo1() {
-
- String schema =
- "struct<" +
- "middle:struct<" +
- "list:array<" +
- "struct<" +
- "int1:int," +
- "string1:string" +
- ">" +
- ">" +
- ">," +
- "list:array<" +
- "struct<" +
- "int1:int," +
- "string1:string" +
- ">" +
- ">," +
- "map:map<" +
- "string," +
- "struct<" +
- "int1:int," +
- "string1:string" +
- ">" +
- ">" +
- ">";
- TypeInformation typeInfo = OrcUtils.schemaToTypeInfo(TypeDescription.fromString(schema));
-
- Assert.assertNotNull(typeInfo);
- Assert.assertTrue(typeInfo instanceof RowTypeInfo);
- RowTypeInfo rowTypeInfo = (RowTypeInfo) typeInfo;
-
- // validate field types
- Assert.assertArrayEquals(
- new TypeInformation[]{
- Types.ROW_NAMED(
- new String[]{"list"},
- ObjectArrayTypeInfo.getInfoFor(
- Types.ROW_NAMED(
- new String[]{"int1", "string1"},
- Types.INT, Types.STRING
- )
- )
- ),
- ObjectArrayTypeInfo.getInfoFor(
- Types.ROW_NAMED(
- new String[]{"int1", "string1"},
- Types.INT, Types.STRING
- )
- ),
- new MapTypeInfo<>(
- Types.STRING,
- Types.ROW_NAMED(
- new String[]{"int1", "string1"},
- Types.INT, Types.STRING
- )
- )
- },
- rowTypeInfo.getFieldTypes());
-
- // validate field names
- Assert.assertArrayEquals(
- new String[] {"middle", "list", "map"},
- rowTypeInfo.getFieldNames());
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/bcead3be/flink-connectors/flink-orc/src/test/java/org/apache/flink/orc/util/OrcTestFileGenerator.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-orc/src/test/java/org/apache/flink/orc/util/OrcTestFileGenerator.java b/flink-connectors/flink-orc/src/test/java/org/apache/flink/orc/util/OrcTestFileGenerator.java
new file mode 100644
index 0000000..9d3be63
--- /dev/null
+++ b/flink-connectors/flink-orc/src/test/java/org/apache/flink/orc/util/OrcTestFileGenerator.java
@@ -0,0 +1,373 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.orc.util;
+
+import org.apache.flink.orc.OrcRowInputFormatTest;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.ListColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.MapColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.StructColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.orc.OrcFile;
+import org.apache.orc.TypeDescription;
+import org.apache.orc.Writer;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+
+/**
+ * A generator for ORC test files.
+ */
+public class OrcTestFileGenerator {
+
+ public static void main(String[] args) throws IOException {
+ writeCompositeTypesWithNullsFile(args[0]);
+// writeCompositeTypesWithRepeatingFile(args[0]);
+ }
+
+ /**
+ * Writes an ORC file with nested composite types and null values on different levels.
+ * Generates {@link OrcRowInputFormatTest#TEST_FILE_COMPOSITES_NULLS}.
+ */
+ private static void writeCompositeTypesWithNullsFile(String path) throws IOException {
+
+ Path filePath = new Path(path);
+ Configuration conf = new Configuration();
+
+ TypeDescription schema =
+ TypeDescription.fromString(
+ "struct<" +
+ "int1:int," +
+ "record1:struct<f1:int,f2:string>," +
+ "list1:array<array<array<struct<f1:string,f2:string>>>>," +
+ "list2:array<map<string,int>>" +
+ ">");
+
+ Writer writer =
+ OrcFile.createWriter(filePath,
+ OrcFile.writerOptions(conf).setSchema(schema));
+
+ VectorizedRowBatch batch = schema.createRowBatch();
+ LongColumnVector int1 = (LongColumnVector) batch.cols[0];
+
+ StructColumnVector record1 = (StructColumnVector) batch.cols[1];
+ LongColumnVector record1F1 = (LongColumnVector) record1.fields[0];
+ BytesColumnVector record1F2 = (BytesColumnVector) record1.fields[1];
+
+ ListColumnVector list1 = (ListColumnVector) batch.cols[2];
+ ListColumnVector nestedList = (ListColumnVector) list1.child;
+ ListColumnVector nestedList2 = (ListColumnVector) nestedList.child;
+ StructColumnVector listEntries = (StructColumnVector) nestedList2.child;
+ BytesColumnVector entryField1 = (BytesColumnVector) listEntries.fields[0];
+ BytesColumnVector entryField2 = (BytesColumnVector) listEntries.fields[1];
+
+ ListColumnVector list2 = (ListColumnVector) batch.cols[3];
+ MapColumnVector map1 = (MapColumnVector) list2.child;
+ BytesColumnVector keys = (BytesColumnVector) map1.keys;
+ LongColumnVector vals = (LongColumnVector) map1.values;
+
+ final int list1Size = 4;
+ final int nestedListSize = 3;
+ final int nestedList2Size = 2;
+ final int list2Size = 3;
+ final int mapSize = 3;
+
+ final int batchSize = batch.getMaxSize();
+
+ // Ensure the vectors have sufficient capacity
+ nestedList.ensureSize(batchSize * list1Size, false);
+ nestedList2.ensureSize(batchSize * list1Size * nestedListSize, false);
+ listEntries.ensureSize(batchSize * list1Size * nestedListSize * nestedList2Size, false);
+ map1.ensureSize(batchSize * list2Size, false);
+ keys.ensureSize(batchSize * list2Size * mapSize, false);
+ vals.ensureSize(batchSize * list2Size * mapSize, false);
+
+ // add 2500 rows to file
+ for (int r = 0; r < 2500; ++r) {
+ int row = batch.size++;
+
+ // mark nullable fields
+ list1.noNulls = false;
+ nestedList.noNulls = false;
+ listEntries.noNulls = false;
+ entryField1.noNulls = false;
+ record1.noNulls = false;
+ record1F2.noNulls = false;
+ list2.noNulls = false;
+ map1.noNulls = false;
+ keys.noNulls = false;
+ vals.noNulls = false;
+
+ // first field: int1
+ int1.vector[row] = r;
+
+ // second field: struct
+ if (row % 2 != 0) {
+ // in every second row, the struct is null
+ record1F1.vector[row] = row;
+ if (row % 5 != 0) {
+ // in every fifth row, the second field of the struct is null
+ record1F2.setVal(row, ("f2-" + row).getBytes(StandardCharsets.UTF_8));
+ } else {
+ record1F2.isNull[row] = true;
+ }
+ } else {
+ record1.isNull[row] = true;
+ }
+
+ // third field: deeply nested list
+ if (row % 3 != 0) {
+ // in every third row, the nested list is null
+ list1.offsets[row] = list1.childCount;
+ list1.lengths[row] = list1Size;
+ list1.childCount += list1Size;
+
+ for (int i = 0; i < list1Size; i++) {
+
+ int listOffset = (int) list1.offsets[row] + i;
+ if (i != 2) {
+ // third nested list (index 2) is always null
+ nestedList.offsets[listOffset] = nestedList.childCount;
+ nestedList.lengths[listOffset] = nestedListSize;
+ nestedList.childCount += nestedListSize;
+
+ for (int j = 0; j < nestedListSize; j++) {
+ int nestedOffset = (int) nestedList.offsets[listOffset] + j;
+ nestedList2.offsets[nestedOffset] = nestedList2.childCount;
+ nestedList2.lengths[nestedOffset] = nestedList2Size;
+ nestedList2.childCount += nestedList2Size;
+
+ for (int k = 0; k < nestedList2Size; k++) {
+ int nestedOffset2 = (int) nestedList2.offsets[nestedOffset] + k;
+ // list entries
+ if (k != 1) {
+ // second struct is always null
+ if (k != 0) {
+ // first struct field in first struct is always null
+ entryField1.setVal(nestedOffset2, ("f1-" + k).getBytes(StandardCharsets.UTF_8));
+ } else {
+ entryField1.isNull[nestedOffset2] = true;
+ }
+ entryField2.setVal(nestedOffset2, ("f2-" + k).getBytes(StandardCharsets.UTF_8));
+ } else {
+ listEntries.isNull[nestedOffset2] = true;
+ }
+ }
+ }
+ } else {
+ nestedList.isNull[listOffset] = true;
+ }
+ }
+ } else {
+ list1.isNull[row] = true;
+ }
+
+ // fourth field: map in list
+ if (row % 3 != 0) {
+ // in every third row, the map list is null
+ list2.offsets[row] = list2.childCount;
+ list2.lengths[row] = list2Size;
+ list2.childCount += list2Size;
+
+ for (int i = 0; i < list2Size; i++) {
+ int mapOffset = (int) list2.offsets[row] + i;
+
+ if (i != 2) {
+ // third map list entry (index 2) is always null
+ map1.offsets[mapOffset] = map1.childCount;
+ map1.lengths[mapOffset] = mapSize;
+ map1.childCount += mapSize;
+
+ for (int j = 0; j < mapSize; j++) {
+ int mapEntryOffset = (int) map1.offsets[mapOffset] + j;
+
+ if (j != 1) {
+ // key in second map entry is always null
+ keys.setVal(mapEntryOffset, ("key-" + row + "-" + j).getBytes(StandardCharsets.UTF_8));
+ } else {
+ keys.isNull[mapEntryOffset] = true;
+ }
+ if (j != 2) {
+ // value in third map entry is always null
+ vals.vector[mapEntryOffset] = row + i + j;
+ } else {
+ vals.isNull[mapEntryOffset] = true;
+ }
+ }
+ } else {
+ map1.isNull[mapOffset] = true;
+ }
+ }
+ } else {
+ list2.isNull[row] = true;
+ }
+
+ if (row == batchSize - 1) {
+ writer.addRowBatch(batch);
+ batch.reset();
+ }
+ }
+ if (batch.size != 0) {
+ writer.addRowBatch(batch);
+ batch.reset();
+ }
+ writer.close();
+ }
+
+ /**
+ * Writes an ORC file with nested composite types and repeated values.
+ * Generates {@link OrcRowInputFormatTest#TEST_FILE_REPEATING}.
+ */
+ private static void writeCompositeTypesWithRepeatingFile(String path) throws IOException {
+
+ Path filePath = new Path(path);
+ Configuration conf = new Configuration();
+
+ TypeDescription schema =
+ TypeDescription.fromString(
+ "struct<" +
+ "int1:int," +
+ "int2:int," +
+ "int3:int," +
+ "record1:struct<f1:int,f2:string>," +
+ "record2:struct<f1:int,f2:string>," +
+ "list1:array<int>," +
+ "list2:array<int>," +
+ "list3:array<int>," +
+ "map1:map<int,string>," +
+ "map2:map<int,string>" +
+ ">");
+
+ Writer writer =
+ OrcFile.createWriter(filePath,
+ OrcFile.writerOptions(conf).setSchema(schema));
+
+ VectorizedRowBatch batch = schema.createRowBatch();
+
+ LongColumnVector int1 = (LongColumnVector) batch.cols[0];
+ LongColumnVector int2 = (LongColumnVector) batch.cols[1];
+ LongColumnVector int3 = (LongColumnVector) batch.cols[2];
+
+ StructColumnVector record1 = (StructColumnVector) batch.cols[3];
+ LongColumnVector record1F1 = (LongColumnVector) record1.fields[0];
+ BytesColumnVector record1F2 = (BytesColumnVector) record1.fields[1];
+ StructColumnVector record2 = (StructColumnVector) batch.cols[4];
+
+ ListColumnVector list1 = (ListColumnVector) batch.cols[5];
+ LongColumnVector list1int = (LongColumnVector) list1.child;
+ ListColumnVector list2 = (ListColumnVector) batch.cols[6];
+ LongColumnVector list2int = (LongColumnVector) list2.child;
+ ListColumnVector list3 = (ListColumnVector) batch.cols[7];
+
+ MapColumnVector map1 = (MapColumnVector) batch.cols[8];
+ LongColumnVector map1keys = (LongColumnVector) map1.keys;
+ BytesColumnVector map1vals = (BytesColumnVector) map1.values;
+ MapColumnVector map2 = (MapColumnVector) batch.cols[9];
+
+ final int listSize = 3;
+ final int mapSize = 2;
+
+ final int batchSize = batch.getMaxSize();
+
+ // Ensure the vectors have sufficient capacity
+ list1int.ensureSize(batchSize * listSize, false);
+ list2int.ensureSize(batchSize * listSize, false);
+ map1keys.ensureSize(batchSize * mapSize, false);
+ map1vals.ensureSize(batchSize * mapSize, false);
+
+ // int1: all values are 42
+ int1.noNulls = true;
+ int1.setRepeating(true);
+ int1.vector[0] = 42;
+
+ // int2: all values are null
+ int2.noNulls = false;
+ int2.setRepeating(true);
+ int2.isNull[0] = true;
+
+ // int3: all values are 99
+ int3.noNulls = false;
+ int3.setRepeating(true);
+ int3.isNull[0] = false;
+ int3.vector[0] = 99;
+
+ // record1: all records are [23, "Hello"]
+ record1.noNulls = true;
+ record1.setRepeating(true);
+ for (int i = 0; i < batchSize; i++) {
+ record1F1.vector[i] = i + 23;
+ }
+ record1F2.noNulls = false;
+ record1F2.isNull[0] = true;
+
+ // record2: all records are null
+ record2.noNulls = false;
+ record2.setRepeating(true);
+ record2.isNull[0] = true;
+
+ // list1: all lists are [1, 2, 3]
+ list1.noNulls = true;
+ list1.setRepeating(true);
+ list1.lengths[0] = listSize;
+ list1.offsets[0] = 1;
+ for (int i = 0; i < batchSize * listSize; i++) {
+ list1int.vector[i] = i;
+ }
+
+ // list2: all lists are [7, 7, 7]
+ list2.noNulls = true;
+ list2.setRepeating(true);
+ list2.lengths[0] = listSize;
+ list2.offsets[0] = 0;
+ list2int.setRepeating(true);
+ list2int.vector[0] = 7;
+
+ // list3: all lists are null
+ list3.noNulls = false;
+ list3.setRepeating(true);
+ list3.isNull[0] = true;
+
+ // map1: all maps are [2 -> "Hello", 4 -> "Hello"]
+ map1.noNulls = true;
+ map1.setRepeating(true);
+ map1.lengths[0] = mapSize;
+ map1.offsets[0] = 1;
+ for (int i = 0; i < batchSize * mapSize; i++) {
+ map1keys.vector[i] = i * 2;
+ }
+ map1vals.setRepeating(true);
+ map1vals.setVal(0, "Hello".getBytes(StandardCharsets.UTF_8));
+
+ // map2: all maps are null
+ map2.noNulls = false;
+ map2.setRepeating(true);
+ map2.isNull[0] = true;
+
+ batch.size = 256;
+
+ writer.addRowBatch(batch);
+ batch.reset();
+ writer.close();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/bcead3be/flink-connectors/flink-orc/src/test/resources/test-data-composites-with-nulls.orc
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-orc/src/test/resources/test-data-composites-with-nulls.orc b/flink-connectors/flink-orc/src/test/resources/test-data-composites-with-nulls.orc
new file mode 100644
index 0000000..eed1c55
Binary files /dev/null and b/flink-connectors/flink-orc/src/test/resources/test-data-composites-with-nulls.orc differ
http://git-wip-us.apache.org/repos/asf/flink/blob/bcead3be/flink-connectors/flink-orc/src/test/resources/test-data-repeating.orc
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-orc/src/test/resources/test-data-repeating.orc b/flink-connectors/flink-orc/src/test/resources/test-data-repeating.orc
new file mode 100644
index 0000000..ff2c917
Binary files /dev/null and b/flink-connectors/flink-orc/src/test/resources/test-data-repeating.orc differ
[2/2] flink git commit: [FLINK-8230] [orc] Fix NPEs when reading
nested columns.
Posted by tw...@apache.org.
[FLINK-8230] [orc] Fix NPEs when reading nested columns.
- fixes NPEs for null-valued structs, lists, and maps
- fixes NPEs for repeating structs, lists, and maps
- adds test for deeply nested data with nulls
- adds test for columns with repeating values
This closes #5373.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/bcead3be
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/bcead3be
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/bcead3be
Branch: refs/heads/master
Commit: bcead3be32c624008730555d828fd8e9447fbeff
Parents: 3cfc5ae
Author: Fabian Hueske <fh...@apache.org>
Authored: Tue Dec 12 12:09:14 2017 +0100
Committer: twalthr <tw...@apache.org>
Committed: Wed Jan 31 14:44:23 2018 +0100
----------------------------------------------------------------------
.../org/apache/flink/orc/OrcBatchReader.java | 1402 ++++++++++++++++
.../org/apache/flink/orc/OrcRowInputFormat.java | 4 +-
.../org/apache/flink/orc/OrcTableSource.java | 2 +-
.../java/org/apache/flink/orc/OrcUtils.java | 1508 ------------------
.../apache/flink/orc/OrcBatchReaderTest.java | 148 ++
.../apache/flink/orc/OrcRowInputFormatTest.java | 218 ++-
.../java/org/apache/flink/orc/OrcUtilsTest.java | 148 --
.../flink/orc/util/OrcTestFileGenerator.java | 373 +++++
.../test-data-composites-with-nulls.orc | Bin 0 -> 10597 bytes
.../src/test/resources/test-data-repeating.orc | Bin 0 -> 1203 bytes
10 files changed, 2137 insertions(+), 1666 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/bcead3be/flink-connectors/flink-orc/src/main/java/org/apache/flink/orc/OrcBatchReader.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-orc/src/main/java/org/apache/flink/orc/OrcBatchReader.java b/flink-connectors/flink-orc/src/main/java/org/apache/flink/orc/OrcBatchReader.java
new file mode 100644
index 0000000..3ecdeb3
--- /dev/null
+++ b/flink-connectors/flink-orc/src/main/java/org/apache/flink/orc/OrcBatchReader.java
@@ -0,0 +1,1402 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.orc;
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.MapTypeInfo;
+import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.types.Row;
+
+import org.apache.hadoop.hive.common.type.HiveDecimal;
+import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.ListColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.MapColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.StructColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.TimestampColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
+import org.apache.orc.TypeDescription;
+
+import java.lang.reflect.Array;
+import java.math.BigDecimal;
+import java.nio.charset.StandardCharsets;
+import java.sql.Date;
+import java.sql.Timestamp;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.TimeZone;
+import java.util.function.DoubleFunction;
+import java.util.function.Function;
+import java.util.function.LongFunction;
+
+/**
+ * A class that provides utility methods for orc file reading.
+ */
+class OrcBatchReader {
+
+ private static final long MILLIS_PER_DAY = 86400000; // = 24 * 60 * 60 * 1000
+ private static final TimeZone LOCAL_TZ = TimeZone.getDefault();
+
+ /**
+ * Converts an ORC schema to a Flink TypeInformation.
+ *
+ * @param schema The ORC schema.
+ * @return The TypeInformation that corresponds to the ORC schema.
+ */
+ static TypeInformation schemaToTypeInfo(TypeDescription schema) {
+ switch (schema.getCategory()) {
+ case BOOLEAN:
+ return BasicTypeInfo.BOOLEAN_TYPE_INFO;
+ case BYTE:
+ return BasicTypeInfo.BYTE_TYPE_INFO;
+ case SHORT:
+ return BasicTypeInfo.SHORT_TYPE_INFO;
+ case INT:
+ return BasicTypeInfo.INT_TYPE_INFO;
+ case LONG:
+ return BasicTypeInfo.LONG_TYPE_INFO;
+ case FLOAT:
+ return BasicTypeInfo.FLOAT_TYPE_INFO;
+ case DOUBLE:
+ return BasicTypeInfo.DOUBLE_TYPE_INFO;
+ case DECIMAL:
+ return BasicTypeInfo.BIG_DEC_TYPE_INFO;
+ case STRING:
+ case CHAR:
+ case VARCHAR:
+ return BasicTypeInfo.STRING_TYPE_INFO;
+ case DATE:
+ return SqlTimeTypeInfo.DATE;
+ case TIMESTAMP:
+ return SqlTimeTypeInfo.TIMESTAMP;
+ case BINARY:
+ return PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO;
+ case STRUCT:
+ List<TypeDescription> fieldSchemas = schema.getChildren();
+ TypeInformation[] fieldTypes = new TypeInformation[fieldSchemas.size()];
+ for (int i = 0; i < fieldSchemas.size(); i++) {
+ fieldTypes[i] = schemaToTypeInfo(fieldSchemas.get(i));
+ }
+ String[] fieldNames = schema.getFieldNames().toArray(new String[]{});
+ return new RowTypeInfo(fieldTypes, fieldNames);
+ case LIST:
+ TypeDescription elementSchema = schema.getChildren().get(0);
+ TypeInformation<?> elementType = schemaToTypeInfo(elementSchema);
+ // arrays of primitive types are handled as object arrays to support null values
+ return ObjectArrayTypeInfo.getInfoFor(elementType);
+ case MAP:
+ TypeDescription keySchema = schema.getChildren().get(0);
+ TypeDescription valSchema = schema.getChildren().get(1);
+ TypeInformation<?> keyType = schemaToTypeInfo(keySchema);
+ TypeInformation<?> valType = schemaToTypeInfo(valSchema);
+ return new MapTypeInfo<>(keyType, valType);
+ case UNION:
+ throw new UnsupportedOperationException("UNION type is not supported yet.");
+ default:
+ throw new IllegalArgumentException("Unknown type " + schema);
+ }
+ }
+
+ /**
+ * Fills an ORC batch into an array of Row.
+ *
+ * @param rows The batch of rows that needs to be filled.
+ * @param schema The schema of the ORC data.
+ * @param batch The ORC data.
+ * @param selectedFields The list of selected ORC fields.
+ * @return The number of rows that were filled.
+ */
+ static int fillRows(Row[] rows, TypeDescription schema, VectorizedRowBatch batch, int[] selectedFields) {
+
+ int rowsToRead = Math.min((int) batch.count(), rows.length);
+
+ List<TypeDescription> fieldTypes = schema.getChildren();
+ // read each selected field
+ for (int fieldIdx = 0; fieldIdx < selectedFields.length; fieldIdx++) {
+ int orcIdx = selectedFields[fieldIdx];
+ readField(rows, fieldIdx, fieldTypes.get(orcIdx), batch.cols[orcIdx], rowsToRead);
+ }
+ return rowsToRead;
+ }
+
+ /**
+ * Reads a vector of data into an array of objects.
+ *
+ * @param vals The array that needs to be filled.
+ * @param fieldIdx If the vals array is an array of Row, the index of the field that needs to be filled.
+ * Otherwise a -1 must be passed and the data is directly filled into the array.
+ * @param schema The schema of the vector to read.
+ * @param vector The vector to read.
+ * @param childCount The number of vector entries to read.
+ */
+ private static void readField(Object[] vals, int fieldIdx, TypeDescription schema, ColumnVector vector, int childCount) {
+
+ // check the type of the vector to decide how to read it.
+ switch (schema.getCategory()) {
+ case BOOLEAN:
+ if (vector.noNulls) {
+ readNonNullLongColumn(vals, fieldIdx, (LongColumnVector) vector, childCount, OrcBatchReader::readBoolean);
+ } else {
+ readLongColumn(vals, fieldIdx, (LongColumnVector) vector, childCount, OrcBatchReader::readBoolean);
+ }
+ break;
+ case BYTE:
+ if (vector.noNulls) {
+ readNonNullLongColumn(vals, fieldIdx, (LongColumnVector) vector, childCount, OrcBatchReader::readByte);
+ } else {
+ readLongColumn(vals, fieldIdx, (LongColumnVector) vector, childCount, OrcBatchReader::readByte);
+ }
+ break;
+ case SHORT:
+ if (vector.noNulls) {
+ readNonNullLongColumn(vals, fieldIdx, (LongColumnVector) vector, childCount, OrcBatchReader::readShort);
+ } else {
+ readLongColumn(vals, fieldIdx, (LongColumnVector) vector, childCount, OrcBatchReader::readShort);
+ }
+ break;
+ case INT:
+ if (vector.noNulls) {
+ readNonNullLongColumn(vals, fieldIdx, (LongColumnVector) vector, childCount, OrcBatchReader::readInt);
+ } else {
+ readLongColumn(vals, fieldIdx, (LongColumnVector) vector, childCount, OrcBatchReader::readInt);
+ }
+ break;
+ case LONG:
+ if (vector.noNulls) {
+ readNonNullLongColumn(vals, fieldIdx, (LongColumnVector) vector, childCount, OrcBatchReader::readLong);
+ } else {
+ readLongColumn(vals, fieldIdx, (LongColumnVector) vector, childCount, OrcBatchReader::readLong);
+ }
+ break;
+ case FLOAT:
+ if (vector.noNulls) {
+ readNonNullDoubleColumn(vals, fieldIdx, (DoubleColumnVector) vector, childCount, OrcBatchReader::readFloat);
+ } else {
+ readDoubleColumn(vals, fieldIdx, (DoubleColumnVector) vector, childCount, OrcBatchReader::readFloat);
+ }
+ break;
+ case DOUBLE:
+ if (vector.noNulls) {
+ readNonNullDoubleColumn(vals, fieldIdx, (DoubleColumnVector) vector, childCount, OrcBatchReader::readDouble);
+ } else {
+ readDoubleColumn(vals, fieldIdx, (DoubleColumnVector) vector, childCount, OrcBatchReader::readDouble);
+ }
+ break;
+ case CHAR:
+ case VARCHAR:
+ case STRING:
+ if (vector.noNulls) {
+ readNonNullBytesColumnAsString(vals, fieldIdx, (BytesColumnVector) vector, childCount);
+ } else {
+ readBytesColumnAsString(vals, fieldIdx, (BytesColumnVector) vector, childCount);
+ }
+ break;
+ case DATE:
+ if (vector.noNulls) {
+ readNonNullLongColumnAsDate(vals, fieldIdx, (LongColumnVector) vector, childCount);
+ } else {
+ readLongColumnAsDate(vals, fieldIdx, (LongColumnVector) vector, childCount);
+ }
+ break;
+ case TIMESTAMP:
+ if (vector.noNulls) {
+ readNonNullTimestampColumn(vals, fieldIdx, (TimestampColumnVector) vector, childCount);
+ } else {
+ readTimestampColumn(vals, fieldIdx, (TimestampColumnVector) vector, childCount);
+ }
+ break;
+ case BINARY:
+ if (vector.noNulls) {
+ readNonNullBytesColumnAsBinary(vals, fieldIdx, (BytesColumnVector) vector, childCount);
+ } else {
+ readBytesColumnAsBinary(vals, fieldIdx, (BytesColumnVector) vector, childCount);
+ }
+ break;
+ case DECIMAL:
+ if (vector.noNulls) {
+ readNonNullDecimalColumn(vals, fieldIdx, (DecimalColumnVector) vector, childCount);
+ } else {
+ readDecimalColumn(vals, fieldIdx, (DecimalColumnVector) vector, childCount);
+ }
+ break;
+ case STRUCT:
+ if (vector.noNulls) {
+ readNonNullStructColumn(vals, fieldIdx, (StructColumnVector) vector, schema, childCount);
+ } else {
+ readStructColumn(vals, fieldIdx, (StructColumnVector) vector, schema, childCount);
+ }
+ break;
+ case LIST:
+ if (vector.noNulls) {
+ readNonNullListColumn(vals, fieldIdx, (ListColumnVector) vector, schema, childCount);
+ } else {
+ readListColumn(vals, fieldIdx, (ListColumnVector) vector, schema, childCount);
+ }
+ break;
+ case MAP:
+ if (vector.noNulls) {
+ readNonNullMapColumn(vals, fieldIdx, (MapColumnVector) vector, schema, childCount);
+ } else {
+ readMapColumn(vals, fieldIdx, (MapColumnVector) vector, schema, childCount);
+ }
+ break;
+ case UNION:
+ throw new UnsupportedOperationException("UNION type not supported yet");
+ default:
+ throw new IllegalArgumentException("Unknown type " + schema);
+ }
+ }
+
+ private static <T> void readNonNullLongColumn(Object[] vals, int fieldIdx, LongColumnVector vector,
+ int childCount, LongFunction<T> reader) {
+
+ if (vector.isRepeating) { // fill complete column with first value
+ T repeatingValue = reader.apply(vector.vector[0]);
+ fillColumnWithRepeatingValue(vals, fieldIdx, repeatingValue, childCount);
+ } else {
+ if (fieldIdx == -1) { // set as an object
+ for (int i = 0; i < childCount; i++) {
+ vals[i] = reader.apply(vector.vector[i]);
+ }
+ } else { // set as a field of Row
+ Row[] rows = (Row[]) vals;
+ for (int i = 0; i < childCount; i++) {
+ rows[i].setField(fieldIdx, reader.apply(vector.vector[i]));
+ }
+ }
+ }
+ }
+
+ private static <T> void readNonNullDoubleColumn(Object[] vals, int fieldIdx, DoubleColumnVector vector,
+ int childCount, DoubleFunction<T> reader) {
+
+ if (vector.isRepeating) { // fill complete column with first value
+ T repeatingValue = reader.apply(vector.vector[0]);
+ fillColumnWithRepeatingValue(vals, fieldIdx, repeatingValue, childCount);
+ } else {
+ if (fieldIdx == -1) { // set as an object
+ for (int i = 0; i < childCount; i++) {
+ vals[i] = reader.apply(vector.vector[i]);
+ }
+ } else { // set as a field of Row
+ Row[] rows = (Row[]) vals;
+ for (int i = 0; i < childCount; i++) {
+ rows[i].setField(fieldIdx, reader.apply(vector.vector[i]));
+ }
+ }
+ }
+ }
+
+ private static void readNonNullBytesColumnAsString(Object[] vals, int fieldIdx, BytesColumnVector bytes, int childCount) {
+ if (bytes.isRepeating) { // fill complete column with first value
+ String repeatingValue = readString(bytes.vector[0], bytes.start[0], bytes.length[0]);
+ fillColumnWithRepeatingValue(vals, fieldIdx, repeatingValue, childCount);
+ } else {
+ if (fieldIdx == -1) { // set as an object
+ for (int i = 0; i < childCount; i++) {
+ vals[i] = readString(bytes.vector[i], bytes.start[i], bytes.length[i]);
+ }
+ } else { // set as a field of Row
+ Row[] rows = (Row[]) vals;
+ for (int i = 0; i < childCount; i++) {
+ rows[i].setField(fieldIdx, readString(bytes.vector[i], bytes.start[i], bytes.length[i]));
+ }
+ }
+ }
+ }
+
+ private static void readNonNullBytesColumnAsBinary(Object[] vals, int fieldIdx, BytesColumnVector bytes, int childCount) {
+ if (bytes.isRepeating) { // fill complete column with first value
+ if (fieldIdx == -1) { // set as an object
+ for (int i = 0; i < childCount; i++) {
+ // don't reuse repeating val to avoid object mutation
+ vals[i] = readBinary(bytes.vector[0], bytes.start[0], bytes.length[0]);
+ }
+ } else { // set as a field of Row
+ Row[] rows = (Row[]) vals;
+ for (int i = 0; i < childCount; i++) {
+ // don't reuse repeating val to avoid object mutation
+ rows[i].setField(fieldIdx, readBinary(bytes.vector[0], bytes.start[0], bytes.length[0]));
+ }
+ }
+ } else {
+ if (fieldIdx == -1) { // set as an object
+ for (int i = 0; i < childCount; i++) {
+ vals[i] = readBinary(bytes.vector[i], bytes.start[i], bytes.length[i]);
+ }
+ } else { // set as a field of Row
+ Row[] rows = (Row[]) vals;
+ for (int i = 0; i < childCount; i++) {
+ rows[i].setField(fieldIdx, readBinary(bytes.vector[i], bytes.start[i], bytes.length[i]));
+ }
+ }
+ }
+ }
+
+ private static void readNonNullLongColumnAsDate(Object[] vals, int fieldIdx, LongColumnVector vector, int childCount) {
+
+ if (vector.isRepeating) { // fill complete column with first value
+ if (fieldIdx == -1) { // set as an object
+ for (int i = 0; i < childCount; i++) {
+ // do not reuse repeated value due to mutability of Date
+ vals[i] = readDate(vector.vector[0]);
+ }
+ } else { // set as a field of Row
+ Row[] rows = (Row[]) vals;
+ for (int i = 0; i < childCount; i++) {
+ // do not reuse repeated value due to mutability of Date
+ rows[i].setField(fieldIdx, readDate(vector.vector[0]));
+ }
+ }
+ } else {
+ if (fieldIdx == -1) { // set as an object
+ for (int i = 0; i < childCount; i++) {
+ vals[i] = readDate(vector.vector[i]);
+ }
+ } else { // set as a field of Row
+ Row[] rows = (Row[]) vals;
+ for (int i = 0; i < childCount; i++) {
+ rows[i].setField(fieldIdx, readDate(vector.vector[i]));
+ }
+ }
+ }
+ }
+
+ private static void readNonNullTimestampColumn(Object[] vals, int fieldIdx, TimestampColumnVector vector, int childCount) {
+
+ if (vector.isRepeating) { // fill complete column with first value
+ if (fieldIdx == -1) { // set as an object
+ for (int i = 0; i < childCount; i++) {
+ // do not reuse value to prevent object mutation
+ vals[i] = readTimestamp(vector.time[0], vector.nanos[0]);
+ }
+ } else { // set as a field of Row
+ Row[] rows = (Row[]) vals;
+ for (int i = 0; i < childCount; i++) {
+ // do not reuse value to prevent object mutation
+ rows[i].setField(fieldIdx, readTimestamp(vector.time[0], vector.nanos[0]));
+ }
+ }
+ } else {
+ if (fieldIdx == -1) { // set as an object
+ for (int i = 0; i < childCount; i++) {
+ vals[i] = readTimestamp(vector.time[i], vector.nanos[i]);
+ }
+ } else { // set as a field of Row
+ Row[] rows = (Row[]) vals;
+ for (int i = 0; i < childCount; i++) {
+ rows[i].setField(fieldIdx, readTimestamp(vector.time[i], vector.nanos[i]));
+ }
+ }
+ }
+ }
+
+ private static void readNonNullDecimalColumn(Object[] vals, int fieldIdx, DecimalColumnVector vector, int childCount) {
+
+ if (vector.isRepeating) { // fill complete column with first value
+ fillColumnWithRepeatingValue(vals, fieldIdx, readBigDecimal(vector.vector[0]), childCount);
+ } else {
+ if (fieldIdx == -1) { // set as an object
+ for (int i = 0; i < childCount; i++) {
+ vals[i] = readBigDecimal(vector.vector[i]);
+ }
+ } else { // set as a field of Row
+ Row[] rows = (Row[]) vals;
+ for (int i = 0; i < childCount; i++) {
+ rows[i].setField(fieldIdx, readBigDecimal(vector.vector[i]));
+ }
+ }
+ }
+ }
+
+ private static void readNonNullStructColumn(Object[] vals, int fieldIdx, StructColumnVector structVector, TypeDescription schema, int childCount) {
+
+ List<TypeDescription> childrenTypes = schema.getChildren();
+
+ int numFields = childrenTypes.size();
+ // create a batch of Rows to read the structs
+ Row[] structs = new Row[childCount];
+ // TODO: possible improvement: reuse existing Row objects
+ for (int i = 0; i < childCount; i++) {
+ structs[i] = new Row(numFields);
+ }
+
+ // read struct fields
+ // we don't have to handle isRepeating because ORC assumes that it is propagated into the children.
+ for (int i = 0; i < numFields; i++) {
+ readField(structs, i, childrenTypes.get(i), structVector.fields[i], childCount);
+ }
+
+ if (fieldIdx == -1) { // set struct as an object
+ System.arraycopy(structs, 0, vals, 0, childCount);
+ } else { // set struct as a field of Row
+ Row[] rows = (Row[]) vals;
+ for (int i = 0; i < childCount; i++) {
+ rows[i].setField(fieldIdx, structs[i]);
+ }
+ }
+ }
+
+ private static void readNonNullListColumn(Object[] vals, int fieldIdx, ListColumnVector list, TypeDescription schema, int childCount) {
+
+ TypeDescription fieldType = schema.getChildren().get(0);
+ // get class of list elements
+ Class<?> classType = getClassForType(fieldType);
+
+ if (list.isRepeating) {
+
+ int offset = (int) list.offsets[0];
+ int length = (int) list.lengths[0];
+ // we only need to read until offset + length.
+ int entriesToRead = offset + length;
+
+ // read children
+ Object[] children = (Object[]) Array.newInstance(classType, entriesToRead);
+ readField(children, -1, fieldType, list.child, entriesToRead);
+
+ // get function to copy list
+ Function<Object, Object> copyList = getCopyFunction(schema);
+
+ // create first list that will be copied
+ Object[] first;
+ if (offset == 0) {
+ first = children;
+ } else {
+ first = (Object[]) Array.newInstance(classType, length);
+ System.arraycopy(children, offset, first, 0, length);
+ }
+
+ // create copies of first list and set copies as result
+ for (int i = 0; i < childCount; i++) {
+ Object[] copy = (Object[]) copyList.apply(first);
+ if (fieldIdx == -1) {
+ vals[i] = copy;
+ } else {
+ ((Row) vals[i]).setField(fieldIdx, copy);
+ }
+ }
+ } else {
+
+ // read children
+ Object[] children = (Object[]) Array.newInstance(classType, list.childCount);
+ readField(children, -1, fieldType, list.child, list.childCount);
+
+ // fill lists with children
+ for (int i = 0; i < childCount; i++) {
+ int offset = (int) list.offsets[i];
+ int length = (int) list.lengths[i];
+
+ Object[] temp = (Object[]) Array.newInstance(classType, length);
+ System.arraycopy(children, offset, temp, 0, length);
+ if (fieldIdx == -1) {
+ vals[i] = temp;
+ } else {
+ ((Row) vals[i]).setField(fieldIdx, temp);
+ }
+ }
+ }
+ }
+
+ /**
+  * Reads a map column vector without null maps and stores every map as a HashMap.
+  *
+  * <p>For a repeating vector only the first map is read and every entry receives its own copy of it.
+  * Otherwise all keys and values are read and the map of entry {@code i} is built from the
+  * {@code lengths[i]} key-value pairs that start at {@code offsets[i]}.
+  *
+  * @param vals The array of objects or Rows that receives the maps.
+  * @param fieldIdx The Row field to set, or -1 to write the maps directly into {@code vals}.
+  * @param mapsVector The column vector to read.
+  * @param schema The ORC type of the map.
+  * @param childCount The number of entries to read.
+  */
+ private static void readNonNullMapColumn(Object[] vals, int fieldIdx, MapColumnVector mapsVector, TypeDescription schema, int childCount) {
+
+     List<TypeDescription> fieldType = schema.getChildren();
+     TypeDescription keyType = fieldType.get(0);
+     TypeDescription valueType = fieldType.get(1);
+
+     ColumnVector keys = mapsVector.keys;
+     ColumnVector values = mapsVector.values;
+
+     if (mapsVector.isRepeating) {
+         // first map is repeated
+
+         // get map copy function
+         Function<Object, Object> copyMap = getCopyFunction(schema);
+
+         // locate the key and value entries of the first map
+         int offset = (int) mapsVector.offsets[0];
+         int length = (int) mapsVector.lengths[0];
+         // we only need to read until offset + length.
+         int entriesToRead = offset + length;
+
+         Object[] keyRows = new Object[entriesToRead];
+         Object[] valueRows = new Object[entriesToRead];
+
+         // read map keys and values
+         readField(keyRows, -1, keyType, keys, entriesToRead);
+         readField(valueRows, -1, valueType, values, entriesToRead);
+
+         // create first map that will be copied
+         HashMap map = readHashMap(keyRows, valueRows, offset, length);
+
+         // copy first map and set copy as result
+         for (int i = 0; i < childCount; i++) {
+             if (fieldIdx == -1) {
+                 vals[i] = copyMap.apply(map);
+             } else {
+                 ((Row) vals[i]).setField(fieldIdx, copyMap.apply(map));
+             }
+         }
+
+     } else {
+
+         Object[] keyRows = new Object[mapsVector.childCount];
+         Object[] valueRows = new Object[mapsVector.childCount];
+
+         // read map keys and values
+         readField(keyRows, -1, keyType, keys, keyRows.length);
+         readField(valueRows, -1, valueType, values, valueRows.length);
+
+         long[] offsets = mapsVector.offsets;
+         long[] lengths = mapsVector.lengths;
+
+         for (int i = 0; i < childCount; i++) {
+             // Address the entries through the vector's offsets instead of summing up the lengths.
+             // A running sum is only correct if the maps are laid out densely starting at 0;
+             // the offsets are what the list reader and the nullable map reader rely on as well.
+             HashMap map = readHashMap(keyRows, valueRows, (int) offsets[i], lengths[i]);
+
+             if (fieldIdx == -1) {
+                 vals[i] = map;
+             } else {
+                 ((Row) vals[i]).setField(fieldIdx, map);
+             }
+         }
+     }
+
+ }
+
+ private static <T> void readLongColumn(Object[] vals, int fieldIdx, LongColumnVector vector,
+ int childCount, LongFunction<T> reader) {
+
+ if (vector.isRepeating) { // fill complete column with first value
+ if (vector.isNull[0]) {
+ // fill vals with null values
+ fillColumnWithRepeatingValue(vals, fieldIdx, null, childCount);
+ } else {
+ // read repeating non-null value by forwarding call.
+ readNonNullLongColumn(vals, fieldIdx, vector, childCount, reader);
+ }
+ } else {
+ boolean[] isNullVector = vector.isNull;
+ if (fieldIdx == -1) { // set as an object
+ for (int i = 0; i < childCount; i++) {
+ if (isNullVector[i]) {
+ vals[i] = null;
+ } else {
+ vals[i] = reader.apply(vector.vector[i]);
+ }
+ }
+ } else { // set as a field of Row
+ Row[] rows = (Row[]) vals;
+ for (int i = 0; i < childCount; i++) {
+ if (isNullVector[i]) {
+ rows[i].setField(fieldIdx, null);
+ } else {
+ rows[i].setField(fieldIdx, reader.apply(vector.vector[i]));
+ }
+ }
+ }
+ }
+ }
+
+ private static <T> void readDoubleColumn(Object[] vals, int fieldIdx, DoubleColumnVector vector,
+ int childCount, DoubleFunction<T> reader) {
+
+ if (vector.isRepeating) { // fill complete column with first value
+ if (vector.isNull[0]) {
+ // fill vals with null values
+ fillColumnWithRepeatingValue(vals, fieldIdx, null, childCount);
+ } else {
+ // read repeating non-null value by forwarding call
+ readNonNullDoubleColumn(vals, fieldIdx, vector, childCount, reader);
+ }
+ } else {
+ boolean[] isNullVector = vector.isNull;
+ if (fieldIdx == -1) { // set as an object
+ for (int i = 0; i < childCount; i++) {
+ if (isNullVector[i]) {
+ vals[i] = null;
+ } else {
+ vals[i] = reader.apply(vector.vector[i]);
+ }
+ }
+ } else { // set as a field of Row
+ Row[] rows = (Row[]) vals;
+ for (int i = 0; i < childCount; i++) {
+ if (isNullVector[i]) {
+ rows[i].setField(fieldIdx, null);
+ } else {
+ rows[i].setField(fieldIdx, reader.apply(vector.vector[i]));
+ }
+ }
+ }
+ }
+ }
+
+ private static void readBytesColumnAsString(Object[] vals, int fieldIdx, BytesColumnVector bytes, int childCount) {
+
+ if (bytes.isRepeating) { // fill complete column with first value
+ if (bytes.isNull[0]) {
+ // fill vals with null values
+ fillColumnWithRepeatingValue(vals, fieldIdx, null, childCount);
+ } else {
+ // read repeating non-null value by forwarding call
+ readNonNullBytesColumnAsString(vals, fieldIdx, bytes, childCount);
+ }
+ } else {
+ boolean[] isNullVector = bytes.isNull;
+ if (fieldIdx == -1) { // set as an object
+ for (int i = 0; i < childCount; i++) {
+ if (isNullVector[i]) {
+ vals[i] = null;
+ } else {
+ vals[i] = readString(bytes.vector[i], bytes.start[i], bytes.length[i]);
+ }
+ }
+ } else { // set as a field of Row
+ Row[] rows = (Row[]) vals;
+ for (int i = 0; i < childCount; i++) {
+ if (isNullVector[i]) {
+ rows[i].setField(fieldIdx, null);
+ } else {
+ rows[i].setField(fieldIdx, readString(bytes.vector[i], bytes.start[i], bytes.length[i]));
+ }
+ }
+ }
+ }
+ }
+
+ private static void readBytesColumnAsBinary(Object[] vals, int fieldIdx, BytesColumnVector bytes, int childCount) {
+
+ if (bytes.isRepeating) { // fill complete column with first value
+ if (bytes.isNull[0]) {
+ // fill vals with null values
+ fillColumnWithRepeatingValue(vals, fieldIdx, null, childCount);
+ } else {
+ // read repeating non-null value by forwarding call
+ readNonNullBytesColumnAsBinary(vals, fieldIdx, bytes, childCount);
+ }
+ } else {
+ boolean[] isNullVector = bytes.isNull;
+ if (fieldIdx == -1) { // set as an object
+ for (int i = 0; i < childCount; i++) {
+ if (isNullVector[i]) {
+ vals[i] = null;
+ } else {
+ vals[i] = readBinary(bytes.vector[i], bytes.start[i], bytes.length[i]);
+ }
+ }
+ } else { // set as a field of Row
+ Row[] rows = (Row[]) vals;
+ for (int i = 0; i < childCount; i++) {
+ if (isNullVector[i]) {
+ rows[i].setField(fieldIdx, null);
+ } else {
+ rows[i].setField(fieldIdx, readBinary(bytes.vector[i], bytes.start[i], bytes.length[i]));
+ }
+ }
+ }
+ }
+ }
+
+ private static void readLongColumnAsDate(Object[] vals, int fieldIdx, LongColumnVector vector, int childCount) {
+
+ if (vector.isRepeating) { // fill complete column with first value
+ if (vector.isNull[0]) {
+ // fill vals with null values
+ fillColumnWithRepeatingValue(vals, fieldIdx, null, childCount);
+ } else {
+ // read repeating non-null value by forwarding call
+ readNonNullLongColumnAsDate(vals, fieldIdx, vector, childCount);
+ }
+ } else {
+ boolean[] isNullVector = vector.isNull;
+ if (fieldIdx == -1) { // set as an object
+ for (int i = 0; i < childCount; i++) {
+ if (isNullVector[i]) {
+ vals[i] = null;
+ } else {
+ vals[i] = readDate(vector.vector[i]);
+ }
+ }
+ } else { // set as a field of Row
+ Row[] rows = (Row[]) vals;
+ for (int i = 0; i < childCount; i++) {
+ if (isNullVector[i]) {
+ rows[i].setField(fieldIdx, null);
+ } else {
+ rows[i].setField(fieldIdx, readDate(vector.vector[i]));
+ }
+ }
+ }
+ }
+ }
+
+ private static void readTimestampColumn(Object[] vals, int fieldIdx, TimestampColumnVector vector, int childCount) {
+
+ if (vector.isRepeating) { // fill complete column with first value
+ if (vector.isNull[0]) {
+ // fill vals with null values
+ fillColumnWithRepeatingValue(vals, fieldIdx, null, childCount);
+ } else {
+ // read repeating non-null value by forwarding call
+ readNonNullTimestampColumn(vals, fieldIdx, vector, childCount);
+ }
+ } else {
+ boolean[] isNullVector = vector.isNull;
+ if (fieldIdx == -1) { // set as an object
+ for (int i = 0; i < childCount; i++) {
+ if (isNullVector[i]) {
+ vals[i] = null;
+ } else {
+ Timestamp ts = readTimestamp(vector.time[i], vector.nanos[i]);
+ vals[i] = ts;
+ }
+ }
+ } else { // set as a field of Row
+ Row[] rows = (Row[]) vals;
+ for (int i = 0; i < childCount; i++) {
+ if (isNullVector[i]) {
+ rows[i].setField(fieldIdx, null);
+ } else {
+ Timestamp ts = readTimestamp(vector.time[i], vector.nanos[i]);
+ rows[i].setField(fieldIdx, ts);
+ }
+ }
+ }
+ }
+ }
+
+ private static void readDecimalColumn(Object[] vals, int fieldIdx, DecimalColumnVector vector, int childCount) {
+
+ if (vector.isRepeating) { // fill complete column with first value
+ if (vector.isNull[0]) {
+ // fill vals with null values
+ fillColumnWithRepeatingValue(vals, fieldIdx, null, childCount);
+ } else {
+ // read repeating non-null value by forwarding call
+ readNonNullDecimalColumn(vals, fieldIdx, vector, childCount);
+ }
+ } else {
+ boolean[] isNullVector = vector.isNull;
+ if (fieldIdx == -1) { // set as an object
+ for (int i = 0; i < childCount; i++) {
+ if (isNullVector[i]) {
+ vals[i] = null;
+ } else {
+ vals[i] = readBigDecimal(vector.vector[i]);
+ }
+ }
+ } else { // set as a field of Row
+ Row[] rows = (Row[]) vals;
+ for (int i = 0; i < childCount; i++) {
+ if (isNullVector[i]) {
+ rows[i].setField(fieldIdx, null);
+ } else {
+ rows[i].setField(fieldIdx, readBigDecimal(vector.vector[i]));
+ }
+ }
+ }
+ }
+ }
+
+ /**
+  * Reads a struct column vector that may contain null structs and stores every non-null struct as a Row.
+  *
+  * <p>Note: For every non-repeating field vector, this method writes the null information of the struct
+  * vector into the field vector (its isNull array and noNulls flag) before the field is read.
+  *
+  * @param vals The array of objects or Rows that receives the structs.
+  * @param fieldIdx The Row field to set, or -1 to write the structs directly into {@code vals}.
+  * @param structVector The column vector to read.
+  * @param schema The ORC type of the struct.
+  * @param childCount The number of entries to read.
+  */
+ private static void readStructColumn(Object[] vals, int fieldIdx, StructColumnVector structVector, TypeDescription schema, int childCount) {
+
+ List<TypeDescription> childrenTypes = schema.getChildren();
+
+ int numFields = childrenTypes.size();
+
+ // Early out if struct column is repeating and always null.
+ // This is the only repeating case we need to handle.
+ // ORC assumes that repeating values have been pushed to the children.
+ if (structVector.isRepeating && structVector.isNull[0]) {
+ if (fieldIdx < 0) {
+ for (int i = 0; i < childCount; i++) {
+ vals[i] = null;
+ }
+ } else {
+ for (int i = 0; i < childCount; i++) {
+ ((Row) vals[i]).setField(fieldIdx, null);
+ }
+ }
+ return;
+ }
+
+ // create a batch of Rows to read the structs
+ Row[] structs = new Row[childCount];
+ // TODO: possible improvement: reuse existing Row objects
+ for (int i = 0; i < childCount; i++) {
+ structs[i] = new Row(numFields);
+ }
+
+ // read struct fields
+ for (int i = 0; i < numFields; i++) {
+ ColumnVector fieldVector = structVector.fields[i];
+ if (!fieldVector.isRepeating) {
+ // Reduce fieldVector reads by setting all entries null where struct is null.
+ if (fieldVector.noNulls) {
+ // fieldVector had no nulls. Just use struct null information.
+ System.arraycopy(structVector.isNull, 0, fieldVector.isNull, 0, structVector.isNull.length);
+ // the field vector must now be read with the null-aware reader
+ structVector.fields[i].noNulls = false;
+ } else {
+ // fieldVector had nulls. Merge field nulls with struct nulls.
+ for (int j = 0; j < structVector.isNull.length; j++) {
+ structVector.fields[i].isNull[j] = structVector.isNull[j] || structVector.fields[i].isNull[j];
+ }
+ }
+ }
+ // read the field into position i of all struct Rows
+ readField(structs, i, childrenTypes.get(i), structVector.fields[i], childCount);
+ }
+
+ boolean[] isNullVector = structVector.isNull;
+
+ // set the result: null where the struct is null, the Row that was read otherwise
+ if (fieldIdx == -1) { // set struct as an object
+ for (int i = 0; i < childCount; i++) {
+ if (isNullVector[i]) {
+ vals[i] = null;
+ } else {
+ vals[i] = structs[i];
+ }
+ }
+ } else { // set struct as a field of Row
+ Row[] rows = (Row[]) vals;
+ for (int i = 0; i < childCount; i++) {
+ if (isNullVector[i]) {
+ rows[i].setField(fieldIdx, null);
+ } else {
+ rows[i].setField(fieldIdx, structs[i]);
+ }
+ }
+ }
+ }
+
+ /**
+  * Reads a list column vector that may contain null lists and stores every non-null list as an array.
+  *
+  * <p>For a repeating vector only the first list is read and every entry receives its own copy of it.
+  *
+  * <p>Note: If the vector and its child vector are both not repeating, this method replaces the isNull
+  * array of the child vector and sets its noNulls flag to false before the children are read.
+  *
+  * @param vals The array of objects or Rows that receives the lists.
+  * @param fieldIdx The Row field to set, or -1 to write the lists directly into {@code vals}.
+  * @param list The column vector to read.
+  * @param schema The ORC type of the list.
+  * @param childCount The number of entries to read.
+  */
+ private static void readListColumn(Object[] vals, int fieldIdx, ListColumnVector list, TypeDescription schema, int childCount) {
+
+ TypeDescription fieldType = schema.getChildren().get(0);
+ // get class of list elements
+ Class<?> classType = getClassForType(fieldType);
+
+ if (list.isRepeating) {
+ // list values are repeating. we only need to read the first list and copy it.
+
+ if (list.isNull[0]) {
+ // Even better. The first list is null and so all lists are null.
+ for (int i = 0; i < childCount; i++) {
+ if (fieldIdx == -1) {
+ vals[i] = null;
+ } else {
+ ((Row) vals[i]).setField(fieldIdx, null);
+ }
+ }
+
+ } else {
+ // Get function to copy list
+ Function<Object, Object> copyList = getCopyFunction(schema);
+
+ int offset = (int) list.offsets[0];
+ int length = (int) list.lengths[0];
+ // we only need to read until offset + length.
+ int entriesToRead = offset + length;
+
+ // read entries
+ Object[] children = (Object[]) Array.newInstance(classType, entriesToRead);
+ readField(children, -1, fieldType, list.child, entriesToRead);
+
+ // create first list which will be copied
+ Object[] temp;
+ if (offset == 0) {
+ // the children array is exactly the first list
+ temp = children;
+ } else {
+ temp = (Object[]) Array.newInstance(classType, length);
+ System.arraycopy(children, offset, temp, 0, length);
+ }
+
+ // copy repeated list and set copy as result
+ for (int i = 0; i < childCount; i++) {
+ Object[] copy = (Object[]) copyList.apply(temp);
+ if (fieldIdx == -1) {
+ vals[i] = copy;
+ } else {
+ ((Row) vals[i]).setField(fieldIdx, copy);
+ }
+ }
+ }
+
+ } else {
+ if (!list.child.isRepeating) {
+ // start with all children marked as null
+ boolean[] childIsNull = new boolean[list.childCount];
+ Arrays.fill(childIsNull, true);
+ // forward info of null lists into child vector
+ for (int i = 0; i < childCount; i++) {
+ // preserve isNull info of entries of non-null lists
+ if (!list.isNull[i]) {
+ int offset = (int) list.offsets[i];
+ int length = (int) list.lengths[i];
+ System.arraycopy(list.child.isNull, offset, childIsNull, offset, length);
+ }
+ }
+ // override isNull of children vector
+ list.child.isNull = childIsNull;
+ list.child.noNulls = false;
+ }
+
+ // read children
+ Object[] children = (Object[]) Array.newInstance(classType, list.childCount);
+ readField(children, -1, fieldType, list.child, list.childCount);
+
+ Object[] temp;
+ // fill lists with children
+ for (int i = 0; i < childCount; i++) {
+
+ if (list.isNull[i]) {
+ temp = null;
+ } else {
+ int offset = (int) list.offsets[i];
+ int length = (int) list.lengths[i];
+
+ // copy the children of this list into their own array
+ temp = (Object[]) Array.newInstance(classType, length);
+ System.arraycopy(children, offset, temp, 0, length);
+ }
+
+ if (fieldIdx == -1) {
+ vals[i] = temp;
+ } else {
+ ((Row) vals[i]).setField(fieldIdx, temp);
+ }
+ }
+ }
+ }
+
+ /**
+  * Reads a map column vector that may contain null maps and stores every non-null map as a HashMap.
+  *
+  * <p>For a repeating vector only the first map is read and every entry receives its own copy of it.
+  *
+  * <p>Note: If the vector is not repeating, this method replaces the isNull array of each non-repeating
+  * keys or values vector and sets its noNulls flag to false before the keys and values are read.
+  *
+  * @param vals The array of objects or Rows that receives the maps.
+  * @param fieldIdx The Row field to set, or -1 to write the maps directly into {@code vals}.
+  * @param map The column vector to read.
+  * @param schema The ORC type of the map.
+  * @param childCount The number of entries to read.
+  */
+ private static void readMapColumn(Object[] vals, int fieldIdx, MapColumnVector map, TypeDescription schema, int childCount) {
+
+ List<TypeDescription> fieldType = schema.getChildren();
+ TypeDescription keyType = fieldType.get(0);
+ TypeDescription valueType = fieldType.get(1);
+
+ ColumnVector keys = map.keys;
+ ColumnVector values = map.values;
+
+ if (map.isRepeating) {
+ // map values are repeating. we only need to read the first map and copy it.
+
+ if (map.isNull[0]) {
+ // Even better. The first map is null and so all maps are null.
+ for (int i = 0; i < childCount; i++) {
+ if (fieldIdx == -1) {
+ vals[i] = null;
+ } else {
+ ((Row) vals[i]).setField(fieldIdx, null);
+ }
+ }
+
+ } else {
+ // Get function to copy map
+ Function<Object, Object> copyMap = getCopyFunction(schema);
+
+ int offset = (int) map.offsets[0];
+ int length = (int) map.lengths[0];
+ // we only need to read until offset + length.
+ int entriesToRead = offset + length;
+
+ Object[] keyRows = new Object[entriesToRead];
+ Object[] valueRows = new Object[entriesToRead];
+
+ // read map keys and values
+ readField(keyRows, -1, keyType, keys, entriesToRead);
+ readField(valueRows, -1, valueType, values, entriesToRead);
+
+ // create first map which will be copied
+ HashMap temp = readHashMap(keyRows, valueRows, offset, length);
+
+ // copy repeated map and set copy as result
+ for (int i = 0; i < childCount; i++) {
+ if (fieldIdx == -1) {
+ vals[i] = copyMap.apply(temp);
+ } else {
+ ((Row) vals[i]).setField(fieldIdx, copyMap.apply(temp));
+ }
+ }
+ }
+ } else {
+ // ensure only keys and values that are referenced by non-null maps are set to non-null
+
+ if (!keys.isRepeating) {
+ // propagate is null info of map into keys vector
+ // start with all keys marked as null
+ boolean[] keyIsNull = new boolean[map.childCount];
+ Arrays.fill(keyIsNull, true);
+ for (int i = 0; i < childCount; i++) {
+ // preserve isNull info for keys of non-null maps
+ if (!map.isNull[i]) {
+ int offset = (int) map.offsets[i];
+ int length = (int) map.lengths[i];
+ System.arraycopy(keys.isNull, offset, keyIsNull, offset, length);
+ }
+ }
+ // override isNull of keys vector
+ keys.isNull = keyIsNull;
+ keys.noNulls = false;
+ }
+ if (!values.isRepeating) {
+ // propagate is null info of map into values vector
+ // start with all values marked as null
+ boolean[] valIsNull = new boolean[map.childCount];
+ Arrays.fill(valIsNull, true);
+ for (int i = 0; i < childCount; i++) {
+ // preserve isNull info for vals of non-null maps
+ if (!map.isNull[i]) {
+ int offset = (int) map.offsets[i];
+ int length = (int) map.lengths[i];
+ System.arraycopy(values.isNull, offset, valIsNull, offset, length);
+ }
+ }
+ // override isNull of values vector
+ values.isNull = valIsNull;
+ values.noNulls = false;
+ }
+
+ Object[] keyRows = new Object[map.childCount];
+ Object[] valueRows = new Object[map.childCount];
+
+ // read map keys and values
+ readField(keyRows, -1, keyType, keys, keyRows.length);
+ readField(valueRows, -1, valueType, values, valueRows.length);
+
+ boolean[] isNullVector = map.isNull;
+ long[] lengths = map.lengths;
+ long[] offsets = map.offsets;
+
+ // build one map per non-null entry from its offset and length
+ if (fieldIdx == -1) { // set map as an object
+ for (int i = 0; i < childCount; i++) {
+ if (isNullVector[i]) {
+ vals[i] = null;
+ } else {
+ vals[i] = readHashMap(keyRows, valueRows, (int) offsets[i], lengths[i]);
+ }
+ }
+ } else { // set map as a field of Row
+ Row[] rows = (Row[]) vals;
+ for (int i = 0; i < childCount; i++) {
+ if (isNullVector[i]) {
+ rows[i].setField(fieldIdx, null);
+ } else {
+ rows[i].setField(fieldIdx, readHashMap(keyRows, valueRows, (int) offsets[i], lengths[i]));
+ }
+ }
+ }
+ }
+ }
+
+ /**
+ * Sets a repeating value to all objects or row fields of the passed vals array.
+ *
+ * @param vals The array of objects or Rows.
+ * @param fieldIdx If the objs array is an array of Row, the index of the field that needs to be filled.
+ * Otherwise a -1 must be passed and the data is directly filled into the array.
+ * @param repeatingValue The value that is set.
+ * @param childCount The number of times the value is set.
+ */
+ private static void fillColumnWithRepeatingValue(Object[] vals, int fieldIdx, Object repeatingValue, int childCount) {
+
+ if (fieldIdx == -1) {
+ // set value as an object
+ Arrays.fill(vals, 0, childCount, repeatingValue);
+ } else {
+ // set value as a field of Row
+ Row[] rows = (Row[]) vals;
+ for (int i = 0; i < childCount; i++) {
+ rows[i].setField(fieldIdx, repeatingValue);
+ }
+ }
+ }
+
+ /**
+  * Returns the Java class that is used to represent values of the given ORC type.
+  *
+  * @param schema The ORC type.
+  * @return The class of the values that are read for the type.
+  * @throws UnsupportedOperationException if the type is a UNION.
+  * @throws IllegalArgumentException if the type category is not known.
+  */
+ private static Class<?> getClassForType(TypeDescription schema) {
+
+     switch (schema.getCategory()) {
+         // nested types
+         case STRUCT:
+             return Row.class;
+         case MAP:
+             return HashMap.class;
+         case LIST:
+             // a list is represented as an array of its element class
+             TypeDescription elementType = schema.getChildren().get(0);
+             Object emptyArray = Array.newInstance(getClassForType(elementType), 0);
+             return emptyArray.getClass();
+         // character types
+         case STRING:
+         case VARCHAR:
+         case CHAR:
+             return String.class;
+         // numeric types
+         case DECIMAL:
+             return BigDecimal.class;
+         case DOUBLE:
+             return Double.class;
+         case FLOAT:
+             return Float.class;
+         case LONG:
+             return Long.class;
+         case INT:
+             return Integer.class;
+         case SHORT:
+             return Short.class;
+         case BYTE:
+             return Byte.class;
+         case BOOLEAN:
+             return Boolean.class;
+         // time types
+         case TIMESTAMP:
+             return Timestamp.class;
+         case DATE:
+             return Date.class;
+         // binary data
+         case BINARY:
+             return byte[].class;
+         case UNION:
+             throw new UnsupportedOperationException("UNION type not supported yet");
+         default:
+             throw new IllegalArgumentException("Unknown type " + schema);
+     }
+ }
+
+ private static Boolean readBoolean(long l) {
+ return l != 0;
+ }
+
+ private static Byte readByte(long l) {
+ return (byte) l;
+ }
+
+ private static Short readShort(long l) {
+ return (short) l;
+ }
+
+ private static Integer readInt(long l) {
+ return (int) l;
+ }
+
+ private static Long readLong(long l) {
+ return l;
+ }
+
+ private static Float readFloat(double d) {
+ return (float) d;
+ }
+
+ private static Double readDouble(double d) {
+ return d;
+ }
+
+ /**
+  * Converts a number of days into a Date.
+  *
+  * @param l The number of days.
+  * @return The Date at the given day, shifted by the offset of the local timezone.
+  */
+ private static Date readDate(long l) {
+     // days to milliseconds
+     final long millis = l * MILLIS_PER_DAY;
+     // shift by the offset of the local timezone at that instant
+     final long localMillis = millis - LOCAL_TZ.getOffset(millis);
+     return new java.sql.Date(localMillis);
+ }
+
+ private static String readString(byte[] bytes, int start, int length) {
+ return new String(bytes, start, length, StandardCharsets.UTF_8);
+ }
+
+ private static byte[] readBinary(byte[] src, int srcPos, int length) {
+ byte[] result = new byte[length];
+ System.arraycopy(src, srcPos, result, 0, length);
+ return result;
+ }
+
+ /** Converts the decimal held by a HiveDecimalWritable into a BigDecimal. */
+ private static BigDecimal readBigDecimal(HiveDecimalWritable hiveDecimalWritable) {
+     return hiveDecimalWritable.getHiveDecimal().bigDecimalValue();
+ }
+
+ private static Timestamp readTimestamp(long time, int nanos) {
+ Timestamp ts = new Timestamp(time);
+ ts.setNanos(nanos);
+ return ts;
+ }
+
+ private static HashMap readHashMap(Object[] keyRows, Object[] valueRows, int offset, long length) {
+ HashMap<Object, Object> resultMap = new HashMap<>();
+ for (int j = 0; j < length; j++) {
+ resultMap.put(keyRows[offset], valueRows[offset]);
+ offset++;
+ }
+ return resultMap;
+ }
+
+ /**
+  * Returns a function that copies values of the given ORC type.
+  *
+  * <p>Immutable values are returned as they are. The functions for nested types
+  * are composed of the functions of their children.
+  *
+  * @param schema The ORC type.
+  * @return A function that copies a value of the type.
+  * @throws UnsupportedOperationException if the type is a UNION.
+  * @throws IllegalArgumentException if the type category is not known.
+  */
+ @SuppressWarnings("unchecked")
+ private static Function<Object, Object> getCopyFunction(TypeDescription schema) {
+     switch (schema.getCategory()) {
+         // nested types copy their children recursively
+         case MAP:
+             Function<Object, Object> keyCopier = getCopyFunction(schema.getChildren().get(0));
+             Function<Object, Object> valueCopier = getCopyFunction(schema.getChildren().get(1));
+             return new CopyMap(keyCopier, valueCopier);
+         case LIST:
+             TypeDescription elementType = schema.getChildren().get(0);
+             Function<Object, Object> elementCopier = getCopyFunction(elementType);
+             Class elementClass = getClassForType(elementType);
+             return new CopyList(elementCopier, elementClass);
+         case STRUCT:
+             List<TypeDescription> memberTypes = schema.getChildren();
+             Function<Object, Object>[] memberCopiers = new Function[memberTypes.size()];
+             int member = 0;
+             for (TypeDescription memberType : memberTypes) {
+                 memberCopiers[member++] = getCopyFunction(memberType);
+             }
+             return new CopyStruct(memberCopiers);
+         // mutable values need a real copy
+         case BINARY:
+             return OrcBatchReader::copyBinary;
+         case TIMESTAMP:
+             return OrcBatchReader::copyTimestamp;
+         case DATE:
+             return OrcBatchReader::copyDate;
+         // immutable values can be shared
+         case DECIMAL:
+         case STRING:
+         case VARCHAR:
+         case CHAR:
+         case DOUBLE:
+         case FLOAT:
+         case LONG:
+         case INT:
+         case SHORT:
+         case BYTE:
+         case BOOLEAN:
+             return OrcBatchReader::returnImmutable;
+         case UNION:
+             throw new UnsupportedOperationException("UNION type not supported yet");
+         default:
+             throw new IllegalArgumentException("Unknown type " + schema);
+     }
+ }
+
+ /**
+  * Copy function for immutable values. Returns the passed object itself instead of a copy.
+  *
+  * @param o The object to "copy".
+  * @return The same object.
+  */
+ private static Object returnImmutable(Object o) {
+ return o;
+ }
+
+ private static Date copyDate(Object o) {
+ if (o == null) {
+ return null;
+ } else {
+ long date = ((Date) o).getTime();
+ return new Date(date);
+ }
+ }
+
+ private static Timestamp copyTimestamp(Object o) {
+ if (o == null) {
+ return null;
+ } else {
+ long millis = ((Timestamp) o).getTime();
+ int nanos = ((Timestamp) o).getNanos();
+ Timestamp copy = new Timestamp(millis);
+ copy.setNanos(nanos);
+ return copy;
+ }
+ }
+
+ private static byte[] copyBinary(Object o) {
+ if (o == null) {
+ return null;
+ } else {
+ int length = ((byte[]) o).length;
+ return Arrays.copyOf((byte[]) o, length);
+ }
+ }
+
+ /**
+  * Copy function for structs. Creates a new Row and copies every field with the function of that field.
+  */
+ private static class CopyStruct implements Function<Object, Object> {
+
+     /** The copy functions of the fields, in field order. */
+     private final Function<Object, Object>[] copyFields;
+
+     CopyStruct(Function<Object, Object>[] copyFields) {
+         this.copyFields = copyFields;
+     }
+
+     @Override
+     public Object apply(Object o) {
+         if (o == null) {
+             return null;
+         }
+         final Row original = (Row) o;
+         final int arity = copyFields.length;
+         final Row duplicate = new Row(arity);
+         for (int field = 0; field < arity; field++) {
+             final Object fieldCopy = copyFields[field].apply(original.getField(field));
+             duplicate.setField(field, fieldCopy);
+         }
+         return duplicate;
+     }
+ }
+
+ /**
+  * Copy function for lists. Creates a new array of the entry class and copies every entry
+  * with the function of the entries.
+  */
+ private static class CopyList implements Function<Object, Object> {
+
+     /** The copy function of the entries. */
+     private final Function<Object, Object> copyEntry;
+     /** The component class of the arrays that are created. */
+     private final Class entryClass;
+
+     CopyList(Function<Object, Object> copyEntry, Class entryClass) {
+         this.copyEntry = copyEntry;
+         this.entryClass = entryClass;
+     }
+
+     @Override
+     public Object apply(Object o) {
+         if (o == null) {
+             return null;
+         }
+         final Object[] original = (Object[]) o;
+         final int size = original.length;
+         final Object[] duplicate = (Object[]) Array.newInstance(entryClass, size);
+         for (int pos = 0; pos < size; pos++) {
+             duplicate[pos] = copyEntry.apply(original[pos]);
+         }
+         return duplicate;
+     }
+ }
+
+ /**
+  * Copy function for maps. Creates a new HashMap and copies every key and value
+  * with the functions of the keys and values.
+  */
+ @SuppressWarnings("unchecked")
+ private static class CopyMap implements Function<Object, Object> {
+
+     /** The copy function of the keys. */
+     private final Function<Object, Object> copyKey;
+     /** The copy function of the values. */
+     private final Function<Object, Object> copyValue;
+
+     CopyMap(Function<Object, Object> copyKey, Function<Object, Object> copyValue) {
+         this.copyKey = copyKey;
+         this.copyValue = copyValue;
+     }
+
+     @Override
+     public Object apply(Object o) {
+         if (o == null) {
+             return null;
+         }
+         final Map<Object, Object> original = (Map<Object, Object>) o;
+         final HashMap<Object, Object> duplicate = new HashMap<>(original.size());
+         for (Map.Entry<Object, Object> entry : original.entrySet()) {
+             // copy the key first, then the value
+             final Object key = copyKey.apply(entry.getKey());
+             final Object value = copyValue.apply(entry.getValue());
+             duplicate.put(key, value);
+         }
+         return duplicate;
+     }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/bcead3be/flink-connectors/flink-orc/src/main/java/org/apache/flink/orc/OrcRowInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-orc/src/main/java/org/apache/flink/orc/OrcRowInputFormat.java b/flink-connectors/flink-orc/src/main/java/org/apache/flink/orc/OrcRowInputFormat.java
index 4353cbc..61575ad 100644
--- a/flink-connectors/flink-orc/src/main/java/org/apache/flink/orc/OrcRowInputFormat.java
+++ b/flink-connectors/flink-orc/src/main/java/org/apache/flink/orc/OrcRowInputFormat.java
@@ -55,7 +55,7 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
-import static org.apache.flink.orc.OrcUtils.fillRows;
+import static org.apache.flink.orc.OrcBatchReader.fillRows;
/**
* InputFormat to read ORC files.
@@ -128,7 +128,7 @@ public class OrcRowInputFormat extends FileInputFormat<Row> implements ResultTyp
// configure OrcRowInputFormat
this.schema = orcSchema;
- this.rowType = (RowTypeInfo) OrcUtils.schemaToTypeInfo(schema);
+ this.rowType = (RowTypeInfo) OrcBatchReader.schemaToTypeInfo(schema);
this.conf = orcConfig;
this.batchSize = batchSize;
http://git-wip-us.apache.org/repos/asf/flink/blob/bcead3be/flink-connectors/flink-orc/src/main/java/org/apache/flink/orc/OrcTableSource.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-orc/src/main/java/org/apache/flink/orc/OrcTableSource.java b/flink-connectors/flink-orc/src/main/java/org/apache/flink/orc/OrcTableSource.java
index 31a9303..0eab4a0 100644
--- a/flink-connectors/flink-orc/src/main/java/org/apache/flink/orc/OrcTableSource.java
+++ b/flink-connectors/flink-orc/src/main/java/org/apache/flink/orc/OrcTableSource.java
@@ -127,7 +127,7 @@ public class OrcTableSource
this.predicates = predicates;
// determine the type information from the ORC schema
- RowTypeInfo typeInfoFromSchema = (RowTypeInfo) OrcUtils.schemaToTypeInfo(this.orcSchema);
+ RowTypeInfo typeInfoFromSchema = (RowTypeInfo) OrcBatchReader.schemaToTypeInfo(this.orcSchema);
// set return type info
if (selectedFields == null) {