Posted to commits@flink.apache.org by tw...@apache.org on 2018/01/31 13:45:02 UTC

[1/2] flink git commit: [FLINK-8230] [orc] Fix NPEs when reading nested columns.

Repository: flink
Updated Branches:
  refs/heads/master 3cfc5ae9f -> bcead3be3


http://git-wip-us.apache.org/repos/asf/flink/blob/bcead3be/flink-connectors/flink-orc/src/main/java/org/apache/flink/orc/OrcUtils.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-orc/src/main/java/org/apache/flink/orc/OrcUtils.java b/flink-connectors/flink-orc/src/main/java/org/apache/flink/orc/OrcUtils.java
deleted file mode 100644
index cfb4e0e..0000000
--- a/flink-connectors/flink-orc/src/main/java/org/apache/flink/orc/OrcUtils.java
+++ /dev/null
@@ -1,1508 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.orc;
-
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
-import org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.typeutils.MapTypeInfo;
-import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
-import org.apache.flink.api.java.typeutils.RowTypeInfo;
-import org.apache.flink.types.Row;
-
-import org.apache.hadoop.hive.common.type.HiveDecimal;
-import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.ListColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.MapColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.StructColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.TimestampColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
-import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
-import org.apache.orc.TypeDescription;
-
-import java.lang.reflect.Array;
-import java.math.BigDecimal;
-import java.nio.charset.StandardCharsets;
-import java.sql.Date;
-import java.sql.Timestamp;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.TimeZone;
-import java.util.function.DoubleFunction;
-import java.util.function.IntFunction;
-import java.util.function.LongFunction;
-
-/**
- * A class that provides utility methods for ORC file reading.
- */
-class OrcUtils {
-
-	private static final long MILLIS_PER_DAY = 86400000; // = 24 * 60 * 60 * 1000
-	private static final TimeZone LOCAL_TZ = TimeZone.getDefault();
-
-	/**
-	 * Converts an ORC schema to a Flink TypeInformation.
-	 *
-	 * @param schema The ORC schema.
-	 * @return The TypeInformation that corresponds to the ORC schema.
-	 */
-	static TypeInformation schemaToTypeInfo(TypeDescription schema) {
-		switch (schema.getCategory()) {
-			case BOOLEAN:
-				return BasicTypeInfo.BOOLEAN_TYPE_INFO;
-			case BYTE:
-				return BasicTypeInfo.BYTE_TYPE_INFO;
-			case SHORT:
-				return BasicTypeInfo.SHORT_TYPE_INFO;
-			case INT:
-				return BasicTypeInfo.INT_TYPE_INFO;
-			case LONG:
-				return BasicTypeInfo.LONG_TYPE_INFO;
-			case FLOAT:
-				return BasicTypeInfo.FLOAT_TYPE_INFO;
-			case DOUBLE:
-				return BasicTypeInfo.DOUBLE_TYPE_INFO;
-			case DECIMAL:
-				return BasicTypeInfo.BIG_DEC_TYPE_INFO;
-			case STRING:
-			case CHAR:
-			case VARCHAR:
-				return BasicTypeInfo.STRING_TYPE_INFO;
-			case DATE:
-				return SqlTimeTypeInfo.DATE;
-			case TIMESTAMP:
-				return SqlTimeTypeInfo.TIMESTAMP;
-			case BINARY:
-				return PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO;
-			case STRUCT:
-				List<TypeDescription> fieldSchemas = schema.getChildren();
-				TypeInformation[] fieldTypes = new TypeInformation[fieldSchemas.size()];
-				for (int i = 0; i < fieldSchemas.size(); i++) {
-					fieldTypes[i] = schemaToTypeInfo(fieldSchemas.get(i));
-				}
-				String[] fieldNames = schema.getFieldNames().toArray(new String[]{});
-				return new RowTypeInfo(fieldTypes, fieldNames);
-			case LIST:
-				TypeDescription elementSchema = schema.getChildren().get(0);
-				TypeInformation<?> elementType = schemaToTypeInfo(elementSchema);
-				// arrays of primitive types are handled as object arrays to support null values
-				return ObjectArrayTypeInfo.getInfoFor(elementType);
-			case MAP:
-				TypeDescription keySchema = schema.getChildren().get(0);
-				TypeDescription valSchema = schema.getChildren().get(1);
-				TypeInformation<?> keyType = schemaToTypeInfo(keySchema);
-				TypeInformation<?> valType = schemaToTypeInfo(valSchema);
-				return new MapTypeInfo<>(keyType, valType);
-			case UNION:
-				throw new UnsupportedOperationException("UNION type is not supported yet.");
-			default:
-				throw new IllegalArgumentException("Unknown type " + schema);
-		}
-	}
-
-	/**
-	 * Fills an ORC batch into an array of Row.
-	 *
-	 * @param rows The batch of rows that need to be filled.
-	 * @param schema The schema of the ORC data.
-	 * @param batch The ORC data.
-	 * @param selectedFields The list of selected ORC fields.
-	 * @return The number of rows that were filled.
-	 */
-	static int fillRows(Row[] rows, TypeDescription schema, VectorizedRowBatch batch, int[] selectedFields) {
-
-		int rowsToRead = Math.min((int) batch.count(), rows.length);
-
-		List<TypeDescription> fieldTypes = schema.getChildren();
-		// read each selected field
-		for (int rowIdx = 0; rowIdx < selectedFields.length; rowIdx++) {
-			int orcIdx = selectedFields[rowIdx];
-			readField(rows, rowIdx, fieldTypes.get(orcIdx), batch.cols[orcIdx], null, rowsToRead);
-		}
-		return rowsToRead;
-	}
-
-	/**
-	 * Reads a vector of data into an array of objects.
-	 *
-	 * @param vals The array that needs to be filled.
-	 * @param fieldIdx If the vals array is an array of Row, the index of the field that needs to be filled.
-	 *                 Otherwise a -1 must be passed and the data is directly filled into the array.
-	 * @param schema The schema of the vector to read.
-	 * @param vector The vector to read.
-	 * @param lengthVector If the vector is of type List or Map, the number of sub-elements to read for each field. Otherwise, it must be null.
-	 * @param childCount The number of vector entries to read.
-	 */
-	private static void readField(Object[] vals, int fieldIdx, TypeDescription schema, ColumnVector vector, long[] lengthVector, int childCount) {
-
-		// check the type of the vector to decide how to read it.
-		switch (schema.getCategory()) {
-			case BOOLEAN:
-				if (vector.noNulls) {
-					readNonNullLongColumn(vals, fieldIdx, (LongColumnVector) vector, lengthVector, childCount, OrcUtils::readBoolean, OrcUtils::boolArray);
-				} else {
-					readLongColumn(vals, fieldIdx, (LongColumnVector) vector, lengthVector, childCount, OrcUtils::readBoolean, OrcUtils::boolArray);
-				}
-				break;
-			case BYTE:
-				if (vector.noNulls) {
-					readNonNullLongColumn(vals, fieldIdx, (LongColumnVector) vector, lengthVector, childCount, OrcUtils::readByte, OrcUtils::byteArray);
-				} else {
-					readLongColumn(vals, fieldIdx, (LongColumnVector) vector, lengthVector, childCount, OrcUtils::readByte, OrcUtils::byteArray);
-				}
-				break;
-			case SHORT:
-				if (vector.noNulls) {
-					readNonNullLongColumn(vals, fieldIdx, (LongColumnVector) vector, lengthVector, childCount, OrcUtils::readShort, OrcUtils::shortArray);
-				} else {
-					readLongColumn(vals, fieldIdx, (LongColumnVector) vector, lengthVector, childCount, OrcUtils::readShort, OrcUtils::shortArray);
-				}
-				break;
-			case INT:
-				if (vector.noNulls) {
-					readNonNullLongColumn(vals, fieldIdx, (LongColumnVector) vector, lengthVector, childCount, OrcUtils::readInt, OrcUtils::intArray);
-				} else {
-					readLongColumn(vals, fieldIdx, (LongColumnVector) vector, lengthVector, childCount, OrcUtils::readInt, OrcUtils::intArray);
-				}
-				break;
-			case LONG:
-				if (vector.noNulls) {
-					readNonNullLongColumn(vals, fieldIdx, (LongColumnVector) vector, lengthVector, childCount, OrcUtils::readLong, OrcUtils::longArray);
-				} else {
-					readLongColumn(vals, fieldIdx, (LongColumnVector) vector, lengthVector, childCount, OrcUtils::readLong, OrcUtils::longArray);
-				}
-				break;
-			case FLOAT:
-				if (vector.noNulls) {
-					readNonNullDoubleColumn(vals, fieldIdx, (DoubleColumnVector) vector, lengthVector, childCount, OrcUtils::readFloat, OrcUtils::floatArray);
-				} else {
-					readDoubleColumn(vals, fieldIdx, (DoubleColumnVector) vector, lengthVector, childCount, OrcUtils::readFloat, OrcUtils::floatArray);
-				}
-				break;
-			case DOUBLE:
-				if (vector.noNulls) {
-					readNonNullDoubleColumn(vals, fieldIdx, (DoubleColumnVector) vector, lengthVector, childCount, OrcUtils::readDouble, OrcUtils::doubleArray);
-				} else {
-					readDoubleColumn(vals, fieldIdx, (DoubleColumnVector) vector, lengthVector, childCount, OrcUtils::readDouble, OrcUtils::doubleArray);
-				}
-				break;
-			case CHAR:
-			case VARCHAR:
-			case STRING:
-				if (vector.noNulls) {
-					readNonNullBytesColumnAsString(vals, fieldIdx, (BytesColumnVector) vector, lengthVector, childCount);
-				} else {
-					readBytesColumnAsString(vals, fieldIdx, (BytesColumnVector) vector, lengthVector, childCount);
-				}
-				break;
-			case DATE:
-				if (vector.noNulls) {
-					readNonNullLongColumnAsDate(vals, fieldIdx, (LongColumnVector) vector, lengthVector, childCount);
-				} else {
-					readLongColumnAsDate(vals, fieldIdx, (LongColumnVector) vector, lengthVector, childCount);
-				}
-				break;
-			case TIMESTAMP:
-				if (vector.noNulls) {
-					readNonNullTimestampColumn(vals, fieldIdx, (TimestampColumnVector) vector, lengthVector, childCount);
-				} else {
-					readTimestampColumn(vals, fieldIdx, (TimestampColumnVector) vector, lengthVector, childCount);
-				}
-				break;
-			case BINARY:
-				if (vector.noNulls) {
-					readNonNullBytesColumnAsBinary(vals, fieldIdx, (BytesColumnVector) vector, lengthVector, childCount);
-				} else {
-					readBytesColumnAsBinary(vals, fieldIdx, (BytesColumnVector) vector, lengthVector, childCount);
-				}
-				break;
-			case DECIMAL:
-				if (vector.noNulls) {
-					readNonNullDecimalColumn(vals, fieldIdx, (DecimalColumnVector) vector, lengthVector, childCount);
-				} else {
-					readDecimalColumn(vals, fieldIdx, (DecimalColumnVector) vector, lengthVector, childCount);
-				}
-				break;
-			case STRUCT:
-				if (vector.noNulls) {
-					readNonNullStructColumn(vals, fieldIdx, (StructColumnVector) vector, schema, lengthVector, childCount);
-				} else {
-					readStructColumn(vals, fieldIdx, (StructColumnVector) vector, schema, lengthVector, childCount);
-				}
-				break;
-			case LIST:
-				if (vector.noNulls) {
-					readNonNullListColumn(vals, fieldIdx, (ListColumnVector) vector, schema, lengthVector, childCount);
-				} else {
-					readListColumn(vals, fieldIdx, (ListColumnVector) vector, schema, lengthVector, childCount);
-				}
-				break;
-			case MAP:
-				if (vector.noNulls) {
-					readNonNullMapColumn(vals, fieldIdx, (MapColumnVector) vector, schema, lengthVector, childCount);
-				} else {
-					readMapColumn(vals, fieldIdx, (MapColumnVector) vector, schema, lengthVector, childCount);
-				}
-				break;
-			case UNION:
-				throw new UnsupportedOperationException("UNION type is not supported yet.");
-			default:
-				throw new IllegalArgumentException("Unknown type " + schema);
-		}
-	}
-
-	private static <T> void readNonNullLongColumn(Object[] vals, int fieldIdx, LongColumnVector vector, long[] lengthVector, int childCount,
-													LongFunction<T> reader, IntFunction<T[]> array) {
-
-		// check if the values need to be read into lists or as single values
-		if (lengthVector == null) {
-			if (vector.isRepeating) { // fill complete column with first value
-				T repeatingValue = reader.apply(vector.vector[0]);
-				fillColumnWithRepeatingValue(vals, fieldIdx, repeatingValue, childCount);
-			} else {
-				if (fieldIdx == -1) { // set as an object
-					for (int i = 0; i < childCount; i++) {
-						vals[i] = reader.apply(vector.vector[i]);
-					}
-				} else { // set as a field of Row
-					Row[] rows = (Row[]) vals;
-					for (int i = 0; i < childCount; i++) {
-						rows[i].setField(fieldIdx, reader.apply(vector.vector[i]));
-					}
-				}
-			}
-		} else { // in a list
-			T[] temp;
-			int offset = 0;
-			if (vector.isRepeating) { // fill complete list with first value
-				T repeatingValue = reader.apply(vector.vector[0]);
-				for (int i = 0; offset < childCount; i++) {
-					temp = array.apply((int) lengthVector[i]);
-					Arrays.fill(temp, repeatingValue);
-					offset += temp.length;
-					if (fieldIdx == -1) {
-						vals[i] = temp;
-					} else {
-						((Row) vals[i]).setField(fieldIdx, temp);
-					}
-				}
-			} else {
-				for (int i = 0; offset < childCount; i++) {
-					temp = array.apply((int) lengthVector[i]);
-					for (int j = 0; j < temp.length; j++) {
-						temp[j] = reader.apply(vector.vector[offset++]);
-					}
-					if (fieldIdx == -1) {
-						vals[i] = temp;
-					} else {
-						((Row) vals[i]).setField(fieldIdx, temp);
-					}
-				}
-			}
-		}
-	}
-
-	private static <T> void readNonNullDoubleColumn(Object[] vals, int fieldIdx, DoubleColumnVector vector, long[] lengthVector, int childCount,
-													DoubleFunction<T> reader, IntFunction<T[]> array) {
-
-		// check if the values need to be read into lists or as single values
-		if (lengthVector == null) {
-			if (vector.isRepeating) { // fill complete column with first value
-				T repeatingValue = reader.apply(vector.vector[0]);
-				fillColumnWithRepeatingValue(vals, fieldIdx, repeatingValue, childCount);
-			} else {
-				if (fieldIdx == -1) { // set as an object
-					for (int i = 0; i < childCount; i++) {
-						vals[i] = reader.apply(vector.vector[i]);
-					}
-				} else { // set as a field of Row
-					Row[] rows = (Row[]) vals;
-					for (int i = 0; i < childCount; i++) {
-						rows[i].setField(fieldIdx, reader.apply(vector.vector[i]));
-					}
-				}
-			}
-		} else { // in a list
-			T[] temp;
-			int offset = 0;
-			if (vector.isRepeating) { // fill complete list with first value
-				T repeatingValue = reader.apply(vector.vector[0]);
-				for (int i = 0; offset < childCount; i++) {
-					temp = array.apply((int) lengthVector[i]);
-					Arrays.fill(temp, repeatingValue);
-					offset += temp.length;
-					if (fieldIdx == -1) {
-						vals[i] = temp;
-					} else {
-						((Row) vals[i]).setField(fieldIdx, temp);
-					}
-				}
-			} else {
-				for (int i = 0; offset < childCount; i++) {
-					temp = array.apply((int) lengthVector[i]);
-					for (int j = 0; j < temp.length; j++) {
-						temp[j] = reader.apply(vector.vector[offset++]);
-					}
-					if (fieldIdx == -1) {
-						vals[i] = temp;
-					} else {
-						((Row) vals[i]).setField(fieldIdx, temp);
-					}
-				}
-			}
-		}
-	}
-
-	private static void readNonNullBytesColumnAsString(Object[] vals, int fieldIdx, BytesColumnVector bytes, long[] lengthVector, int childCount) {
-		// check if the values need to be read into lists or as single values
-		if (lengthVector == null) {
-			if (bytes.isRepeating) { // fill complete column with first value
-				String repeatingValue = new String(bytes.vector[0], bytes.start[0], bytes.length[0], StandardCharsets.UTF_8);
-				fillColumnWithRepeatingValue(vals, fieldIdx, repeatingValue, childCount);
-			} else {
-				if (fieldIdx == -1) { // set as an object
-					for (int i = 0; i < childCount; i++) {
-						vals[i] = new String(bytes.vector[i], bytes.start[i], bytes.length[i], StandardCharsets.UTF_8);
-					}
-				} else { // set as a field of Row
-					Row[] rows = (Row[]) vals;
-					for (int i = 0; i < childCount; i++) {
-						rows[i].setField(fieldIdx, new String(bytes.vector[i], bytes.start[i], bytes.length[i], StandardCharsets.UTF_8));
-					}
-				}
-			}
-		} else {
-			String[] temp;
-			int offset = 0;
-			if (bytes.isRepeating) { // fill complete list with first value
-				String repeatingValue = new String(bytes.vector[0], bytes.start[0], bytes.length[0], StandardCharsets.UTF_8);
-				for (int i = 0; offset < childCount; i++) {
-					temp = new String[(int) lengthVector[i]];
-					Arrays.fill(temp, repeatingValue);
-					offset += temp.length;
-					if (fieldIdx == -1) {
-						vals[i] = temp;
-					} else {
-						((Row) vals[i]).setField(fieldIdx, temp);
-					}
-				}
-			} else {
-				for (int i = 0; offset < childCount; i++) {
-					temp = new String[(int) lengthVector[i]];
-					for (int j = 0; j < temp.length; j++) {
-						temp[j] = new String(bytes.vector[offset], bytes.start[offset], bytes.length[offset], StandardCharsets.UTF_8);
-						offset++;
-					}
-					if (fieldIdx == -1) {
-						vals[i] = temp;
-					} else {
-						((Row) vals[i]).setField(fieldIdx, temp);
-					}
-				}
-			}
-		}
-	}
-
-	private static void readNonNullBytesColumnAsBinary(Object[] vals, int fieldIdx, BytesColumnVector bytes, long[] lengthVector, int childCount) {
-		// check if the values need to be read into lists or as single values
-		if (lengthVector == null) {
-			if (bytes.isRepeating) { // fill complete column with first value
-				if (fieldIdx == -1) { // set as an object
-					for (int i = 0; i < childCount; i++) {
-						// don't reuse repeating val to avoid object mutation
-						vals[i] = readBinary(bytes.vector[0], bytes.start[0], bytes.length[0]);
-					}
-				} else { // set as a field of Row
-					Row[] rows = (Row[]) vals;
-					for (int i = 0; i < childCount; i++) {
-						// don't reuse repeating val to avoid object mutation
-						rows[i].setField(fieldIdx, readBinary(bytes.vector[0], bytes.start[0], bytes.length[0]));
-					}
-				}
-			} else {
-				if (fieldIdx == -1) { // set as an object
-					for (int i = 0; i < childCount; i++) {
-						vals[i] = readBinary(bytes.vector[i], bytes.start[i], bytes.length[i]);
-					}
-				} else { // set as a field of Row
-					Row[] rows = (Row[]) vals;
-					for (int i = 0; i < childCount; i++) {
-						rows[i].setField(fieldIdx, readBinary(bytes.vector[i], bytes.start[i], bytes.length[i]));
-					}
-				}
-			}
-		} else {
-			byte[][] temp;
-			int offset = 0;
-			if (bytes.isRepeating) { // fill complete list with first value
-				for (int i = 0; offset < childCount; i++) {
-					temp = new byte[(int) lengthVector[i]][];
-					for (int j = 0; j < temp.length; j++) {
-						temp[j] = readBinary(bytes.vector[0], bytes.start[0], bytes.length[0]);
-					}
-					offset += temp.length;
-					if (fieldIdx == -1) {
-						vals[i] = temp;
-					} else {
-						((Row) vals[i]).setField(fieldIdx, temp);
-					}
-				}
-			} else {
-				for (int i = 0; offset < childCount; i++) {
-					temp = new byte[(int) lengthVector[i]][];
-					for (int j = 0; j < temp.length; j++) {
-						temp[j] = readBinary(bytes.vector[offset], bytes.start[offset], bytes.length[offset]);
-						offset++;
-					}
-					if (fieldIdx == -1) {
-						vals[i] = temp;
-					} else {
-						((Row) vals[i]).setField(fieldIdx, temp);
-					}
-				}
-			}
-		}
-	}
-
-	private static void readNonNullLongColumnAsDate(Object[] vals, int fieldIdx, LongColumnVector vector, long[] lengthVector, int childCount) {
-
-		// check if the values need to be read into lists or as single values
-		if (lengthVector == null) {
-			if (vector.isRepeating) { // fill complete column with first value
-				if (fieldIdx == -1) { // set as an object
-					for (int i = 0; i < childCount; i++) {
-						// do not reuse repeated value due to mutability of Date
-						vals[i] = readDate(vector.vector[0]);
-					}
-				} else { // set as a field of Row
-					Row[] rows = (Row[]) vals;
-					for (int i = 0; i < childCount; i++) {
-						// do not reuse repeated value due to mutability of Date
-						rows[i].setField(fieldIdx, readDate(vector.vector[0]));
-					}
-				}
-			} else {
-				if (fieldIdx == -1) { // set as an object
-					for (int i = 0; i < childCount; i++) {
-						vals[i] = readDate(vector.vector[i]);
-					}
-				} else { // set as a field of Row
-					Row[] rows = (Row[]) vals;
-					for (int i = 0; i < childCount; i++) {
-						rows[i].setField(fieldIdx, readDate(vector.vector[i]));
-					}
-				}
-			}
-		} else { // in a list
-			Date[] temp;
-			int offset = 0;
-			if (vector.isRepeating) { // fill complete list with first value
-				for (int i = 0; offset < childCount; i++) {
-					temp = new Date[(int) lengthVector[i]];
-					for (int j = 0; j < temp.length; j++) {
-						temp[j] = readDate(vector.vector[0]);
-					}
-					offset += temp.length;
-					if (fieldIdx == -1) {
-						vals[i] = temp;
-					} else {
-						((Row) vals[i]).setField(fieldIdx, temp);
-					}
-				}
-			} else {
-				for (int i = 0; offset < childCount; i++) {
-					temp = new Date[(int) lengthVector[i]];
-					for (int j = 0; j < temp.length; j++) {
-						temp[j] = readDate(vector.vector[offset++]);
-					}
-					if (fieldIdx == -1) {
-						vals[i] = temp;
-					} else {
-						((Row) vals[i]).setField(fieldIdx, temp);
-					}
-				}
-			}
-		}
-	}
-
-	private static void readNonNullTimestampColumn(Object[] vals, int fieldIdx, TimestampColumnVector vector, long[] lengthVector, int childCount) {
-
-		// check if the timestamps need to be read into lists or as single values
-		if (lengthVector == null) {
-			if (vector.isRepeating) { // fill complete column with first value
-				if (fieldIdx == -1) { // set as an object
-					for (int i = 0; i < childCount; i++) {
-						// do not reuse value to prevent object mutation
-						vals[i] = readTimestamp(vector.time[0], vector.nanos[0]);
-					}
-				} else { // set as a field of Row
-					Row[] rows = (Row[]) vals;
-					for (int i = 0; i < childCount; i++) {
-						// do not reuse value to prevent object mutation
-						rows[i].setField(fieldIdx, readTimestamp(vector.time[0], vector.nanos[0]));
-					}
-				}
-			} else {
-				if (fieldIdx == -1) { // set as an object
-					for (int i = 0; i < childCount; i++) {
-						vals[i] = readTimestamp(vector.time[i], vector.nanos[i]);
-					}
-				} else { // set as a field of Row
-					Row[] rows = (Row[]) vals;
-					for (int i = 0; i < childCount; i++) {
-						rows[i].setField(fieldIdx, readTimestamp(vector.time[i], vector.nanos[i]));
-					}
-				}
-			}
-		} else {
-			Timestamp[] temp;
-			int offset = 0;
-			if (vector.isRepeating) { // fill complete list with first value
-				for (int i = 0; offset < childCount; i++) {
-					temp = new Timestamp[(int) lengthVector[i]];
-					for (int j = 0; j < temp.length; j++) {
-						// do not reuse value to prevent object mutation
-						temp[j] = readTimestamp(vector.time[0], vector.nanos[0]);
-					}
-					offset += temp.length;
-					if (fieldIdx == -1) {
-						vals[i] = temp;
-					} else {
-						((Row) vals[i]).setField(fieldIdx, temp);
-					}
-				}
-			} else {
-				for (int i = 0; offset < childCount; i++) {
-					temp = new Timestamp[(int) lengthVector[i]];
-					for (int j = 0; j < temp.length; j++) {
-						temp[j] = readTimestamp(vector.time[offset], vector.nanos[offset]);
-						offset++;
-					}
-					if (fieldIdx == -1) {
-						vals[i] = temp;
-					} else {
-						((Row) vals[i]).setField(fieldIdx, temp);
-					}
-				}
-			}
-		}
-	}
-
-	private static void readNonNullDecimalColumn(Object[] vals, int fieldIdx, DecimalColumnVector vector, long[] lengthVector, int childCount) {
-
-		// check if the decimals need to be read into lists or as single values
-		if (lengthVector == null) {
-			if (vector.isRepeating) { // fill complete column with first value
-				fillColumnWithRepeatingValue(vals, fieldIdx, readBigDecimal(vector.vector[0]), childCount);
-			} else {
-				if (fieldIdx == -1) { // set as an object
-					for (int i = 0; i < childCount; i++) {
-						vals[i] = readBigDecimal(vector.vector[i]);
-					}
-				} else { // set as a field of Row
-					Row[] rows = (Row[]) vals;
-					for (int i = 0; i < childCount; i++) {
-						rows[i].setField(fieldIdx, readBigDecimal(vector.vector[i]));
-					}
-				}
-			}
-		} else {
-			BigDecimal[] temp;
-			int offset = 0;
-			if (vector.isRepeating) { // fill complete list with first value
-				BigDecimal repeatingValue = readBigDecimal(vector.vector[0]);
-				for (int i = 0; offset < childCount; i++) {
-					temp = new BigDecimal[(int) lengthVector[i]];
-					Arrays.fill(temp, repeatingValue);
-					offset += temp.length;
-					if (fieldIdx == -1) {
-						vals[i] = temp;
-					} else {
-						((Row) vals[i]).setField(fieldIdx, temp);
-					}
-				}
-			} else {
-				for (int i = 0; offset < childCount; i++) {
-					temp = new BigDecimal[(int) lengthVector[i]];
-					for (int j = 0; j < temp.length; j++) {
-						temp[j] = readBigDecimal(vector.vector[offset++]);
-					}
-					if (fieldIdx == -1) {
-						vals[i] = temp;
-					} else {
-						((Row) vals[i]).setField(fieldIdx, temp);
-					}
-				}
-			}
-		}
-
-	}
-
-	private static void readNonNullStructColumn(Object[] vals, int fieldIdx, StructColumnVector structVector, TypeDescription schema, long[] lengthVector, int childCount) {
-
-		List<TypeDescription> childrenTypes = schema.getChildren();
-
-		int numFields = childrenTypes.size();
-		// create a batch of Rows to read the structs
-		Row[] structs = new Row[childCount];
-		// TODO: possible improvement: reuse existing Row objects
-		for (int i = 0; i < childCount; i++) {
-			structs[i] = new Row(numFields);
-		}
-
-		// read struct fields
-		for (int i = 0; i < numFields; i++) {
-			readField(structs, i, childrenTypes.get(i), structVector.fields[i], null, childCount);
-		}
-
-		// check if the structs need to be read into lists or as single values
-		if (lengthVector == null) {
-			if (fieldIdx == -1) { // set struct as an object
-				System.arraycopy(structs, 0, vals, 0, childCount);
-			} else { // set struct as a field of Row
-				Row[] rows = (Row[]) vals;
-				for (int i = 0; i < childCount; i++) {
-					rows[i].setField(fieldIdx, structs[i]);
-				}
-			}
-		} else { // struct in a list
-			int offset = 0;
-			Row[] temp;
-			for (int i = 0; offset < childCount; i++) {
-				temp = new Row[(int) lengthVector[i]];
-				System.arraycopy(structs, offset, temp, 0, temp.length);
-				offset = offset + temp.length;
-				if (fieldIdx == -1) {
-					vals[i] = temp;
-				} else {
-					((Row) vals[i]).setField(fieldIdx, temp);
-				}
-			}
-		}
-	}
-
-	private static void readNonNullListColumn(Object[] vals, int fieldIdx, ListColumnVector list, TypeDescription schema, long[] lengthVector, int childCount) {
-
-		TypeDescription fieldType = schema.getChildren().get(0);
-		// check if the list needs to be read into lists or as single values
-		if (lengthVector == null) {
-			long[] lengthVectorNested = list.lengths;
-			readField(vals, fieldIdx, fieldType, list.child, lengthVectorNested, list.childCount);
-		} else { // list in a list
-			Object[] nestedLists = new Object[childCount];
-			// length vector for nested list
-			long[] lengthVectorNested = list.lengths;
-			// read nested list
-			readField(nestedLists, -1, fieldType, list.child, lengthVectorNested, list.childCount);
-			// get type of nestedList
-			Class<?> classType = nestedLists[0].getClass();
-
-			// fill outer list with nested list
-			int offset = 0;
-			int length;
-			for (int i = 0; offset < childCount; i++) {
-				length = (int) lengthVector[i];
-				Object[] temp = (Object[]) Array.newInstance(classType, length);
-				System.arraycopy(nestedLists, offset, temp, 0, length);
-				offset = offset + length;
-				if (fieldIdx == -1) {
-					vals[i] = temp;
-				} else {
-					((Row) vals[i]).setField(fieldIdx, temp);
-				}
-			}
-		}
-	}
-
-	private static void readNonNullMapColumn(Object[] vals, int fieldIdx, MapColumnVector mapsVector, TypeDescription schema, long[] lengthVector, int childCount) {
-
-		List<TypeDescription> fieldType = schema.getChildren();
-		TypeDescription keyType = fieldType.get(0);
-		TypeDescription valueType = fieldType.get(1);
-
-		ColumnVector keys = mapsVector.keys;
-		ColumnVector values = mapsVector.values;
-		Object[] keyRows = new Object[mapsVector.childCount];
-		Object[] valueRows = new Object[mapsVector.childCount];
-
-		// read map keys and values
-		readField(keyRows, -1, keyType, keys, null, keyRows.length);
-		readField(valueRows, -1, valueType, values, null, valueRows.length);
-
-		// check if the maps need to be read into lists or as single values
-		if (lengthVector == null) {
-			long[] lengthVectorMap = mapsVector.lengths;
-			int offset = 0;
-
-			for (int i = 0; i < childCount; i++) {
-				long numMapEntries = lengthVectorMap[i];
-				HashMap map = readHashMap(keyRows, valueRows, offset, numMapEntries);
-				offset += numMapEntries;
-
-				if (fieldIdx == -1) {
-					vals[i] = map;
-				} else {
-					((Row) vals[i]).setField(fieldIdx, map);
-				}
-			}
-		} else { // list of map
-
-			long[] lengthVectorMap = mapsVector.lengths;
-			int mapOffset = 0; // offset of map element
-			int offset = 0; // offset of map
-			HashMap[] temp;
-
-			for (int i = 0; offset < childCount; i++) {
-				temp = new HashMap[(int) lengthVector[i]];
-				for (int j = 0; j < temp.length; j++) {
-					long numMapEntries = lengthVectorMap[offset];
-					temp[j] = readHashMap(keyRows, valueRows, mapOffset, numMapEntries);
-					mapOffset += numMapEntries;
-					offset++;
-				}
-				if (fieldIdx == -1) {
-					vals[i] = temp;
-				} else {
-					((Row) vals[i]).setField(fieldIdx, temp);
-				}
-			}
-		}
-	}
-
-	private static <T> void readLongColumn(Object[] vals, int fieldIdx, LongColumnVector vector, long[] lengthVector, int childCount,
-											LongFunction<T> reader, IntFunction<T[]> array) {
-
-		// check if the values need to be read into lists or as single values
-		if (lengthVector == null) {
-			if (vector.isRepeating) { // fill complete column with first value
-				// since the column contains null values and has just one distinct value, the repeated value is null
-				fillColumnWithRepeatingValue(vals, fieldIdx, null, childCount);
-			} else {
-				boolean[] isNullVector = vector.isNull;
-				if (fieldIdx == -1) { // set as an object
-					for (int i = 0; i < childCount; i++) {
-						if (isNullVector[i]) {
-							vals[i] = null;
-						} else {
-							vals[i] = reader.apply(vector.vector[i]);
-						}
-					}
-				} else { // set as a field of Row
-					Row[] rows = (Row[]) vals;
-					for (int i = 0; i < childCount; i++) {
-						if (isNullVector[i]) {
-							rows[i].setField(fieldIdx, null);
-						} else {
-							rows[i].setField(fieldIdx, reader.apply(vector.vector[i]));
-						}
-					}
-				}
-			}
-		} else { // in a list
-			if (vector.isRepeating) { // fill complete list with first value
-				// since the column contains null values and has just one distinct value, the repeated value is null
-				fillListWithRepeatingNull(vals, fieldIdx, lengthVector, childCount, array);
-			} else {
-				// column contains null values
-				int offset = 0;
-				T[] temp;
-				boolean[] isNullVector = vector.isNull;
-				for (int i = 0; offset < childCount; i++) {
-					temp = array.apply((int) lengthVector[i]);
-					for (int j = 0; j < temp.length; j++) {
-						if (isNullVector[offset]) {
-							offset++;
-						} else {
-							temp[j] = reader.apply(vector.vector[offset++]);
-						}
-					}
-					if (fieldIdx == -1) {
-						vals[i] = temp;
-					} else {
-						((Row) vals[i]).setField(fieldIdx, temp);
-					}
-				}
-			}
-		}
-	}
-
-	private static <T> void readDoubleColumn(Object[] vals, int fieldIdx, DoubleColumnVector vector, long[] lengthVector, int childCount,
-												DoubleFunction<T> reader, IntFunction<T[]> array) {
-
-		// check if the values need to be read into lists or as single values
-		if (lengthVector == null) {
-			if (vector.isRepeating) { // fill complete column with first value
-				// since the column contains null values and has just one distinct value, the repeated value is null
-				fillColumnWithRepeatingValue(vals, fieldIdx, null, childCount);
-			} else {
-				boolean[] isNullVector = vector.isNull;
-				if (fieldIdx == -1) { // set as an object
-					for (int i = 0; i < childCount; i++) {
-						if (isNullVector[i]) {
-							vals[i] = null;
-						} else {
-							vals[i] = reader.apply(vector.vector[i]);
-						}
-					}
-				} else { // set as a field of Row
-					Row[] rows = (Row[]) vals;
-					for (int i = 0; i < childCount; i++) {
-						if (isNullVector[i]) {
-							rows[i].setField(fieldIdx, null);
-						} else {
-							rows[i].setField(fieldIdx, reader.apply(vector.vector[i]));
-						}
-					}
-				}
-			}
-		} else { // in a list
-			if (vector.isRepeating) { // fill complete list with first value
-				// since the column contains null values and has just one distinct value, the repeated value is null
-				fillListWithRepeatingNull(vals, fieldIdx, lengthVector, childCount, array);
-			} else {
-				// column contains null values
-				int offset = 0;
-				T[] temp;
-				boolean[] isNullVector = vector.isNull;
-				for (int i = 0; offset < childCount; i++) {
-					temp = array.apply((int) lengthVector[i]);
-					for (int j = 0; j < temp.length; j++) {
-						if (isNullVector[offset]) {
-							offset++;
-						} else {
-							temp[j] = reader.apply(vector.vector[offset++]);
-						}
-					}
-					if (fieldIdx == -1) {
-						vals[i] = temp;
-					} else {
-						((Row) vals[i]).setField(fieldIdx, temp);
-					}
-				}
-			}
-		}
-	}
-
-	private static void readBytesColumnAsString(Object[] vals, int fieldIdx, BytesColumnVector bytes, long[] lengthVector, int childCount) {
-
-		// check if the values need to be read into lists or as single values
-		if (lengthVector == null) {
-			if (bytes.isRepeating) { // fill complete column with first value
-				// since the column contains null values and has just one distinct value, the repeated value is null
-				fillColumnWithRepeatingValue(vals, fieldIdx, null, childCount);
-			} else {
-				boolean[] isNullVector = bytes.isNull;
-				if (fieldIdx == -1) { // set as an object
-					for (int i = 0; i < childCount; i++) {
-						if (isNullVector[i]) {
-							vals[i] = null;
-						} else {
-							vals[i] = new String(bytes.vector[i], bytes.start[i], bytes.length[i], StandardCharsets.UTF_8);
-						}
-					}
-				} else { // set as a field of Row
-					Row[] rows = (Row[]) vals;
-					for (int i = 0; i < childCount; i++) {
-						if (isNullVector[i]) {
-							rows[i].setField(fieldIdx, null);
-						} else {
-							rows[i].setField(fieldIdx, new String(bytes.vector[i], bytes.start[i], bytes.length[i], StandardCharsets.UTF_8));
-						}
-					}
-				}
-			}
-		} else { // in a list
-			if (bytes.isRepeating) { // fill list with first value
-				// since the column contains null values and has just one distinct value, the repeated value is null
-				fillListWithRepeatingNull(vals, fieldIdx, lengthVector, childCount, OrcUtils::stringArray);
-			} else {
-				int offset = 0;
-				String[] temp;
-				boolean[] isNullVector = bytes.isNull;
-				for (int i = 0; offset < childCount; i++) {
-					temp = new String[(int) lengthVector[i]];
-					for (int j = 0; j < temp.length; j++) {
-						if (isNullVector[offset]) {
-							offset++;
-						} else {
-							temp[j] = new String(bytes.vector[offset], bytes.start[offset], bytes.length[offset], StandardCharsets.UTF_8);
-							offset++;
-						}
-					}
-					if (fieldIdx == -1) {
-						vals[i] = temp;
-					} else {
-						((Row) vals[i]).setField(fieldIdx, temp);
-					}
-				}
-			}
-		}
-	}
-
-	private static void readBytesColumnAsBinary(Object[] vals, int fieldIdx, BytesColumnVector bytes, long[] lengthVector, int childCount) {
-
-		// check if the binary data needs to be read into lists or as single values
-		if (lengthVector == null) {
-			if (bytes.isRepeating) { // fill complete column with first value
-				// since the column contains null values and has just one distinct value, the repeated value is null
-				fillColumnWithRepeatingValue(vals, fieldIdx, null, childCount);
-			} else {
-				boolean[] isNullVector = bytes.isNull;
-				if (fieldIdx == -1) { // set as an object
-					for (int i = 0; i < childCount; i++) {
-						if (isNullVector[i]) {
-							vals[i] = null;
-						} else {
-							vals[i] = readBinary(bytes.vector[i], bytes.start[i], bytes.length[i]);
-						}
-					}
-				} else { // set as a field of Row
-					Row[] rows = (Row[]) vals;
-					for (int i = 0; i < childCount; i++) {
-						if (isNullVector[i]) {
-							rows[i].setField(fieldIdx, null);
-						} else {
-							rows[i].setField(fieldIdx, readBinary(bytes.vector[i], bytes.start[i], bytes.length[i]));
-						}
-					}
-				}
-			}
-		} else {
-			if (bytes.isRepeating) { // fill complete list with first value
-				// since the column contains null values and has just one distinct value, the repeated value is null
-				fillListWithRepeatingNull(vals, fieldIdx, lengthVector, childCount, OrcUtils::binaryArray);
-			} else {
-				int offset = 0;
-				byte[][] temp;
-				boolean[] isNullVector = bytes.isNull;
-				for (int i = 0; offset < childCount; i++) {
-					temp = new byte[(int) lengthVector[i]][];
-					for (int j = 0; j < temp.length; j++) {
-						if (isNullVector[offset]) {
-							offset++;
-						} else {
-							temp[j] = readBinary(bytes.vector[offset], bytes.start[offset], bytes.length[offset]);
-							offset++;
-						}
-					}
-					if (fieldIdx == -1) {
-						vals[i] = temp;
-					} else {
-						((Row) vals[i]).setField(fieldIdx, temp);
-					}
-				}
-			}
-		}
-	}
-
-	private static void readLongColumnAsDate(Object[] vals, int fieldIdx, LongColumnVector vector, long[] lengthVector, int childCount) {
-
-		// check if the values need to be read into lists or as single values
-		if (lengthVector == null) {
-			if (vector.isRepeating) { // fill complete column with first value
-				// since the column contains null values and has just one distinct value, the repeated value is null
-				fillColumnWithRepeatingValue(vals, fieldIdx, null, childCount);
-			} else {
-				boolean[] isNullVector = vector.isNull;
-				if (fieldIdx == -1) { // set as an object
-					for (int i = 0; i < childCount; i++) {
-						if (isNullVector[i]) {
-							vals[i] = null;
-						} else {
-							vals[i] = readDate(vector.vector[i]);
-						}
-					}
-				} else { // set as a field of Row
-					Row[] rows = (Row[]) vals;
-					for (int i = 0; i < childCount; i++) {
-						if (isNullVector[i]) {
-							rows[i].setField(fieldIdx, null);
-						} else {
-							rows[i].setField(fieldIdx, readDate(vector.vector[i]));
-						}
-					}
-				}
-			}
-		} else { // in a list
-			if (vector.isRepeating) { // fill complete list with first value
-				// since the column contains null values and has just one distinct value, the repeated value is null
-				fillListWithRepeatingNull(vals, fieldIdx, lengthVector, childCount, OrcUtils::dateArray);
-			} else {
-				// column contains null values
-				int offset = 0;
-				Date[] temp;
-				boolean[] isNullVector = vector.isNull;
-				for (int i = 0; offset < childCount; i++) {
-					temp = new Date[(int) lengthVector[i]];
-					for (int j = 0; j < temp.length; j++) {
-						if (isNullVector[offset]) {
-							offset++;
-						} else {
-							temp[j] = readDate(vector.vector[offset++]);
-						}
-					}
-					if (fieldIdx == -1) {
-						vals[i] = temp;
-					} else {
-						((Row) vals[i]).setField(fieldIdx, temp);
-					}
-				}
-			}
-		}
-	}
-
-	private static void readTimestampColumn(Object[] vals, int fieldIdx, TimestampColumnVector vector, long[] lengthVector, int childCount) {
-
-		// check if the timestamps need to be read into lists or as single values
-		if (lengthVector == null) {
-			if (vector.isRepeating) { // fill complete column with first value
-				// since the column contains null values and has just one distinct value, the repeated value is null
-				fillColumnWithRepeatingValue(vals, fieldIdx, null, childCount);
-			} else {
-				boolean[] isNullVector = vector.isNull;
-				if (fieldIdx == -1) { // set as an object
-					for (int i = 0; i < childCount; i++) {
-						if (isNullVector[i]) {
-							vals[i] = null;
-						} else {
-							Timestamp ts = readTimestamp(vector.time[i], vector.nanos[i]);
-							vals[i] = ts;
-						}
-					}
-				} else { // set as a field of Row
-					Row[] rows = (Row[]) vals;
-					for (int i = 0; i < childCount; i++) {
-						if (isNullVector[i]) {
-							rows[i].setField(fieldIdx, null);
-						} else {
-							Timestamp ts = readTimestamp(vector.time[i], vector.nanos[i]);
-							rows[i].setField(fieldIdx, ts);
-						}
-					}
-				}
-			}
-		} else {
-			if (vector.isRepeating) { // fill complete list with first value
-				// since the column contains null values and has just one distinct value, the repeated value is null
-				fillListWithRepeatingNull(vals, fieldIdx, lengthVector, childCount, OrcUtils::timestampArray);
-			} else {
-				int offset = 0;
-				Timestamp[] temp;
-				boolean[] isNullVector = vector.isNull;
-				for (int i = 0; offset < childCount; i++) {
-					temp = new Timestamp[(int) lengthVector[i]];
-					for (int j = 0; j < temp.length; j++) {
-						if (isNullVector[offset]) {
-							offset++;
-						} else {
-							temp[j] = readTimestamp(vector.time[offset], vector.nanos[offset]);
-							offset++;
-						}
-					}
-					if (fieldIdx == -1) {
-						vals[i] = temp;
-					} else {
-						((Row) vals[i]).setField(fieldIdx, temp);
-					}
-				}
-			}
-		}
-	}
-
-	private static void readDecimalColumn(Object[] vals, int fieldIdx, DecimalColumnVector vector, long[] lengthVector, int childCount) {
-
-		// check if the decimals need to be read into lists or as single values
-		if (lengthVector == null) {
-			if (vector.isRepeating) { // fill complete column with first value
-				// since the column contains null values and has just one distinct value, the repeated value is null
-				fillColumnWithRepeatingValue(vals, fieldIdx, null, childCount);
-			} else {
-				boolean[] isNullVector = vector.isNull;
-				if (fieldIdx == -1) { // set as an object
-					for (int i = 0; i < childCount; i++) {
-						if (isNullVector[i]) {
-							vals[i] = null;
-						} else {
-							vals[i] = readBigDecimal(vector.vector[i]);
-						}
-					}
-				} else { // set as a field of Row
-					Row[] rows = (Row[]) vals;
-					for (int i = 0; i < childCount; i++) {
-						if (isNullVector[i]) {
-							rows[i].setField(fieldIdx, null);
-						} else {
-							rows[i].setField(fieldIdx, readBigDecimal(vector.vector[i]));
-						}
-					}
-				}
-			}
-		} else {
-			if (vector.isRepeating) { // fill complete list with first value
-				// since the column contains null values and has just one distinct value, the repeated value is null
-				fillListWithRepeatingNull(vals, fieldIdx, lengthVector, childCount, OrcUtils::decimalArray);
-			} else {
-				int offset = 0;
-				BigDecimal[] temp;
-				boolean[] isNullVector = vector.isNull;
-				for (int i = 0; offset < childCount; i++) {
-					temp = new BigDecimal[(int) lengthVector[i]];
-					for (int j = 0; j < temp.length; j++) {
-						if (isNullVector[offset]) {
-							offset++;
-						} else {
-							temp[j] = readBigDecimal(vector.vector[offset++]);
-						}
-					}
-					if (fieldIdx == -1) {
-						vals[i] = temp;
-					} else {
-						((Row) vals[i]).setField(fieldIdx, temp);
-					}
-				}
-			}
-		}
-	}
-
-	private static void readStructColumn(Object[] vals, int fieldIdx, StructColumnVector structVector, TypeDescription schema, long[] lengthVector, int childCount) {
-
-		List<TypeDescription> childrenTypes = schema.getChildren();
-
-		int numFields = childrenTypes.size();
-		// create a batch of Rows to read the structs
-		Row[] structs = new Row[childCount];
-		// TODO: possible improvement: reuse existing Row objects
-		for (int i = 0; i < childCount; i++) {
-			structs[i] = new Row(numFields);
-		}
-
-		// read struct fields
-		for (int i = 0; i < numFields; i++) {
-			readField(structs, i, childrenTypes.get(i), structVector.fields[i], null, childCount);
-		}
-
-		boolean[] isNullVector = structVector.isNull;
-
-		// check if the structs need to be read into lists or as single values
-		if (lengthVector == null) {
-			if (fieldIdx == -1) { // set struct as an object
-				for (int i = 0; i < childCount; i++) {
-					if (isNullVector[i]) {
-						vals[i] = null;
-					} else {
-						vals[i] = structs[i];
-					}
-				}
-			} else { // set struct as a field of Row
-				Row[] rows = (Row[]) vals;
-				for (int i = 0; i < childCount; i++) {
-					if (isNullVector[i]) {
-						rows[i].setField(fieldIdx, null);
-					} else {
-						rows[i].setField(fieldIdx, structs[i]);
-					}
-				}
-			}
-		} else { // struct in a list
-			int offset = 0;
-			Row[] temp;
-			for (int i = 0; offset < childCount; i++) {
-				temp = new Row[(int) lengthVector[i]];
-				for (int j = 0; j < temp.length; j++) {
-					if (isNullVector[offset]) {
-						temp[j] = null;
-					} else {
-						temp[j] = structs[offset++];
-					}
-				}
-				if (fieldIdx == -1) { // set list of structs as an object
-					vals[i] = temp;
-				} else { // set list of structs as field of row
-					((Row) vals[i]).setField(fieldIdx, temp);
-				}
-			}
-		}
-	}
-
-	private static void readListColumn(Object[] vals, int fieldIdx, ListColumnVector list, TypeDescription schema, long[] lengthVector, int childCount) {
-
-		TypeDescription fieldType = schema.getChildren().get(0);
-		// check if the lists need to be read into lists or as single values
-		if (lengthVector == null) {
-			long[] lengthVectorNested = list.lengths;
-			readField(vals, fieldIdx, fieldType, list.child, lengthVectorNested, list.childCount);
-		} else { // list in a list
-			Object[] nestedList = new Object[childCount];
-			// length vector for nested list
-			long[] lengthVectorNested = list.lengths;
-			// read nested list
-			readField(nestedList, -1, fieldType, list.child, lengthVectorNested, list.childCount);
-
-			// fill outer list with nested list
-			int offset = 0;
-			int length;
-			// get type of nestedList
-			Class<?> classType = nestedList[0].getClass();
-			for (int i = 0; offset < childCount; i++) {
-				length = (int) lengthVector[i];
-				Object[] temp = (Object[]) Array.newInstance(classType, length);
-				System.arraycopy(nestedList, offset, temp, 0, length);
-				offset = offset + length;
-				if (fieldIdx == -1) { // set list of list as an object
-					vals[i] = temp;
-				} else { // set list of list as field of row
-					((Row) vals[i]).setField(fieldIdx, temp);
-				}
-			}
-		}
-	}
-
-	private static void readMapColumn(Object[] vals, int fieldIdx, MapColumnVector map, TypeDescription schema, long[] lengthVector, int childCount) {
-
-		List<TypeDescription> fieldType = schema.getChildren();
-		TypeDescription keyType = fieldType.get(0);
-		TypeDescription valueType = fieldType.get(1);
-
-		ColumnVector keys = map.keys;
-		ColumnVector values = map.values;
-		Object[] keyRows = new Object[map.childCount];
-		Object[] valueRows = new Object[map.childCount];
-
-		// read map keys and values
-		readField(keyRows, -1, keyType, keys, null, keyRows.length);
-		readField(valueRows, -1, valueType, values, null, valueRows.length);
-
-		boolean[] isNullVector = map.isNull;
-
-		// check if the maps need to be read into lists or as single values
-		if (lengthVector == null) {
-			long[] lengthVectorMap = map.lengths;
-			int offset = 0;
-			if (fieldIdx == -1) { // set map as an object
-				for (int i = 0; i < childCount; i++) {
-					if (isNullVector[i]) {
-						vals[i] = null;
-					} else {
-						vals[i] = readHashMap(keyRows, valueRows, offset, lengthVectorMap[i]);
-						offset += lengthVectorMap[i];
-					}
-				}
-			} else { // set map as a field of Row
-				Row[] rows = (Row[]) vals;
-				for (int i = 0; i < childCount; i++) {
-					if (isNullVector[i]) {
-						rows[i].setField(fieldIdx, null);
-					} else {
-						rows[i].setField(fieldIdx, readHashMap(keyRows, valueRows, offset, lengthVectorMap[i]));
-						offset += lengthVectorMap[i];
-					}
-				}
-			}
-		} else { // list of map
-			long[] lengthVectorMap = map.lengths;
-			int mapOffset = 0; // offset of map element
-			int offset = 0; // offset of map
-			HashMap[] temp;
-
-			for (int i = 0; offset < childCount; i++) {
-				temp = new HashMap[(int) lengthVector[i]];
-				for (int j = 0; j < temp.length; j++) {
-					if (isNullVector[offset]) {
-						temp[j] = null;
-					} else {
-						temp[j] = readHashMap(keyRows, valueRows, mapOffset, lengthVectorMap[offset]);
-						mapOffset += lengthVectorMap[offset];
-						offset++;
-					}
-				}
-				if (fieldIdx == -1) {
-					vals[i] = temp;
-				} else {
-					((Row) vals[i]).setField(fieldIdx, temp);
-				}
-			}
-		}
-	}
-
-	/**
-	 * Sets a repeating value to all objects or row fields of the passed vals array.
-	 *
-	 * @param vals The array of objects or Rows.
-	 * @param fieldIdx If the vals array is an array of Row, the index of the field that needs to be filled.
-	 *                 Otherwise a -1 must be passed and the data is directly filled into the array.
-	 * @param repeatingValue The value that is set.
-	 * @param childCount The number of times the value is set.
-	 */
-	private static void fillColumnWithRepeatingValue(Object[] vals, int fieldIdx, Object repeatingValue, int childCount) {
-
-		if (fieldIdx == -1) {
-			// set value as an object
-			Arrays.fill(vals, 0, childCount, repeatingValue);
-		} else {
-			// set value as a field of Row
-			Row[] rows = (Row[]) vals;
-			for (int i = 0; i < childCount; i++) {
-				rows[i].setField(fieldIdx, repeatingValue);
-			}
-		}
-	}
-
-	/**
-	 * Fills all objects or row fields of the passed vals array with arrays that contain only null values.
-	 *
-	 * @param vals The array of objects or Rows to which the empty arrays are set.
-	 * @param fieldIdx If the vals array is an array of Row, the index of the field that needs to be filled.
-	 *                 Otherwise a -1 must be passed and the data is directly filled into the array.
-	 * @param lengthVector The vector containing the lengths of the individual empty arrays.
-	 * @param childCount The number of objects or Rows to fill.
-	 * @param array A method to create arrays of the appropriate type.
-	 * @param <T> The type of the arrays to create.
-	 */
-	private static <T> void fillListWithRepeatingNull(Object[] vals, int fieldIdx, long[] lengthVector, int childCount, IntFunction<T[]> array) {
-
-		if (fieldIdx == -1) {
-			// set empty array as object
-			for (int i = 0; i < childCount; i++) {
-				vals[i] = array.apply((int) lengthVector[i]);
-			}
-		} else {
-			// set empty array as field in Row
-			Row[] rows = (Row[]) vals;
-			for (int i = 0; i < childCount; i++) {
-				rows[i].setField(fieldIdx, array.apply((int) lengthVector[i]));
-			}
-		}
-	}
-
-	private static Boolean readBoolean(long l) {
-		return l != 0;
-	}
-
-	private static Byte readByte(long l) {
-		return (byte) l;
-	}
-
-	private static Short readShort(long l) {
-		return (short) l;
-	}
-
-	private static Integer readInt(long l) {
-		return (int) l;
-	}
-
-	private static Long readLong(long l) {
-		return l;
-	}
-
-	private static Float readFloat(double d) {
-		return (float) d;
-	}
-
-	private static Double readDouble(double d) {
-		return d;
-	}
-
-	private static Date readDate(long l) {
-		// day to milliseconds
-		final long t = l * MILLIS_PER_DAY;
-		// adjust by local timezone
-		return new java.sql.Date(t - LOCAL_TZ.getOffset(t));
-	}
-
-	private static byte[] readBinary(byte[] src, int srcPos, int length) {
-		byte[] result = new byte[length];
-		System.arraycopy(src, srcPos, result, 0, length);
-		return result;
-	}
-
-	private static BigDecimal readBigDecimal(HiveDecimalWritable hiveDecimalWritable) {
-		HiveDecimal hiveDecimal = hiveDecimalWritable.getHiveDecimal();
-		return hiveDecimal.bigDecimalValue();
-	}
-
-	private static Timestamp readTimestamp(long time, int nanos) {
-		Timestamp ts = new Timestamp(time);
-		ts.setNanos(nanos);
-		return ts;
-	}
-
-	private static HashMap readHashMap(Object[] keyRows, Object[] valueRows, int offset, long length) {
-		HashMap<Object, Object> resultMap = new HashMap<>();
-		for (int j = 0; j < length; j++) {
-			resultMap.put(keyRows[offset], valueRows[offset]);
-			offset++;
-		}
-		return resultMap;
-	}
-
-	private static Boolean[] boolArray(int len) {
-		return new Boolean[len];
-	}
-
-	private static Byte[] byteArray(int len) {
-		return new Byte[len];
-	}
-
-	private static Short[] shortArray(int len) {
-		return new Short[len];
-	}
-
-	private static Integer[] intArray(int len) {
-		return new Integer[len];
-	}
-
-	private static Long[] longArray(int len) {
-		return new Long[len];
-	}
-
-	private static Float[] floatArray(int len) {
-		return new Float[len];
-	}
-
-	private static Double[] doubleArray(int len) {
-		return new Double[len];
-	}
-
-	private static Date[] dateArray(int len) {
-		return new Date[len];
-	}
-
-	private static byte[][] binaryArray(int len) {
-		return new byte[len][];
-	}
-
-	private static String[] stringArray(int len) {
-		return new String[len];
-	}
-
-	private static BigDecimal[] decimalArray(int len) {
-		return new BigDecimal[len];
-	}
-
-	private static Timestamp[] timestampArray(int len) {
-		return new Timestamp[len];
-	}
-
-}
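
For reference, the DATE handling above is the subtlest arithmetic in the removed file: ORC encodes DATE values as days since 1970-01-01 (UTC), and readDate() shifts the resulting epoch milliseconds by the local time-zone offset so that the java.sql.Date renders the same calendar day locally. A minimal, self-contained sketch of that conversion follows; the class and method names are mine and not part of the commit.

import java.sql.Date;
import java.util.TimeZone;

public class DateConversionSketch {

	private static final long MILLIS_PER_DAY = 24L * 60 * 60 * 1000;
	private static final TimeZone LOCAL_TZ = TimeZone.getDefault();

	// mirrors readDate() above: days since epoch -> local java.sql.Date
	static Date daysSinceEpochToDate(long days) {
		long utcMillis = days * MILLIS_PER_DAY;
		// subtract the local offset so Date#toString() shows the same calendar day
		return new Date(utcMillis - LOCAL_TZ.getOffset(utcMillis));
	}

	public static void main(String[] args) {
		// 17897 days after the epoch is 2019-01-01
		System.out.println(daysSinceEpochToDate(17897));
	}
}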

http://git-wip-us.apache.org/repos/asf/flink/blob/bcead3be/flink-connectors/flink-orc/src/test/java/org/apache/flink/orc/OrcBatchReaderTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-orc/src/test/java/org/apache/flink/orc/OrcBatchReaderTest.java b/flink-connectors/flink-orc/src/test/java/org/apache/flink/orc/OrcBatchReaderTest.java
new file mode 100644
index 0000000..b90313e
--- /dev/null
+++ b/flink-connectors/flink-orc/src/test/java/org/apache/flink/orc/OrcBatchReaderTest.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.orc;
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.typeutils.MapTypeInfo;
+import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+
+import org.apache.orc.TypeDescription;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Unit tests for {@link OrcBatchReader}.
+ *
+ */
+public class OrcBatchReaderTest {
+
+	@Test
+	public void testFlatSchemaToTypeInfo1() {
+
+		String schema =
+			"struct<" +
+				"boolean1:boolean," +
+				"byte1:tinyint," +
+				"short1:smallint," +
+				"int1:int," +
+				"long1:bigint," +
+				"float1:float," +
+				"double1:double," +
+				"bytes1:binary," +
+				"string1:string," +
+				"date1:date," +
+				"timestamp1:timestamp," +
+				"decimal1:decimal(5,2)" +
+			">";
+		TypeInformation typeInfo = OrcBatchReader.schemaToTypeInfo(TypeDescription.fromString(schema));
+
+		Assert.assertNotNull(typeInfo);
+		Assert.assertTrue(typeInfo instanceof RowTypeInfo);
+		RowTypeInfo rowTypeInfo = (RowTypeInfo) typeInfo;
+
+		// validate field types
+		Assert.assertArrayEquals(
+			new TypeInformation[]{
+				Types.BOOLEAN, Types.BYTE, Types.SHORT, Types.INT, Types.LONG, Types.FLOAT, Types.DOUBLE,
+				PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO, Types.STRING,
+				Types.SQL_DATE, Types.SQL_TIMESTAMP, BasicTypeInfo.BIG_DEC_TYPE_INFO
+			},
+			rowTypeInfo.getFieldTypes());
+
+		// validate field names
+		Assert.assertArrayEquals(
+			new String[] {
+				"boolean1", "byte1", "short1", "int1", "long1", "float1", "double1",
+				"bytes1", "string1", "date1", "timestamp1", "decimal1"
+			},
+			rowTypeInfo.getFieldNames());
+
+	}
+
+	@Test
+	public void testNestedSchemaToTypeInfo1() {
+
+		String schema =
+			"struct<" +
+				"middle:struct<" +
+					"list:array<" +
+						"struct<" +
+							"int1:int," +
+							"string1:string" +
+						">" +
+					">" +
+				">," +
+				"list:array<" +
+					"struct<" +
+						"int1:int," +
+						"string1:string" +
+					">" +
+				">," +
+				"map:map<" +
+					"string," +
+					"struct<" +
+						"int1:int," +
+						"string1:string" +
+					">" +
+				">" +
+			">";
+		TypeInformation typeInfo = OrcBatchReader.schemaToTypeInfo(TypeDescription.fromString(schema));
+
+		Assert.assertNotNull(typeInfo);
+		Assert.assertTrue(typeInfo instanceof RowTypeInfo);
+		RowTypeInfo rowTypeInfo = (RowTypeInfo) typeInfo;
+
+		// validate field types
+		Assert.assertArrayEquals(
+			new TypeInformation[]{
+				Types.ROW_NAMED(
+					new String[]{"list"},
+					ObjectArrayTypeInfo.getInfoFor(
+						Types.ROW_NAMED(
+							new String[]{"int1", "string1"},
+							Types.INT, Types.STRING
+						)
+					)
+				),
+				ObjectArrayTypeInfo.getInfoFor(
+					Types.ROW_NAMED(
+						new String[]{"int1", "string1"},
+						Types.INT, Types.STRING
+					)
+				),
+				new MapTypeInfo<>(
+					Types.STRING,
+					Types.ROW_NAMED(
+						new String[]{"int1", "string1"},
+						Types.INT, Types.STRING
+					)
+				)
+			},
+			rowTypeInfo.getFieldTypes());
+
+		// validate field names
+		Assert.assertArrayEquals(
+			new String[] {"middle", "list", "map"},
+			rowTypeInfo.getFieldNames());
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/bcead3be/flink-connectors/flink-orc/src/test/java/org/apache/flink/orc/OrcRowInputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-orc/src/test/java/org/apache/flink/orc/OrcRowInputFormatTest.java b/flink-connectors/flink-orc/src/test/java/org/apache/flink/orc/OrcRowInputFormatTest.java
index 0efe41f..2eb3231 100644
--- a/flink-connectors/flink-orc/src/test/java/org/apache/flink/orc/OrcRowInputFormatTest.java
+++ b/flink-connectors/flink-orc/src/test/java/org/apache/flink/orc/OrcRowInputFormatTest.java
@@ -26,6 +26,7 @@ import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
 import org.apache.flink.core.fs.FileInputSplit;
 import org.apache.flink.core.fs.Path;
+import org.apache.flink.orc.util.OrcTestFileGenerator;
 import org.apache.flink.types.Row;
 import org.apache.flink.util.InstantiationUtil;
 
@@ -50,6 +51,7 @@ import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.any;
@@ -124,6 +126,32 @@ public class OrcRowInputFormatTest {
 	private static final String TEST_FILE_NESTEDLIST = "test-data-nestedlist.orc";
 	private static final String TEST_SCHEMA_NESTEDLIST = "struct<mylist1:array<array<struct<mylong1:bigint>>>>";
 
+	/** Generated by {@link OrcTestFileGenerator#writeCompositeTypesWithNullsFile(String)}. */
+	private static final String TEST_FILE_COMPOSITES_NULLS = "test-data-composites-with-nulls.orc";
+	private static final String TEST_SCHEMA_COMPOSITES_NULLS =
+		"struct<" +
+			"int1:int," +
+			"record1:struct<f1:int,f2:string>," +
+			"list1:array<array<array<struct<f1:string,f2:string>>>>," +
+			"list2:array<map<string,int>>" +
+		">";
+
+	/** Generated by {@link OrcTestFileGenerator#writeCompositeTypesWithRepeatingFile(String)}. */
+	private static final String TEST_FILE_REPEATING = "test-data-repeating.orc";
+	private static final String TEST_SCHEMA_REPEATING =
+		"struct<" +
+			"int1:int," +
+			"int2:int," +
+			"int3:int," +
+			"record1:struct<f1:int,f2:string>," +
+			"record2:struct<f1:int,f2:string>," +
+			"list1:array<int>," +
+			"list2:array<int>," +
+			"list3:array<int>," +
+			"map1:map<int,string>," +
+			"map2:map<int,string>" +
+		">";
+
 	@Test(expected = FileNotFoundException.class)
 	public void testInvalidPath() throws IOException{
 		rowOrcInputFormat =
@@ -477,7 +505,7 @@ public class OrcRowInputFormatTest {
 	}
 
 	@Test
-	public void testReadNestedFile() throws IOException{
+	public void testReadNestedFile() throws IOException {
 		rowOrcInputFormat = new OrcRowInputFormat(getPath(TEST_FILE_NESTED), TEST_SCHEMA_NESTED, new Configuration());
 
 		FileInputSplit[] splits = rowOrcInputFormat.createInputSplits(1);
@@ -563,7 +591,7 @@ public class OrcRowInputFormatTest {
 	}
 
 	@Test
-	public void testReadTimeTypeFile() throws IOException{
+	public void testReadTimeTypeFile() throws IOException {
 		rowOrcInputFormat = new OrcRowInputFormat(getPath(TEST_FILE_TIMETYPES), TEST_SCHEMA_TIMETYPES, new Configuration());
 
 		FileInputSplit[] splits = rowOrcInputFormat.createInputSplits(1);
@@ -590,7 +618,7 @@ public class OrcRowInputFormatTest {
 	}
 
 	@Test
-	public void testReadDecimalTypeFile() throws IOException{
+	public void testReadDecimalTypeFile() throws IOException {
 		rowOrcInputFormat = new OrcRowInputFormat(getPath(TEST_FILE_DECIMAL), TEST_SCHEMA_DECIMAL, new Configuration());
 
 		FileInputSplit[] splits = rowOrcInputFormat.createInputSplits(1);
@@ -653,7 +681,183 @@ public class OrcRowInputFormatTest {
 	}
 
 	@Test
-	public void testReadWithProjection() throws IOException{
+	public void testReadCompositesNullsFile() throws Exception {
+		rowOrcInputFormat = new OrcRowInputFormat(
+			getPath(TEST_FILE_COMPOSITES_NULLS),
+			TEST_SCHEMA_COMPOSITES_NULLS,
+			new Configuration());
+
+		FileInputSplit[] splits = rowOrcInputFormat.createInputSplits(1);
+		assertEquals(1, splits.length);
+		rowOrcInputFormat.openInputFormat();
+		rowOrcInputFormat.open(splits[0]);
+
+		assertFalse(rowOrcInputFormat.reachedEnd());
+
+		Row row = null;
+		long cnt = 0;
+
+		int structNullCnt = 0;
+		int nestedListNullCnt = 0;
+		int mapListNullCnt = 0;
+
+		// read all rows
+		while (!rowOrcInputFormat.reachedEnd()) {
+
+			row = rowOrcInputFormat.nextRecord(row);
+			assertEquals(4, row.getArity());
+
+			assertTrue(row.getField(0) instanceof Integer);
+
+			if (row.getField(1) == null) {
+				structNullCnt++;
+			} else {
+				Object f = row.getField(1);
+				assertTrue(f instanceof Row);
+				assertEquals(2, ((Row) f).getArity());
+			}
+
+			if (row.getField(2) == null) {
+				nestedListNullCnt++;
+			} else {
+				Object f = row.getField(2);
+				assertTrue(f instanceof Row[][][]);
+				assertEquals(4, ((Row[][][]) f).length);
+			}
+
+			if (row.getField(3) == null) {
+				mapListNullCnt++;
+			} else {
+				Object f = row.getField(3);
+				assertTrue(f instanceof HashMap[]);
+				assertEquals(3, ((HashMap[]) f).length);
+			}
+			cnt++;
+		}
+		// number of rows in file
+		assertEquals(2500, cnt);
+		// check number of null fields
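+		// (the generator nulls the struct at every even in-batch row index and the
+		// lists/maps at indices divisible by 3; with the default batch size of 1024,
+		// the 2500 rows form batches of 1024, 1024, and 452 rows)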
+		assertEquals(1250, structNullCnt);
+		assertEquals(835, nestedListNullCnt);
+		assertEquals(835, mapListNullCnt);
+	}
+
+	@SuppressWarnings("unchecked")
+	@Test
+	public void testReadRepeatingValuesFile() throws IOException {
+		rowOrcInputFormat = new OrcRowInputFormat(
+			getPath(TEST_FILE_REPEATING),
+			TEST_SCHEMA_REPEATING,
+			new Configuration());
+
+		FileInputSplit[] splits = rowOrcInputFormat.createInputSplits(1);
+		assertEquals(1, splits.length);
+		rowOrcInputFormat.openInputFormat();
+		rowOrcInputFormat.open(splits[0]);
+
+		assertFalse(rowOrcInputFormat.reachedEnd());
+
+		Row row = null;
+		long cnt = 0;
+
+		Row firstRow1 = null;
+		Integer[] firstList1 = null;
+		HashMap firstMap1 = null;
+
+		// read all rows
+		while (!rowOrcInputFormat.reachedEnd()) {
+
+			cnt++;
+			row = rowOrcInputFormat.nextRecord(row);
+			assertEquals(10, row.getArity());
+
+			// check first int field (always 42)
+			assertNotNull(row.getField(0));
+			assertTrue(row.getField(0) instanceof Integer);
+			assertEquals(42, ((Integer) row.getField(0)).intValue());
+
+			// check second int field (always null)
+			assertNull(row.getField(1));
+
+			// check third int field (always 99)
+			assertNotNull(row.getField(2));
+			assertTrue(row.getField(2) instanceof Integer);
+			assertEquals(99, ((Integer) row.getField(2)).intValue());
+
+			// check first row field (always (23, null))
+			assertNotNull(row.getField(3));
+			assertTrue(row.getField(3) instanceof Row);
+			Row nestedRow = (Row) row.getField(3);
+			// check first field of nested row
+			assertNotNull(nestedRow.getField(0));
+			assertTrue(nestedRow.getField(0) instanceof Integer);
+			assertEquals(23, ((Integer) nestedRow.getField(0)).intValue());
+			// check second field of nested row
+			assertNull(nestedRow.getField(1));
+			// validate reference
+			if (firstRow1 == null) {
+				firstRow1 = nestedRow;
+			} else {
+				// repeated rows must be different instances
+				assertTrue(firstRow1 != nestedRow);
+			}
+
+			// check second row field (always null)
+			assertNull(row.getField(4));
+
+			// check first list field (always [1, 2, 3])
+			assertNotNull(row.getField(5));
+			assertTrue(row.getField(5) instanceof Integer[]);
+			Integer[] list1 = ((Integer[]) row.getField(5));
+			assertEquals(1, list1[0].intValue());
+			assertEquals(2, list1[1].intValue());
+			assertEquals(3, list1[2].intValue());
+			// validate reference
+			if (firstList1 == null) {
+				firstList1 = list1;
+			} else {
+				// repeated lists must be different instances
+				assertTrue(firstList1 != list1);
+			}
+
+			// check second list field (always [7, 7, 7])
+			assertNotNull(row.getField(6));
+			assertTrue(row.getField(6) instanceof Integer[]);
+			Integer[] list2 = ((Integer[]) row.getField(6));
+			assertEquals(7, list2[0].intValue());
+			assertEquals(7, list2[1].intValue());
+			assertEquals(7, list2[2].intValue());
+
+			// check third list field (always null)
+			assertNull(row.getField(7));
+
+			// check first map field (always {2->"Hello", 4->"Hello"})
+			assertNotNull(row.getField(8));
+			assertTrue(row.getField(8) instanceof HashMap);
+			HashMap<Integer, String> map = (HashMap<Integer, String>) row.getField(8);
+			assertEquals(2, map.size());
+			assertEquals("Hello", map.get(2));
+			assertEquals("Hello", map.get(4));
+			// validate reference
+			if (firstMap1 == null) {
+				firstMap1 = map;
+			} else {
+				// repeated maps must be different instances
+				assertTrue(firstMap1 != map);
+			}
+
+			// check second map field (always null)
+			assertNull(row.getField(9));
+		}
+
+		rowOrcInputFormat.close();
+		rowOrcInputFormat.closeInputFormat();
+
+		assertEquals(256, cnt);
+	}
+
+	@Test
+	public void testReadWithProjection() throws IOException {
 		rowOrcInputFormat = new OrcRowInputFormat(getPath(TEST_FILE_NESTED), TEST_SCHEMA_NESTED, new Configuration());
 
 		rowOrcInputFormat.selectFields(7, 0, 10, 8);
@@ -691,7 +895,7 @@ public class OrcRowInputFormatTest {
 	}
 
 	@Test
-	public void testReadFileInSplits() throws IOException{
+	public void testReadFileInSplits() throws IOException {
 
 		rowOrcInputFormat = new OrcRowInputFormat(getPath(TEST_FILE_FLAT), TEST_SCHEMA_FLAT, new Configuration());
 		rowOrcInputFormat.selectFields(0, 1);
@@ -717,7 +921,7 @@ public class OrcRowInputFormatTest {
 	}
 
 	@Test
-	public void testReadFileWithFilter() throws IOException{
+	public void testReadFileWithFilter() throws IOException {
 
 		rowOrcInputFormat = new OrcRowInputFormat(getPath(TEST_FILE_FLAT), TEST_SCHEMA_FLAT, new Configuration());
 		rowOrcInputFormat.selectFields(0, 1);
@@ -751,7 +955,7 @@ public class OrcRowInputFormatTest {
 	}
 
 	@Test
-	public void testReadFileWithEvolvedSchema() throws IOException{
+	public void testReadFileWithEvolvedSchema() throws IOException {
 
 		rowOrcInputFormat = new OrcRowInputFormat(
 			getPath(TEST_FILE_FLAT),

http://git-wip-us.apache.org/repos/asf/flink/blob/bcead3be/flink-connectors/flink-orc/src/test/java/org/apache/flink/orc/OrcUtilsTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-orc/src/test/java/org/apache/flink/orc/OrcUtilsTest.java b/flink-connectors/flink-orc/src/test/java/org/apache/flink/orc/OrcUtilsTest.java
deleted file mode 100644
index 2cb1715..0000000
--- a/flink-connectors/flink-orc/src/test/java/org/apache/flink/orc/OrcUtilsTest.java
+++ /dev/null
@@ -1,148 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.orc;
-
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeinfo.Types;
-import org.apache.flink.api.java.typeutils.MapTypeInfo;
-import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
-import org.apache.flink.api.java.typeutils.RowTypeInfo;
-
-import org.apache.orc.TypeDescription;
-import org.junit.Assert;
-import org.junit.Test;
-
-/**
- * Unit tests for {@link OrcUtils}.
- *
- */
-public class OrcUtilsTest {
-
-	@Test
-	public void testFlatSchemaToTypeInfo1() {
-
-		String schema =
-			"struct<" +
-				"boolean1:boolean," +
-				"byte1:tinyint," +
-				"short1:smallint," +
-				"int1:int," +
-				"long1:bigint," +
-				"float1:float," +
-				"double1:double," +
-				"bytes1:binary," +
-				"string1:string," +
-				"date1:date," +
-				"timestamp1:timestamp," +
-				"decimal1:decimal(5,2)" +
-			">";
-		TypeInformation typeInfo = OrcUtils.schemaToTypeInfo(TypeDescription.fromString(schema));
-
-		Assert.assertNotNull(typeInfo);
-		Assert.assertTrue(typeInfo instanceof RowTypeInfo);
-		RowTypeInfo rowTypeInfo = (RowTypeInfo) typeInfo;
-
-		// validate field types
-		Assert.assertArrayEquals(
-			new TypeInformation[]{
-				Types.BOOLEAN, Types.BYTE, Types.SHORT, Types.INT, Types.LONG, Types.FLOAT, Types.DOUBLE,
-				PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO, Types.STRING,
-				Types.SQL_DATE, Types.SQL_TIMESTAMP, BasicTypeInfo.BIG_DEC_TYPE_INFO
-			},
-			rowTypeInfo.getFieldTypes());
-
-		// validate field names
-		Assert.assertArrayEquals(
-			new String[] {
-				"boolean1", "byte1", "short1", "int1", "long1", "float1", "double1",
-				"bytes1", "string1", "date1", "timestamp1", "decimal1"
-			},
-			rowTypeInfo.getFieldNames());
-
-	}
-
-	@Test
-	public void testNestedSchemaToTypeInfo1() {
-
-		String schema =
-			"struct<" +
-				"middle:struct<" +
-					"list:array<" +
-						"struct<" +
-							"int1:int," +
-							"string1:string" +
-						">" +
-					">" +
-				">," +
-				"list:array<" +
-					"struct<" +
-						"int1:int," +
-						"string1:string" +
-					">" +
-				">," +
-				"map:map<" +
-					"string," +
-					"struct<" +
-						"int1:int," +
-						"string1:string" +
-					">" +
-				">" +
-			">";
-		TypeInformation typeInfo = OrcUtils.schemaToTypeInfo(TypeDescription.fromString(schema));
-
-		Assert.assertNotNull(typeInfo);
-		Assert.assertTrue(typeInfo instanceof RowTypeInfo);
-		RowTypeInfo rowTypeInfo = (RowTypeInfo) typeInfo;
-
-		// validate field types
-		Assert.assertArrayEquals(
-			new TypeInformation[]{
-				Types.ROW_NAMED(
-					new String[]{"list"},
-					ObjectArrayTypeInfo.getInfoFor(
-						Types.ROW_NAMED(
-							new String[]{"int1", "string1"},
-							Types.INT, Types.STRING
-						)
-					)
-				),
-				ObjectArrayTypeInfo.getInfoFor(
-					Types.ROW_NAMED(
-						new String[]{"int1", "string1"},
-						Types.INT, Types.STRING
-					)
-				),
-				new MapTypeInfo<>(
-					Types.STRING,
-					Types.ROW_NAMED(
-						new String[]{"int1", "string1"},
-						Types.INT, Types.STRING
-					)
-				)
-			},
-			rowTypeInfo.getFieldTypes());
-
-		// validate field names
-		Assert.assertArrayEquals(
-			new String[] {"middle", "list", "map"},
-			rowTypeInfo.getFieldNames());
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/bcead3be/flink-connectors/flink-orc/src/test/java/org/apache/flink/orc/util/OrcTestFileGenerator.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-orc/src/test/java/org/apache/flink/orc/util/OrcTestFileGenerator.java b/flink-connectors/flink-orc/src/test/java/org/apache/flink/orc/util/OrcTestFileGenerator.java
new file mode 100644
index 0000000..9d3be63
--- /dev/null
+++ b/flink-connectors/flink-orc/src/test/java/org/apache/flink/orc/util/OrcTestFileGenerator.java
@@ -0,0 +1,373 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.orc.util;
+
+import org.apache.flink.orc.OrcRowInputFormatTest;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.ListColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.MapColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.StructColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.orc.OrcFile;
+import org.apache.orc.TypeDescription;
+import org.apache.orc.Writer;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+
+/**
+ * A generator for ORC test files.
+ */
+public class OrcTestFileGenerator {
+
+	public static void main(String[] args) throws IOException {
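+		// args[0] is the output path of the generated file; switch to the
+		// commented call below to generate the repeating-values test file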
+		writeCompositeTypesWithNullsFile(args[0]);
+//		writeCompositeTypesWithRepeatingFile(args[0]);
+	}
+
+	/**
+	 * Writes an ORC file with nested composite types and null values on different levels.
+	 * Generates {@link OrcRowInputFormatTest#TEST_FILE_COMPOSITES_NULLS}.
+	 */
+	private static void writeCompositeTypesWithNullsFile(String path) throws IOException {
+
+		Path filePath = new Path(path);
+		Configuration conf = new Configuration();
+
+		TypeDescription schema =
+			TypeDescription.fromString(
+				"struct<" +
+					"int1:int," +
+					"record1:struct<f1:int,f2:string>," +
+					"list1:array<array<array<struct<f1:string,f2:string>>>>," +
+					"list2:array<map<string,int>>" +
+				">");
+
+		Writer writer =
+			OrcFile.createWriter(filePath,
+				OrcFile.writerOptions(conf).setSchema(schema));
+
+		VectorizedRowBatch batch = schema.createRowBatch();
+		LongColumnVector int1 = (LongColumnVector) batch.cols[0];
+
+		StructColumnVector record1 = (StructColumnVector) batch.cols[1];
+		LongColumnVector record1F1 = (LongColumnVector) record1.fields[0];
+		BytesColumnVector record1F2 = (BytesColumnVector) record1.fields[1];
+
+		ListColumnVector list1 = (ListColumnVector) batch.cols[2];
+		ListColumnVector nestedList = (ListColumnVector) list1.child;
+		ListColumnVector nestedList2 = (ListColumnVector) nestedList.child;
+		StructColumnVector listEntries = (StructColumnVector) nestedList2.child;
+		BytesColumnVector entryField1 = (BytesColumnVector) listEntries.fields[0];
+		BytesColumnVector entryField2 = (BytesColumnVector) listEntries.fields[1];
+
+		ListColumnVector list2 = (ListColumnVector) batch.cols[3];
+		MapColumnVector map1 = (MapColumnVector) list2.child;
+		BytesColumnVector keys = (BytesColumnVector) map1.keys;
+		LongColumnVector vals = (LongColumnVector) map1.values;
+
+		final int list1Size = 4;
+		final int nestedListSize = 3;
+		final int nestedList2Size = 2;
+		final int list2Size = 3;
+		final int mapSize = 3;
+
+		final int batchSize = batch.getMaxSize();
+
+		// Ensure the vectors have sufficient capacity
+		nestedList.ensureSize(batchSize * list1Size, false);
+		nestedList2.ensureSize(batchSize * list1Size * nestedListSize, false);
+		listEntries.ensureSize(batchSize * list1Size * nestedListSize * nestedList2Size, false);
+		map1.ensureSize(batchSize * list2Size, false);
+		keys.ensureSize(batchSize * list2Size * mapSize, false);
+		vals.ensureSize(batchSize * list2Size * mapSize, false);
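+		// (capacity math: each of the batchSize rows holds list1Size nested lists,
+		// each with nestedListSize inner lists of nestedList2Size struct entries;
+		// the map vectors hold list2Size maps of mapSize entries per row)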
+
+		// add 2500 rows to file
+		for (int r = 0; r < 2500; ++r) {
+			int row = batch.size++;
+
+			// mark nullable fields
+			list1.noNulls = false;
+			nestedList.noNulls = false;
+			listEntries.noNulls = false;
+			entryField1.noNulls = false;
+			record1.noNulls = false;
+			record1F2.noNulls = false;
+			list2.noNulls = false;
+			map1.noNulls = false;
+			keys.noNulls = false;
+			vals.noNulls = false;
+
+			// first field: int1
+			int1.vector[row] = r;
+
+			// second field: struct (null in every second row)
+			if (row % 2 != 0) {
+				record1F1.vector[row] = row;
+				// the second field of the struct is null in every fifth row
+				if (row % 5 != 0) {
+					record1F2.setVal(row, ("f2-" + row).getBytes(StandardCharsets.UTF_8));
+				} else {
+					record1F2.isNull[row] = true;
+				}
+			} else {
+				record1.isNull[row] = true;
+			}
+
+			// third field: deeply nested list (null in every third row)
+			if (row % 3 != 0) {
+				list1.offsets[row] = list1.childCount;
+				list1.lengths[row] = list1Size;
+				list1.childCount += list1Size;
+
+				for (int i = 0; i < list1Size; i++) {
+
+					int listOffset = (int) list1.offsets[row] + i;
+					// the third nested list (i == 2) is always null
+					if (i != 2) {
+						nestedList.offsets[listOffset] = nestedList.childCount;
+						nestedList.lengths[listOffset] = nestedListSize;
+						nestedList.childCount += nestedListSize;
+
+						for (int j = 0; j < nestedListSize; j++) {
+							int nestedOffset = (int) nestedList.offsets[listOffset] + j;
+							nestedList2.offsets[nestedOffset] = nestedList2.childCount;
+							nestedList2.lengths[nestedOffset] = nestedList2Size;
+							nestedList2.childCount += nestedList2Size;
+
+							for (int k = 0; k < nestedList2Size; k++) {
+								int nestedOffset2 = (int) nestedList2.offsets[nestedOffset] + k;
+								// list entries
+								// the second struct (k == 1) is always null
+								if (k != 1) {
+									// the first field of the first struct (k == 0) is always null
+									if (k != 0) {
+										entryField1.setVal(nestedOffset2, ("f1-" + k).getBytes(StandardCharsets.UTF_8));
+									} else {
+										entryField1.isNull[nestedOffset2] = true;
+									}
+									entryField2.setVal(nestedOffset2, ("f2-" + k).getBytes(StandardCharsets.UTF_8));
+								} else {
+									listEntries.isNull[nestedOffset2] = true;
+								}
+							}
+						}
+					} else {
+						nestedList.isNull[listOffset] = true;
+					}
+				}
+			} else {
+				list1.isNull[row] = true;
+			}
+
+			// fourth field: list of maps (null in every third row)
+			if (row % 3 != 0) {
+				list2.offsets[row] = list2.childCount;
+				list2.lengths[row] = list2Size;
+				list2.childCount += list2Size;
+
+				for (int i = 0; i < list2Size; i++) {
+					int mapOffset = (int) list2.offsets[row] + i;
+
+					// the third map in the list (i == 2) is always null
+					if (i != 2) {
+						map1.offsets[mapOffset] = map1.childCount;
+						map1.lengths[mapOffset] = mapSize;
+						map1.childCount += mapSize;
+
+						for (int j = 0; j < mapSize; j++) {
+							int mapEntryOffset = (int) map1.offsets[mapOffset] + j;
+
+							// the key of the second map entry (j == 1) is always null
+							if (j != 1) {
+								keys.setVal(mapEntryOffset, ("key-" + row + "-" + j).getBytes(StandardCharsets.UTF_8));
+							} else {
+								keys.isNull[mapEntryOffset] = true;
+							}
+							// the value of the third map entry (j == 2) is always null
+							if (j != 2) {
+								vals.vector[mapEntryOffset] = row + i + j;
+							} else {
+								vals.isNull[mapEntryOffset] = true;
+							}
+						}
+					} else {
+						map1.isNull[mapOffset] = true;
+					}
+				}
+			} else {
+				list2.isNull[row] = true;
+			}
+
+			if (row == batchSize - 1) {
+				writer.addRowBatch(batch);
+				batch.reset();
+			}
+		}
+		if (batch.size != 0) {
+			writer.addRowBatch(batch);
+			batch.reset();
+		}
+		writer.close();
+	}
+
+	/**
+	 * Writes an ORC file with nested composite types and repeated values.
+	 * Generates {@link OrcRowInputFormatTest#TEST_FILE_REPEATING}.
+	 */
+	private static void writeCompositeTypesWithRepeatingFile(String path) throws IOException {
+
+		Path filePath = new Path(path);
+		Configuration conf = new Configuration();
+
+		TypeDescription schema =
+			TypeDescription.fromString(
+				"struct<" +
+					"int1:int," +
+					"int2:int," +
+					"int3:int," +
+					"record1:struct<f1:int,f2:string>," +
+					"record2:struct<f1:int,f2:string>," +
+					"list1:array<int>," +
+					"list2:array<int>," +
+					"list3:array<int>," +
+					"map1:map<int,string>," +
+					"map2:map<int,string>" +
+				">");
+
+		Writer writer =
+			OrcFile.createWriter(filePath,
+				OrcFile.writerOptions(conf).setSchema(schema));
+
+		VectorizedRowBatch batch = schema.createRowBatch();
+
+		LongColumnVector int1 = (LongColumnVector) batch.cols[0];
+		LongColumnVector int2 = (LongColumnVector) batch.cols[1];
+		LongColumnVector int3 = (LongColumnVector) batch.cols[2];
+
+		StructColumnVector record1 = (StructColumnVector) batch.cols[3];
+		LongColumnVector record1F1 = (LongColumnVector) record1.fields[0];
+		BytesColumnVector record1F2 = (BytesColumnVector) record1.fields[1];
+		StructColumnVector record2 = (StructColumnVector) batch.cols[4];
+
+		ListColumnVector list1 = (ListColumnVector) batch.cols[5];
+		LongColumnVector list1int = (LongColumnVector) list1.child;
+		ListColumnVector list2 = (ListColumnVector) batch.cols[6];
+		LongColumnVector list2int = (LongColumnVector) list2.child;
+		ListColumnVector list3 = (ListColumnVector) batch.cols[7];
+
+		MapColumnVector map1 = (MapColumnVector) batch.cols[8];
+		LongColumnVector map1keys = (LongColumnVector) map1.keys;
+		BytesColumnVector map1vals = (BytesColumnVector) map1.values;
+		MapColumnVector map2 = (MapColumnVector) batch.cols[9];
+
+		final int listSize = 3;
+		final int mapSize = 2;
+
+		final int batchSize = batch.getMaxSize();
+
+		// Ensure the vectors have sufficient capacity
+		list1int.ensureSize(batchSize * listSize, false);
+		list2int.ensureSize(batchSize * listSize, false);
+		map1keys.ensureSize(batchSize * mapSize, false);
+		map1vals.ensureSize(batchSize * mapSize, false);
+
+		// int1: all values are 42
+		int1.noNulls = true;
+		int1.setRepeating(true);
+		int1.vector[0] = 42;
+
+		// int2: all values are null
+		int2.noNulls = false;
+		int2.setRepeating(true);
+		int2.isNull[0] = true;
+
+		// int3: all values are 99
+		int3.noNulls = false;
+		int3.setRepeating(true);
+		int3.isNull[0] = false;
+		int3.vector[0] = 99;
+
+		// record1: all records are (23, null); the struct vector is repeating, so
+		// readers should only consume index 0 of its field vectors
+		record1.noNulls = true;
+		record1.setRepeating(true);
+		for (int i = 0; i < batchSize; i++) {
+			record1F1.vector[i] = i + 23;
+		}
+		record1F2.noNulls = false;
+		record1F2.isNull[0] = true;
+
+		// record2: all records are null
+		record2.noNulls = false;
+		record2.setRepeating(true);
+		record2.isNull[0] = true;
+
+		// list1: all lists are [1, 2, 3]
+		list1.noNulls = true;
+		list1.setRepeating(true);
+		list1.lengths[0] = listSize;
+		list1.offsets[0] = 1;
+		for (int i = 0; i < batchSize * listSize; i++) {
+			list1int.vector[i] = i;
+		}
+
+		// list2: all lists are [7, 7, 7]
+		list2.noNulls = true;
+		list2.setRepeating(true);
+		list2.lengths[0] = listSize;
+		list2.offsets[0] = 0;
+		list2int.setRepeating(true);
+		list2int.vector[0] = 7;
+
+		// list3: all lists are null
+		list3.noNulls = false;
+		list3.setRepeating(true);
+		list3.isNull[0] = true;
+
+		// map1: all maps are [2 -> "Hello", 4 -> "Hello"]
+		map1.noNulls = true;
+		map1.setRepeating(true);
+		map1.lengths[0] = mapSize;
+		map1.offsets[0] = 1;
+		for (int i = 0; i < batchSize * mapSize; i++) {
+			map1keys.vector[i] = i * 2;
+		}
+		map1vals.setRepeating(true);
+		map1vals.setVal(0, "Hello".getBytes(StandardCharsets.UTF_8));
+
+		// map2: all maps are null
+		map2.noNulls = false;
+		map2.setRepeating(true);
+		map2.isNull[0] = true;
+
+		batch.size = 256;
+
+		writer.addRowBatch(batch);
+		batch.reset();
+		writer.close();
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/bcead3be/flink-connectors/flink-orc/src/test/resources/test-data-composites-with-nulls.orc
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-orc/src/test/resources/test-data-composites-with-nulls.orc b/flink-connectors/flink-orc/src/test/resources/test-data-composites-with-nulls.orc
new file mode 100644
index 0000000..eed1c55
Binary files /dev/null and b/flink-connectors/flink-orc/src/test/resources/test-data-composites-with-nulls.orc differ

http://git-wip-us.apache.org/repos/asf/flink/blob/bcead3be/flink-connectors/flink-orc/src/test/resources/test-data-repeating.orc
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-orc/src/test/resources/test-data-repeating.orc b/flink-connectors/flink-orc/src/test/resources/test-data-repeating.orc
new file mode 100644
index 0000000..ff2c917
Binary files /dev/null and b/flink-connectors/flink-orc/src/test/resources/test-data-repeating.orc differ


[2/2] flink git commit: [FLINK-8230] [orc] Fix NPEs when reading nested columns.

Posted by tw...@apache.org.
[FLINK-8230] [orc] Fix NPEs when reading nested columns.

- fixes NPEs for null-valued structs, lists, and maps
- fixes NPEs for repeating structs, lists, and maps
- adds test for deeply nested data with nulls
- adds test for columns with repeating values

This closes #5373.
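
For readers following the fix, the sketch below shows the null- and repeating-aware
read pattern for a simple nullable long column. It is a simplified, hypothetical
example (the method name and shape are illustrative, not the committed code); the
new OrcBatchReader applies the same dispatch to structs, lists, and maps, where the
missing checks previously caused the NPEs, and deep-copies mutable values instead of
sharing a single instance across rows:

    import java.util.Arrays;

    import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;

    static void readNullableLongs(Object[] vals, LongColumnVector vector, int rows) {
        if (vector.isRepeating) {
            // a repeating vector carries one value (or one null flag) at index 0
            Arrays.fill(vals, 0, rows, vector.isNull[0] ? null : (Long) vector.vector[0]);
        } else if (vector.noNulls) {
            for (int i = 0; i < rows; i++) {
                vals[i] = vector.vector[i];
            }
        } else {
            for (int i = 0; i < rows; i++) {
                // consult isNull per entry before reading the value
                vals[i] = vector.isNull[i] ? null : (Long) vector.vector[i];
            }
        }
    }

For repeating composite vectors, the readers in the diff additionally restrict reads
of the child vectors to offsets[0] + lengths[0] entries and copy the first value for
every row.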


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/bcead3be
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/bcead3be
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/bcead3be

Branch: refs/heads/master
Commit: bcead3be32c624008730555d828fd8e9447fbeff
Parents: 3cfc5ae
Author: Fabian Hueske <fh...@apache.org>
Authored: Tue Dec 12 12:09:14 2017 +0100
Committer: twalthr <tw...@apache.org>
Committed: Wed Jan 31 14:44:23 2018 +0100

----------------------------------------------------------------------
 .../org/apache/flink/orc/OrcBatchReader.java    | 1402 ++++++++++++++++
 .../org/apache/flink/orc/OrcRowInputFormat.java |    4 +-
 .../org/apache/flink/orc/OrcTableSource.java    |    2 +-
 .../java/org/apache/flink/orc/OrcUtils.java     | 1508 ------------------
 .../apache/flink/orc/OrcBatchReaderTest.java    |  148 ++
 .../apache/flink/orc/OrcRowInputFormatTest.java |  218 ++-
 .../java/org/apache/flink/orc/OrcUtilsTest.java |  148 --
 .../flink/orc/util/OrcTestFileGenerator.java    |  373 +++++
 .../test-data-composites-with-nulls.orc         |  Bin 0 -> 10597 bytes
 .../src/test/resources/test-data-repeating.orc  |  Bin 0 -> 1203 bytes
 10 files changed, 2137 insertions(+), 1666 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/bcead3be/flink-connectors/flink-orc/src/main/java/org/apache/flink/orc/OrcBatchReader.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-orc/src/main/java/org/apache/flink/orc/OrcBatchReader.java b/flink-connectors/flink-orc/src/main/java/org/apache/flink/orc/OrcBatchReader.java
new file mode 100644
index 0000000..3ecdeb3
--- /dev/null
+++ b/flink-connectors/flink-orc/src/main/java/org/apache/flink/orc/OrcBatchReader.java
@@ -0,0 +1,1402 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.orc;
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.MapTypeInfo;
+import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.types.Row;
+
+import org.apache.hadoop.hive.common.type.HiveDecimal;
+import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.ListColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.MapColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.StructColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.TimestampColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
+import org.apache.orc.TypeDescription;
+
+import java.lang.reflect.Array;
+import java.math.BigDecimal;
+import java.nio.charset.StandardCharsets;
+import java.sql.Date;
+import java.sql.Timestamp;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.TimeZone;
+import java.util.function.DoubleFunction;
+import java.util.function.Function;
+import java.util.function.LongFunction;
+
+/**
+ * A class that provides utility methods for reading ORC files.
+ */
+class OrcBatchReader {
+
+	private static final long MILLIS_PER_DAY = 86400000; // = 24 * 60 * 60 * 1000
+	private static final TimeZone LOCAL_TZ = TimeZone.getDefault();
+
+	/**
+	 * Converts an ORC schema to a Flink TypeInformation.
+	 *
+	 * @param schema The ORC schema.
+	 * @return The TypeInformation that corresponds to the ORC schema.
+	 */
+	static TypeInformation schemaToTypeInfo(TypeDescription schema) {
+		switch (schema.getCategory()) {
+			case BOOLEAN:
+				return BasicTypeInfo.BOOLEAN_TYPE_INFO;
+			case BYTE:
+				return BasicTypeInfo.BYTE_TYPE_INFO;
+			case SHORT:
+				return BasicTypeInfo.SHORT_TYPE_INFO;
+			case INT:
+				return BasicTypeInfo.INT_TYPE_INFO;
+			case LONG:
+				return BasicTypeInfo.LONG_TYPE_INFO;
+			case FLOAT:
+				return BasicTypeInfo.FLOAT_TYPE_INFO;
+			case DOUBLE:
+				return BasicTypeInfo.DOUBLE_TYPE_INFO;
+			case DECIMAL:
+				return BasicTypeInfo.BIG_DEC_TYPE_INFO;
+			case STRING:
+			case CHAR:
+			case VARCHAR:
+				return BasicTypeInfo.STRING_TYPE_INFO;
+			case DATE:
+				return SqlTimeTypeInfo.DATE;
+			case TIMESTAMP:
+				return SqlTimeTypeInfo.TIMESTAMP;
+			case BINARY:
+				return PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO;
+			case STRUCT:
+				List<TypeDescription> fieldSchemas = schema.getChildren();
+				TypeInformation[] fieldTypes = new TypeInformation[fieldSchemas.size()];
+				for (int i = 0; i < fieldSchemas.size(); i++) {
+					fieldTypes[i] = schemaToTypeInfo(fieldSchemas.get(i));
+				}
+				String[] fieldNames = schema.getFieldNames().toArray(new String[]{});
+				return new RowTypeInfo(fieldTypes, fieldNames);
+			case LIST:
+				TypeDescription elementSchema = schema.getChildren().get(0);
+				TypeInformation<?> elementType = schemaToTypeInfo(elementSchema);
+				// arrays of primitive types are handled as object arrays to support null values
+				return ObjectArrayTypeInfo.getInfoFor(elementType);
+			case MAP:
+				TypeDescription keySchema = schema.getChildren().get(0);
+				TypeDescription valSchema = schema.getChildren().get(1);
+				TypeInformation<?> keyType = schemaToTypeInfo(keySchema);
+				TypeInformation<?> valType = schemaToTypeInfo(valSchema);
+				return new MapTypeInfo<>(keyType, valType);
+			case UNION:
+				throw new UnsupportedOperationException("UNION type is not supported yet.");
+			default:
+				throw new IllegalArgumentException("Unknown type " + schema);
+		}
+	}
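+
+	// Illustrative usage (a hedged example; the schema string here is made up,
+	// not taken from this change):
+	//   TypeDescription orcSchema = TypeDescription.fromString("struct<id:int,name:string>");
+	//   TypeInformation info = schemaToTypeInfo(orcSchema);
+	//   // info is a RowTypeInfo with fields (id: Integer, name: String)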
+
+	/**
+	 * Fills an ORC batch into an array of Row.
+	 *
+	 * @param rows The batch of rows to be filled.
+	 * @param schema The schema of the ORC data.
+	 * @param batch The ORC data.
+	 * @param selectedFields The list of selected ORC fields.
+	 * @return The number of rows that were filled.
+	 */
+	static int fillRows(Row[] rows, TypeDescription schema, VectorizedRowBatch batch, int[] selectedFields) {
+
+		int rowsToRead = Math.min((int) batch.count(), rows.length);
+
+		List<TypeDescription> fieldTypes = schema.getChildren();
+		// read each selected field
+		for (int fieldIdx = 0; fieldIdx < selectedFields.length; fieldIdx++) {
+			int orcIdx = selectedFields[fieldIdx];
+			readField(rows, fieldIdx, fieldTypes.get(orcIdx), batch.cols[orcIdx], rowsToRead);
+		}
+		return rowsToRead;
+	}
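+
+	// Example (illustrative): with selectedFields = {7, 0}, ORC column 7 fills
+	// field 0 of every Row and ORC column 0 fills field 1.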
+
+	/**
+	 * Reads a vector of data into an array of objects.
+	 *
+	 * @param vals The array that needs to be filled.
+	 * @param fieldIdx If the vals array is an array of Row, the index of the field that needs to be filled.
+	 *                 Otherwise a -1 must be passed and the data is directly filled into the array.
+	 * @param schema The schema of the vector to read.
+	 * @param vector The vector to read.
+	 * @param childCount The number of vector entries to read.
+	 */
+	private static void readField(Object[] vals, int fieldIdx, TypeDescription schema, ColumnVector vector, int childCount) {
+
+		// check the type of the vector to decide how to read it.
+		switch (schema.getCategory()) {
+			case BOOLEAN:
+				if (vector.noNulls) {
+					readNonNullLongColumn(vals, fieldIdx, (LongColumnVector) vector, childCount, OrcBatchReader::readBoolean);
+				} else {
+					readLongColumn(vals, fieldIdx, (LongColumnVector) vector, childCount, OrcBatchReader::readBoolean);
+				}
+				break;
+			case BYTE:
+				if (vector.noNulls) {
+					readNonNullLongColumn(vals, fieldIdx, (LongColumnVector) vector, childCount, OrcBatchReader::readByte);
+				} else {
+					readLongColumn(vals, fieldIdx, (LongColumnVector) vector, childCount, OrcBatchReader::readByte);
+				}
+				break;
+			case SHORT:
+				if (vector.noNulls) {
+					readNonNullLongColumn(vals, fieldIdx, (LongColumnVector) vector, childCount, OrcBatchReader::readShort);
+				} else {
+					readLongColumn(vals, fieldIdx, (LongColumnVector) vector, childCount, OrcBatchReader::readShort);
+				}
+				break;
+			case INT:
+				if (vector.noNulls) {
+					readNonNullLongColumn(vals, fieldIdx, (LongColumnVector) vector, childCount, OrcBatchReader::readInt);
+				} else {
+					readLongColumn(vals, fieldIdx, (LongColumnVector) vector, childCount, OrcBatchReader::readInt);
+				}
+				break;
+			case LONG:
+				if (vector.noNulls) {
+					readNonNullLongColumn(vals, fieldIdx, (LongColumnVector) vector, childCount, OrcBatchReader::readLong);
+				} else {
+					readLongColumn(vals, fieldIdx, (LongColumnVector) vector, childCount, OrcBatchReader::readLong);
+				}
+				break;
+			case FLOAT:
+				if (vector.noNulls) {
+					readNonNullDoubleColumn(vals, fieldIdx, (DoubleColumnVector) vector, childCount, OrcBatchReader::readFloat);
+				} else {
+					readDoubleColumn(vals, fieldIdx, (DoubleColumnVector) vector, childCount, OrcBatchReader::readFloat);
+				}
+				break;
+			case DOUBLE:
+				if (vector.noNulls) {
+					readNonNullDoubleColumn(vals, fieldIdx, (DoubleColumnVector) vector, childCount, OrcBatchReader::readDouble);
+				} else {
+					readDoubleColumn(vals, fieldIdx, (DoubleColumnVector) vector, childCount, OrcBatchReader::readDouble);
+				}
+				break;
+			case CHAR:
+			case VARCHAR:
+			case STRING:
+				if (vector.noNulls) {
+					readNonNullBytesColumnAsString(vals, fieldIdx, (BytesColumnVector) vector, childCount);
+				} else {
+					readBytesColumnAsString(vals, fieldIdx, (BytesColumnVector) vector, childCount);
+				}
+				break;
+			case DATE:
+				if (vector.noNulls) {
+					readNonNullLongColumnAsDate(vals, fieldIdx, (LongColumnVector) vector, childCount);
+				} else {
+					readLongColumnAsDate(vals, fieldIdx, (LongColumnVector) vector, childCount);
+				}
+				break;
+			case TIMESTAMP:
+				if (vector.noNulls) {
+					readNonNullTimestampColumn(vals, fieldIdx, (TimestampColumnVector) vector, childCount);
+				} else {
+					readTimestampColumn(vals, fieldIdx, (TimestampColumnVector) vector, childCount);
+				}
+				break;
+			case BINARY:
+				if (vector.noNulls) {
+					readNonNullBytesColumnAsBinary(vals, fieldIdx, (BytesColumnVector) vector, childCount);
+				} else {
+					readBytesColumnAsBinary(vals, fieldIdx, (BytesColumnVector) vector, childCount);
+				}
+				break;
+			case DECIMAL:
+				if (vector.noNulls) {
+					readNonNullDecimalColumn(vals, fieldIdx, (DecimalColumnVector) vector, childCount);
+				} else {
+					readDecimalColumn(vals, fieldIdx, (DecimalColumnVector) vector, childCount);
+				}
+				break;
+			case STRUCT:
+				if (vector.noNulls) {
+					readNonNullStructColumn(vals, fieldIdx, (StructColumnVector) vector, schema, childCount);
+				} else {
+					readStructColumn(vals, fieldIdx, (StructColumnVector) vector, schema, childCount);
+				}
+				break;
+			case LIST:
+				if (vector.noNulls) {
+					readNonNullListColumn(vals, fieldIdx, (ListColumnVector) vector, schema, childCount);
+				} else {
+					readListColumn(vals, fieldIdx, (ListColumnVector) vector, schema, childCount);
+				}
+				break;
+			case MAP:
+				if (vector.noNulls) {
+					readNonNullMapColumn(vals, fieldIdx, (MapColumnVector) vector, schema, childCount);
+				} else {
+					readMapColumn(vals, fieldIdx, (MapColumnVector) vector, schema, childCount);
+				}
+				break;
+			case UNION:
+				throw new UnsupportedOperationException("UNION type not supported yet");
+			default:
+				throw new IllegalArgumentException("Unknown type " + schema);
+		}
+	}
+
+	private static <T> void readNonNullLongColumn(Object[] vals, int fieldIdx, LongColumnVector vector,
+													int childCount, LongFunction<T> reader) {
+
+		if (vector.isRepeating) { // fill complete column with first value
+			T repeatingValue = reader.apply(vector.vector[0]);
+			fillColumnWithRepeatingValue(vals, fieldIdx, repeatingValue, childCount);
+		} else {
+			if (fieldIdx == -1) { // set as an object
+				for (int i = 0; i < childCount; i++) {
+					vals[i] = reader.apply(vector.vector[i]);
+				}
+			} else { // set as a field of Row
+				Row[] rows = (Row[]) vals;
+				for (int i = 0; i < childCount; i++) {
+					rows[i].setField(fieldIdx, reader.apply(vector.vector[i]));
+				}
+			}
+		}
+	}
+
+	private static <T> void readNonNullDoubleColumn(Object[] vals, int fieldIdx, DoubleColumnVector vector,
+													int childCount, DoubleFunction<T> reader) {
+
+		if (vector.isRepeating) { // fill complete column with first value
+			T repeatingValue = reader.apply(vector.vector[0]);
+			fillColumnWithRepeatingValue(vals, fieldIdx, repeatingValue, childCount);
+		} else {
+			if (fieldIdx == -1) { // set as an object
+				for (int i = 0; i < childCount; i++) {
+					vals[i] = reader.apply(vector.vector[i]);
+				}
+			} else { // set as a field of Row
+				Row[] rows = (Row[]) vals;
+				for (int i = 0; i < childCount; i++) {
+					rows[i].setField(fieldIdx, reader.apply(vector.vector[i]));
+				}
+			}
+		}
+	}
+
+	private static void readNonNullBytesColumnAsString(Object[] vals, int fieldIdx, BytesColumnVector bytes, int childCount) {
+		if (bytes.isRepeating) { // fill complete column with first value
+			String repeatingValue = readString(bytes.vector[0], bytes.start[0], bytes.length[0]);
+			fillColumnWithRepeatingValue(vals, fieldIdx, repeatingValue, childCount);
+		} else {
+			if (fieldIdx == -1) { // set as an object
+				for (int i = 0; i < childCount; i++) {
+					vals[i] = readString(bytes.vector[i], bytes.start[i], bytes.length[i]);
+				}
+			} else { // set as a field of Row
+				Row[] rows = (Row[]) vals;
+				for (int i = 0; i < childCount; i++) {
+					rows[i].setField(fieldIdx, readString(bytes.vector[i], bytes.start[i], bytes.length[i]));
+				}
+			}
+		}
+	}
+
+	private static void readNonNullBytesColumnAsBinary(Object[] vals, int fieldIdx, BytesColumnVector bytes, int childCount) {
+		if (bytes.isRepeating) { // fill complete column with first value
+			if (fieldIdx == -1) { // set as an object
+				for (int i = 0; i < childCount; i++) {
+					// don't reuse repeating val to avoid object mutation
+					vals[i] = readBinary(bytes.vector[0], bytes.start[0], bytes.length[0]);
+				}
+			} else { // set as a field of Row
+				Row[] rows = (Row[]) vals;
+				for (int i = 0; i < childCount; i++) {
+					// don't reuse repeating val to avoid object mutation
+					rows[i].setField(fieldIdx, readBinary(bytes.vector[0], bytes.start[0], bytes.length[0]));
+				}
+			}
+		} else {
+			if (fieldIdx == -1) { // set as an object
+				for (int i = 0; i < childCount; i++) {
+					vals[i] = readBinary(bytes.vector[i], bytes.start[i], bytes.length[i]);
+				}
+			} else { // set as a field of Row
+				Row[] rows = (Row[]) vals;
+				for (int i = 0; i < childCount; i++) {
+					rows[i].setField(fieldIdx, readBinary(bytes.vector[i], bytes.start[i], bytes.length[i]));
+				}
+			}
+		}
+	}
+
+	private static void readNonNullLongColumnAsDate(Object[] vals, int fieldIdx, LongColumnVector vector, int childCount) {
+
+		if (vector.isRepeating) { // fill complete column with first value
+			if (fieldIdx == -1) { // set as an object
+				for (int i = 0; i < childCount; i++) {
+					// do not reuse repeated value due to mutability of Date
+					vals[i] = readDate(vector.vector[0]);
+				}
+			} else { // set as a field of Row
+				Row[] rows = (Row[]) vals;
+				for (int i = 0; i < childCount; i++) {
+					// do not reuse repeated value due to mutability of Date
+					rows[i].setField(fieldIdx, readDate(vector.vector[0]));
+				}
+			}
+		} else {
+			if (fieldIdx == -1) { // set as an object
+				for (int i = 0; i < childCount; i++) {
+					vals[i] = readDate(vector.vector[i]);
+				}
+			} else { // set as a field of Row
+				Row[] rows = (Row[]) vals;
+				for (int i = 0; i < childCount; i++) {
+					rows[i].setField(fieldIdx, readDate(vector.vector[i]));
+				}
+			}
+		}
+	}
+
+	private static void readNonNullTimestampColumn(Object[] vals, int fieldIdx, TimestampColumnVector vector, int childCount) {
+
+		if (vector.isRepeating) { // fill complete column with first value
+			if (fieldIdx == -1) { // set as an object
+				for (int i = 0; i < childCount; i++) {
+					// do not reuse value to prevent object mutation
+					vals[i] = readTimestamp(vector.time[0], vector.nanos[0]);
+				}
+			} else { // set as a field of Row
+				Row[] rows = (Row[]) vals;
+				for (int i = 0; i < childCount; i++) {
+					// do not reuse value to prevent object mutation
+					rows[i].setField(fieldIdx, readTimestamp(vector.time[0], vector.nanos[0]));
+				}
+			}
+		} else {
+			if (fieldIdx == -1) { // set as an object
+				for (int i = 0; i < childCount; i++) {
+					vals[i] = readTimestamp(vector.time[i], vector.nanos[i]);
+				}
+			} else { // set as a field of Row
+				Row[] rows = (Row[]) vals;
+				for (int i = 0; i < childCount; i++) {
+					rows[i].setField(fieldIdx, readTimestamp(vector.time[i], vector.nanos[i]));
+				}
+			}
+		}
+	}
+
+	private static void readNonNullDecimalColumn(Object[] vals, int fieldIdx, DecimalColumnVector vector, int childCount) {
+
+		if (vector.isRepeating) { // fill complete column with first value
+			fillColumnWithRepeatingValue(vals, fieldIdx, readBigDecimal(vector.vector[0]), childCount);
+		} else {
+			if (fieldIdx == -1) { // set as an object
+				for (int i = 0; i < childCount; i++) {
+					vals[i] = readBigDecimal(vector.vector[i]);
+				}
+			} else { // set as a field of Row
+				Row[] rows = (Row[]) vals;
+				for (int i = 0; i < childCount; i++) {
+					rows[i].setField(fieldIdx, readBigDecimal(vector.vector[i]));
+				}
+			}
+		}
+	}
+
+	private static void readNonNullStructColumn(Object[] vals, int fieldIdx, StructColumnVector structVector, TypeDescription schema, int childCount) {
+
+		List<TypeDescription> childrenTypes = schema.getChildren();
+
+		int numFields = childrenTypes.size();
+		// create a batch of Rows to read the structs
+		Row[] structs = new Row[childCount];
+		// TODO: possible improvement: reuse existing Row objects
+		for (int i = 0; i < childCount; i++) {
+			structs[i] = new Row(numFields);
+		}
+
+		// read struct fields
+		// isRepeating does not need to be handled here because ORC propagates it into the children.
+		for (int i = 0; i < numFields; i++) {
+			readField(structs, i, childrenTypes.get(i), structVector.fields[i], childCount);
+		}
+
+		if (fieldIdx == -1) { // set struct as an object
+			System.arraycopy(structs, 0, vals, 0, childCount);
+		} else { // set struct as a field of Row
+			Row[] rows = (Row[]) vals;
+			for (int i = 0; i < childCount; i++) {
+				rows[i].setField(fieldIdx, structs[i]);
+			}
+		}
+	}
+
+	private static void readNonNullListColumn(Object[] vals, int fieldIdx, ListColumnVector list, TypeDescription schema, int childCount) {
+
+		TypeDescription fieldType = schema.getChildren().get(0);
+		// get class of list elements
+		Class<?> classType = getClassForType(fieldType);
+
+		if (list.isRepeating) {
+
+			int offset = (int) list.offsets[0];
+			int length = (int) list.lengths[0];
+			// we only need to read until offset + length.
+			int entriesToRead = offset + length;
+
+			// read children
+			Object[] children = (Object[]) Array.newInstance(classType, entriesToRead);
+			readField(children, -1, fieldType, list.child, entriesToRead);
+
+			// get function to copy list
+			Function<Object, Object> copyList = getCopyFunction(schema);
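+			// the copy ensures that rows do not share the same mutable array
+			// instance for the repeated list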
+
+			// create first list that will be copied
+			Object[] first;
+			if (offset == 0) {
+				first = children;
+			} else {
+				first = (Object[]) Array.newInstance(classType, length);
+				System.arraycopy(children, offset, first, 0, length);
+			}
+
+			// create copies of first list and set copies as result
+			for (int i = 0; i < childCount; i++) {
+				Object[] copy = (Object[]) copyList.apply(first);
+				if (fieldIdx == -1) {
+					vals[i] = copy;
+				} else {
+					((Row) vals[i]).setField(fieldIdx, copy);
+				}
+			}
+		} else {
+
+			// read children
+			Object[] children = (Object[]) Array.newInstance(classType, list.childCount);
+			readField(children, -1, fieldType, list.child, list.childCount);
+
+			// fill lists with children
+			for (int i = 0; i < childCount; i++) {
+				int offset = (int) list.offsets[i];
+				int length = (int) list.lengths[i];
+
+				Object[] temp = (Object[]) Array.newInstance(classType, length);
+				System.arraycopy(children, offset, temp, 0, length);
+				if (fieldIdx == -1) {
+					vals[i] = temp;
+				} else {
+					((Row) vals[i]).setField(fieldIdx, temp);
+				}
+			}
+		}
+	}
+
+	private static void readNonNullMapColumn(Object[] vals, int fieldIdx, MapColumnVector mapsVector, TypeDescription schema, int childCount) {
+
+		List<TypeDescription> fieldType = schema.getChildren();
+		TypeDescription keyType = fieldType.get(0);
+		TypeDescription valueType = fieldType.get(1);
+
+		ColumnVector keys = mapsVector.keys;
+		ColumnVector values = mapsVector.values;
+
+		if (mapsVector.isRepeating) {
+			// first map is repeated
+
+			// get map copy function
+			Function<Object, Object> copyMap = getCopyFunction(schema);
+
+			int offset = (int) mapsVector.offsets[0];
+			int length = (int) mapsVector.lengths[0];
+			// we only need to read until offset + length.
+			int entriesToRead = offset + length;
+
+			Object[] keyRows = new Object[entriesToRead];
+			Object[] valueRows = new Object[entriesToRead];
+
+			// read map keys and values
+			readField(keyRows, -1, keyType, keys, entriesToRead);
+			readField(valueRows, -1, valueType, values, entriesToRead);
+
+			// create first map that will be copied
+			HashMap map = readHashMap(keyRows, valueRows, offset, length);
+
+			// copy first map and set copy as result
+			for (int i = 0; i < childCount; i++) {
+				if (fieldIdx == -1) {
+					vals[i] = copyMap.apply(map);
+				} else {
+					((Row) vals[i]).setField(fieldIdx, copyMap.apply(map));
+				}
+			}
+
+		} else {
+
+			Object[] keyRows = new Object[mapsVector.childCount];
+			Object[] valueRows = new Object[mapsVector.childCount];
+
+			// read map keys and values
+			readField(keyRows, -1, keyType, keys, keyRows.length);
+			readField(valueRows, -1, valueType, values, valueRows.length);
+
+			long[] lengthVectorMap = mapsVector.lengths;
+			int offset = 0;
+
+			for (int i = 0; i < childCount; i++) {
+				long numMapEntries = lengthVectorMap[i];
+				HashMap map = readHashMap(keyRows, valueRows, offset, numMapEntries);
+				offset += numMapEntries;
+
+				if (fieldIdx == -1) {
+					vals[i] = map;
+				} else {
+					((Row) vals[i]).setField(fieldIdx, map);
+				}
+			}
+		}
+
+	}
+
+	private static <T> void readLongColumn(Object[] vals, int fieldIdx, LongColumnVector vector,
+											int childCount, LongFunction<T> reader) {
+
+		if (vector.isRepeating) { // fill complete column with first value
+			if (vector.isNull[0]) {
+				// fill vals with null values
+				fillColumnWithRepeatingValue(vals, fieldIdx, null, childCount);
+			} else {
+				// read repeating non-null value by forwarding call.
+				readNonNullLongColumn(vals, fieldIdx, vector, childCount, reader);
+			}
+		} else {
+			boolean[] isNullVector = vector.isNull;
+			if (fieldIdx == -1) { // set as an object
+				for (int i = 0; i < childCount; i++) {
+					if (isNullVector[i]) {
+						vals[i] = null;
+					} else {
+						vals[i] = reader.apply(vector.vector[i]);
+					}
+				}
+			} else { // set as a field of Row
+				Row[] rows = (Row[]) vals;
+				for (int i = 0; i < childCount; i++) {
+					if (isNullVector[i]) {
+						rows[i].setField(fieldIdx, null);
+					} else {
+						rows[i].setField(fieldIdx, reader.apply(vector.vector[i]));
+					}
+				}
+			}
+		}
+	}
+
+	private static <T> void readDoubleColumn(Object[] vals, int fieldIdx, DoubleColumnVector vector,
+												int childCount, DoubleFunction<T> reader) {
+
+		if (vector.isRepeating) { // fill complete column with first value
+			if (vector.isNull[0]) {
+				// fill vals with null values
+				fillColumnWithRepeatingValue(vals, fieldIdx, null, childCount);
+			} else {
+				// read repeating non-null value by forwarding call
+				readNonNullDoubleColumn(vals, fieldIdx, vector, childCount, reader);
+			}
+		} else {
+			boolean[] isNullVector = vector.isNull;
+			if (fieldIdx == -1) { // set as an object
+				for (int i = 0; i < childCount; i++) {
+					if (isNullVector[i]) {
+						vals[i] = null;
+					} else {
+						vals[i] = reader.apply(vector.vector[i]);
+					}
+				}
+			} else { // set as a field of Row
+				Row[] rows = (Row[]) vals;
+				for (int i = 0; i < childCount; i++) {
+					if (isNullVector[i]) {
+						rows[i].setField(fieldIdx, null);
+					} else {
+						rows[i].setField(fieldIdx, reader.apply(vector.vector[i]));
+					}
+				}
+			}
+		}
+	}
+
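+	/**
+	 * Reads a nullable bytes column as UTF-8 String values.
+	 */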
+	private static void readBytesColumnAsString(Object[] vals, int fieldIdx, BytesColumnVector bytes, int childCount) {
+
+		if (bytes.isRepeating) { // fill complete column with first value
+			if (bytes.isNull[0]) {
+				// fill vals with null values
+				fillColumnWithRepeatingValue(vals, fieldIdx, null, childCount);
+			} else {
+				// read repeating non-null value by forwarding call
+				readNonNullBytesColumnAsString(vals, fieldIdx, bytes, childCount);
+			}
+		} else {
+			boolean[] isNullVector = bytes.isNull;
+			if (fieldIdx == -1) { // set as an object
+				for (int i = 0; i < childCount; i++) {
+					if (isNullVector[i]) {
+						vals[i] = null;
+					} else {
+						vals[i] = readString(bytes.vector[i], bytes.start[i], bytes.length[i]);
+					}
+				}
+			} else { // set as a field of Row
+				Row[] rows = (Row[]) vals;
+				for (int i = 0; i < childCount; i++) {
+					if (isNullVector[i]) {
+						rows[i].setField(fieldIdx, null);
+					} else {
+						rows[i].setField(fieldIdx, readString(bytes.vector[i], bytes.start[i], bytes.length[i]));
+					}
+				}
+			}
+		}
+	}
+
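+	/**
+	 * Reads a nullable bytes column as byte[] values.
+	 */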
+	private static void readBytesColumnAsBinary(Object[] vals, int fieldIdx, BytesColumnVector bytes, int childCount) {
+
+		if (bytes.isRepeating) { // fill complete column with first value
+			if (bytes.isNull[0]) {
+				// fill vals with null values
+				fillColumnWithRepeatingValue(vals, fieldIdx, null, childCount);
+			} else {
+				// read repeating non-null value by forwarding call
+				readNonNullBytesColumnAsBinary(vals, fieldIdx, bytes, childCount);
+			}
+		} else {
+			boolean[] isNullVector = bytes.isNull;
+			if (fieldIdx == -1) { // set as an object
+				for (int i = 0; i < childCount; i++) {
+					if (isNullVector[i]) {
+						vals[i] = null;
+					} else {
+						vals[i] = readBinary(bytes.vector[i], bytes.start[i], bytes.length[i]);
+					}
+				}
+			} else { // set as a field of Row
+				Row[] rows = (Row[]) vals;
+				for (int i = 0; i < childCount; i++) {
+					if (isNullVector[i]) {
+						rows[i].setField(fieldIdx, null);
+					} else {
+						rows[i].setField(fieldIdx, readBinary(bytes.vector[i], bytes.start[i], bytes.length[i]));
+					}
+				}
+			}
+		}
+	}
+
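+	/**
+	 * Reads a nullable long column of epoch days as java.sql.Date values.
+	 */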
+	private static void readLongColumnAsDate(Object[] vals, int fieldIdx, LongColumnVector vector, int childCount) {
+
+		if (vector.isRepeating) { // fill complete column with first value
+			if (vector.isNull[0]) {
+				// fill vals with null values
+				fillColumnWithRepeatingValue(vals, fieldIdx, null, childCount);
+			} else {
+				// read repeating non-null value by forwarding call
+				readNonNullLongColumnAsDate(vals, fieldIdx, vector, childCount);
+			}
+		} else {
+			boolean[] isNullVector = vector.isNull;
+			if (fieldIdx == -1) { // set as an object
+				for (int i = 0; i < childCount; i++) {
+					if (isNullVector[i]) {
+						vals[i] = null;
+					} else {
+						vals[i] = readDate(vector.vector[i]);
+					}
+				}
+			} else { // set as a field of Row
+				Row[] rows = (Row[]) vals;
+				for (int i = 0; i < childCount; i++) {
+					if (isNullVector[i]) {
+						rows[i].setField(fieldIdx, null);
+					} else {
+						rows[i].setField(fieldIdx, readDate(vector.vector[i]));
+					}
+				}
+			}
+		}
+	}
+
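+	/**
+	 * Reads a nullable timestamp column as java.sql.Timestamp values with nanosecond precision.
+	 */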
+	private static void readTimestampColumn(Object[] vals, int fieldIdx, TimestampColumnVector vector, int childCount) {
+
+		if (vector.isRepeating) { // fill complete column with first value
+			if (vector.isNull[0]) {
+				// fill vals with null values
+				fillColumnWithRepeatingValue(vals, fieldIdx, null, childCount);
+			} else {
+				// read repeating non-null value by forwarding call
+				readNonNullTimestampColumn(vals, fieldIdx, vector, childCount);
+			}
+		} else {
+			boolean[] isNullVector = vector.isNull;
+			if (fieldIdx == -1) { // set as an object
+				for (int i = 0; i < childCount; i++) {
+					if (isNullVector[i]) {
+						vals[i] = null;
+					} else {
+						Timestamp ts = readTimestamp(vector.time[i], vector.nanos[i]);
+						vals[i] = ts;
+					}
+				}
+			} else { // set as a field of Row
+				Row[] rows = (Row[]) vals;
+				for (int i = 0; i < childCount; i++) {
+					if (isNullVector[i]) {
+						rows[i].setField(fieldIdx, null);
+					} else {
+						Timestamp ts = readTimestamp(vector.time[i], vector.nanos[i]);
+						rows[i].setField(fieldIdx, ts);
+					}
+				}
+			}
+		}
+	}
+
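+	/**
+	 * Reads a nullable decimal column as BigDecimal values.
+	 */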
+	private static void readDecimalColumn(Object[] vals, int fieldIdx, DecimalColumnVector vector, int childCount) {
+
+		if (vector.isRepeating) { // fill complete column with first value
+			if (vector.isNull[0]) {
+				// fill vals with null values
+				fillColumnWithRepeatingValue(vals, fieldIdx, null, childCount);
+			} else {
+				// read repeating non-null value by forwarding call
+				readNonNullDecimalColumn(vals, fieldIdx, vector, childCount);
+			}
+		} else {
+			boolean[] isNullVector = vector.isNull;
+			if (fieldIdx == -1) { // set as an object
+				for (int i = 0; i < childCount; i++) {
+					if (isNullVector[i]) {
+						vals[i] = null;
+					} else {
+						vals[i] = readBigDecimal(vector.vector[i]);
+					}
+				}
+			} else { // set as a field of Row
+				Row[] rows = (Row[]) vals;
+				for (int i = 0; i < childCount; i++) {
+					if (isNullVector[i]) {
+						rows[i].setField(fieldIdx, null);
+					} else {
+						rows[i].setField(fieldIdx, readBigDecimal(vector.vector[i]));
+					}
+				}
+			}
+		}
+	}
+
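+	/**
+	 * Reads a nullable struct column into Row objects. The struct's null information
+	 * is merged into its field vectors before the fields are read.
+	 */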
+	private static void readStructColumn(Object[] vals, int fieldIdx, StructColumnVector structVector, TypeDescription schema, int childCount) {
+
+		List<TypeDescription> childrenTypes = schema.getChildren();
+
+		int numFields = childrenTypes.size();
+
+		// Early out if struct column is repeating and always null.
+		// This is the only repeating case we need to handle.
+		// ORC assumes that repeating values have been pushed to the children.
+		if (structVector.isRepeating && structVector.isNull[0]) {
+			if (fieldIdx < 0) {
+				for (int i = 0; i < childCount; i++) {
+					vals[i] = null;
+				}
+			} else {
+				for (int i = 0; i < childCount; i++) {
+					((Row) vals[i]).setField(fieldIdx, null);
+				}
+			}
+			return;
+		}
+
+		// create a batch of Rows to read the structs
+		Row[] structs = new Row[childCount];
+		// TODO: possible improvement: reuse existing Row objects
+		for (int i = 0; i < childCount; i++) {
+			structs[i] = new Row(numFields);
+		}
+
+		// read struct fields
+		for (int i = 0; i < numFields; i++) {
+			ColumnVector fieldVector = structVector.fields[i];
+			if (!fieldVector.isRepeating) {
+				// Reduce fieldVector reads by setting all entries null where struct is null.
+				if (fieldVector.noNulls) {
+					// fieldVector had no nulls. Just use the struct's null information.
+					System.arraycopy(structVector.isNull, 0, fieldVector.isNull, 0, structVector.isNull.length);
+					fieldVector.noNulls = false;
+				} else {
+					// fieldVector had nulls. Merge field nulls with struct nulls.
+					for (int j = 0; j < structVector.isNull.length; j++) {
+						fieldVector.isNull[j] = structVector.isNull[j] || fieldVector.isNull[j];
+					}
+				}
+			}
+			readField(structs, i, childrenTypes.get(i), structVector.fields[i], childCount);
+		}
+
+		boolean[] isNullVector = structVector.isNull;
+
+		if (fieldIdx == -1) { // set struct as an object
+			for (int i = 0; i < childCount; i++) {
+				if (isNullVector[i]) {
+					vals[i] = null;
+				} else {
+					vals[i] = structs[i];
+				}
+			}
+		} else { // set struct as a field of Row
+			Row[] rows = (Row[]) vals;
+			for (int i = 0; i < childCount; i++) {
+				if (isNullVector[i]) {
+					rows[i].setField(fieldIdx, null);
+				} else {
+					rows[i].setField(fieldIdx, structs[i]);
+				}
+			}
+		}
+	}
+
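+	/**
+	 * Reads a nullable list column into object arrays. The null information of null
+	 * lists is forwarded into the child vector before it is read.
+	 */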
+	private static void readListColumn(Object[] vals, int fieldIdx, ListColumnVector list, TypeDescription schema, int childCount) {
+
+		TypeDescription fieldType = schema.getChildren().get(0);
+		// get class of list elements
+		Class<?> classType = getClassForType(fieldType);
+
+		if (list.isRepeating) {
+			// list values are repeating. we only need to read the first list and copy it.
+
+			if (list.isNull[0]) {
+				// Even better: the first list is null, so all lists are null.
+				for (int i = 0; i < childCount; i++) {
+					if (fieldIdx == -1) {
+						vals[i] = null;
+					} else {
+						((Row) vals[i]).setField(fieldIdx, null);
+					}
+				}
+
+			} else {
+				// Get function to copy list
+				Function<Object, Object> copyList = getCopyFunction(schema);
+
+				int offset = (int) list.offsets[0];
+				int length = (int) list.lengths[0];
+				// we only need to read until offset + length.
+				int entriesToRead = offset + length;
+
+				// read entries
+				Object[] children = (Object[]) Array.newInstance(classType, entriesToRead);
+				readField(children, -1, fieldType, list.child, entriesToRead);
+
+				// create first list which will be copied
+				Object[] temp;
+				if (offset == 0) {
+					temp = children;
+				} else {
+					temp = (Object[]) Array.newInstance(classType, length);
+					System.arraycopy(children, offset, temp, 0, length);
+				}
+
+				// copy repeated list and set copy as result
+				for (int i = 0; i < childCount; i++) {
+					Object[] copy = (Object[]) copyList.apply(temp);
+					if (fieldIdx == -1) {
+						vals[i] = copy;
+					} else {
+						((Row) vals[i]).setField(fieldIdx, copy);
+					}
+				}
+			}
+
+		} else {
+			if (!list.child.isRepeating) {
+				boolean[] childIsNull = new boolean[list.childCount];
+				Arrays.fill(childIsNull, true);
+				// forward info of null lists into child vector
+				for (int i = 0; i < childCount; i++) {
+					// preserve isNull info of entries of non-null lists
+					if (!list.isNull[i]) {
+						int offset = (int) list.offsets[i];
+						int length = (int) list.lengths[i];
+						System.arraycopy(list.child.isNull, offset, childIsNull, offset, length);
+					}
+				}
+				// override isNull of children vector
+				list.child.isNull = childIsNull;
+				list.child.noNulls = false;
+			}
+
+			// read children
+			Object[] children = (Object[]) Array.newInstance(classType, list.childCount);
+			readField(children, -1, fieldType, list.child, list.childCount);
+
+			Object[] temp;
+			// fill lists with children
+			for (int i = 0; i < childCount; i++) {
+
+				if (list.isNull[i]) {
+					temp = null;
+				} else {
+					int offset = (int) list.offsets[i];
+					int length = (int) list.lengths[i];
+
+					temp = (Object[]) Array.newInstance(classType, length);
+					System.arraycopy(children, offset, temp, 0, length);
+				}
+
+				if (fieldIdx == -1) {
+					vals[i] = temp;
+				} else {
+					((Row) vals[i]).setField(fieldIdx, temp);
+				}
+			}
+		}
+	}
+
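+	/**
+	 * Reads a nullable map column into HashMap objects. The null information of null
+	 * maps is forwarded into the keys and values vectors before they are read.
+	 */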
+	private static void readMapColumn(Object[] vals, int fieldIdx, MapColumnVector map, TypeDescription schema, int childCount) {
+
+		List<TypeDescription> fieldTypes = schema.getChildren();
+		TypeDescription keyType = fieldTypes.get(0);
+		TypeDescription valueType = fieldTypes.get(1);
+
+		ColumnVector keys = map.keys;
+		ColumnVector values = map.values;
+
+		if (map.isRepeating) {
+			// map values are repeating. we only need to read the first map and copy it.
+
+			if (map.isNull[0]) {
+				// Even better: the first map is null, so all maps are null.
+				for (int i = 0; i < childCount; i++) {
+					if (fieldIdx == -1) {
+						vals[i] = null;
+					} else {
+						((Row) vals[i]).setField(fieldIdx, null);
+					}
+				}
+
+			} else {
+				// Get function to copy map
+				Function<Object, Object> copyMap = getCopyFunction(schema);
+
+				int offset = (int) map.offsets[0];
+				int length = (int) map.lengths[0];
+				// we only need to read until offset + length.
+				int entriesToRead = offset + length;
+
+				Object[] keyRows = new Object[entriesToRead];
+				Object[] valueRows = new Object[entriesToRead];
+
+				// read map keys and values
+				readField(keyRows, -1, keyType, keys, entriesToRead);
+				readField(valueRows, -1, valueType, values, entriesToRead);
+
+				// create first map which will be copied
+				HashMap<Object, Object> temp = readHashMap(keyRows, valueRows, offset, length);
+
+				// copy repeated map and set copy as result
+				for (int i = 0; i < childCount; i++) {
+					if (fieldIdx == -1) {
+						vals[i] = copyMap.apply(temp);
+					} else {
+						((Row) vals[i]).setField(fieldIdx, copyMap.apply(temp));
+					}
+				}
+			}
+		} else {
+			// mark all keys and values as null, except those referenced by non-null maps
+
+			if (!keys.isRepeating) {
+				// propagate the map's isNull info into the keys vector
+				boolean[] keyIsNull = new boolean[map.childCount];
+				Arrays.fill(keyIsNull, true);
+				for (int i = 0; i < childCount; i++) {
+					// preserve isNull info for keys of non-null maps
+					if (!map.isNull[i]) {
+						int offset = (int) map.offsets[i];
+						int length = (int) map.lengths[i];
+						System.arraycopy(keys.isNull, offset, keyIsNull, offset, length);
+					}
+				}
+				// override isNull of keys vector
+				keys.isNull = keyIsNull;
+				keys.noNulls = false;
+			}
+			if (!values.isRepeating) {
+				// propagate the map's isNull info into the values vector
+				boolean[] valIsNull = new boolean[map.childCount];
+				Arrays.fill(valIsNull, true);
+				for (int i = 0; i < childCount; i++) {
+					// preserve isNull info for vals of non-null maps
+					if (!map.isNull[i]) {
+						int offset = (int) map.offsets[i];
+						int length = (int) map.lengths[i];
+						System.arraycopy(values.isNull, offset, valIsNull, offset, length);
+					}
+				}
+				// override isNull of values vector
+				values.isNull = valIsNull;
+				values.noNulls = false;
+			}
+
+			Object[] keyRows = new Object[map.childCount];
+			Object[] valueRows = new Object[map.childCount];
+
+			// read map keys and values
+			readField(keyRows, -1, keyType, keys, keyRows.length);
+			readField(valueRows, -1, valueType, values, valueRows.length);
+
+			boolean[] isNullVector = map.isNull;
+			long[] lengths = map.lengths;
+			long[] offsets = map.offsets;
+
+			if (fieldIdx == -1) { // set map as an object
+				for (int i = 0; i < childCount; i++) {
+					if (isNullVector[i]) {
+						vals[i] = null;
+					} else {
+						vals[i] = readHashMap(keyRows, valueRows, (int) offsets[i], lengths[i]);
+					}
+				}
+			} else { // set map as a field of Row
+				Row[] rows = (Row[]) vals;
+				for (int i = 0; i < childCount; i++) {
+					if (isNullVector[i]) {
+						rows[i].setField(fieldIdx, null);
+					} else {
+						rows[i].setField(fieldIdx, readHashMap(keyRows, valueRows, (int) offsets[i], lengths[i]));
+					}
+				}
+			}
+		}
+	}
+
+	/**
+	 * Sets a repeating value to all objects or row fields of the passed vals array.
+	 *
+	 * @param vals The array of objects or Rows.
+	 * @param fieldIdx If the vals array is an array of Rows, the index of the field that needs to be filled.
+	 *                 Otherwise, -1 must be passed and the data is directly filled into the array.
+	 * @param repeatingValue The value that is set.
+	 * @param childCount The number of times the value is set.
+	 */
+	private static void fillColumnWithRepeatingValue(Object[] vals, int fieldIdx, Object repeatingValue, int childCount) {
+
+		if (fieldIdx == -1) {
+			// set value as an object
+			Arrays.fill(vals, 0, childCount, repeatingValue);
+		} else {
+			// set value as a field of Row
+			Row[] rows = (Row[]) vals;
+			for (int i = 0; i < childCount; i++) {
+				rows[i].setField(fieldIdx, repeatingValue);
+			}
+		}
+	}
+
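+	/**
+	 * Returns the Java class that values of the given ORC type are deserialized into.
+	 */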
+	private static Class<?> getClassForType(TypeDescription schema) {
+
+		// check the type of the vector to decide how to read it.
+		switch (schema.getCategory()) {
+			case BOOLEAN:
+				return Boolean.class;
+			case BYTE:
+				return Byte.class;
+			case SHORT:
+				return Short.class;
+			case INT:
+				return Integer.class;
+			case LONG:
+				return Long.class;
+			case FLOAT:
+				return Float.class;
+			case DOUBLE:
+				return Double.class;
+			case CHAR:
+			case VARCHAR:
+			case STRING:
+				return String.class;
+			case DATE:
+				return Date.class;
+			case TIMESTAMP:
+				return Timestamp.class;
+			case BINARY:
+				return byte[].class;
+			case DECIMAL:
+				return BigDecimal.class;
+			case STRUCT:
+				return Row.class;
+			case LIST:
+				Class<?> childClass = getClassForType(schema.getChildren().get(0));
+				return Array.newInstance(childClass, 0).getClass();
+			case MAP:
+				return HashMap.class;
+			case UNION:
+				throw new UnsupportedOperationException("UNION type not supported yet");
+			default:
+				throw new IllegalArgumentException("Unknown type " + schema);
+		}
+	}
+
+	private static Boolean readBoolean(long l) {
+		return l != 0;
+	}
+
+	private static Byte readByte(long l) {
+		return (byte) l;
+	}
+
+	private static Short readShort(long l) {
+		return (short) l;
+	}
+
+	private static Integer readInt(long l) {
+		return (int) l;
+	}
+
+	private static Long readLong(long l) {
+		return l;
+	}
+
+	private static Float readFloat(double d) {
+		return (float) d;
+	}
+
+	private static Double readDouble(double d) {
+		return d;
+	}
+
+	private static Date readDate(long l) {
+		// day to milliseconds
+		final long t = l * MILLIS_PER_DAY;
+		// adjust by local timezone
+		return new Date(t - LOCAL_TZ.getOffset(t));
+	}
+
+	private static String readString(byte[] bytes, int start, int length) {
+		return new String(bytes, start, length, StandardCharsets.UTF_8);
+	}
+
+	private static byte[] readBinary(byte[] src, int srcPos, int length) {
+		byte[] result = new byte[length];
+		System.arraycopy(src, srcPos, result, 0, length);
+		return result;
+	}
+
+	private static BigDecimal readBigDecimal(HiveDecimalWritable hiveDecimalWritable) {
+		HiveDecimal hiveDecimal = hiveDecimalWritable.getHiveDecimal();
+		return hiveDecimal.bigDecimalValue();
+	}
+
+	private static Timestamp readTimestamp(long time, int nanos) {
+		Timestamp ts = new Timestamp(time);
+		ts.setNanos(nanos);
+		return ts;
+	}
+
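+	/**
+	 * Creates a HashMap from the key and value arrays, reading {@code length} entries
+	 * starting at {@code offset}.
+	 */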
+	private static HashMap<Object, Object> readHashMap(Object[] keyRows, Object[] valueRows, int offset, long length) {
+		HashMap<Object, Object> resultMap = new HashMap<>();
+		for (int j = 0; j < length; j++) {
+			resultMap.put(keyRows[offset], valueRows[offset]);
+			offset++;
+		}
+		return resultMap;
+	}
+
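+	/**
+	 * Returns a function that copies values of the given ORC type. Immutable values are
+	 * returned as-is; mutable values (dates, timestamps, binaries, structs, lists, and
+	 * maps) are deeply copied.
+	 */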
+	@SuppressWarnings("unchecked")
+	private static Function<Object, Object> getCopyFunction(TypeDescription schema) {
+		// check the type of the vector to decide how to read it.
+		switch (schema.getCategory()) {
+			case BOOLEAN:
+			case BYTE:
+			case SHORT:
+			case INT:
+			case LONG:
+			case FLOAT:
+			case DOUBLE:
+			case CHAR:
+			case VARCHAR:
+			case STRING:
+			case DECIMAL:
+				return OrcBatchReader::returnImmutable;
+			case DATE:
+				return OrcBatchReader::copyDate;
+			case TIMESTAMP:
+				return OrcBatchReader::copyTimestamp;
+			case BINARY:
+				return OrcBatchReader::copyBinary;
+			case STRUCT:
+				List<TypeDescription> fieldTypes = schema.getChildren();
+				Function<Object, Object>[] copyFields = new Function[fieldTypes.size()];
+				for (int i = 0; i < fieldTypes.size(); i++) {
+					copyFields[i] = getCopyFunction(fieldTypes.get(i));
+				}
+				return new CopyStruct(copyFields);
+			case LIST:
+				TypeDescription entryType = schema.getChildren().get(0);
+				Function<Object, Object> copyEntry = getCopyFunction(entryType);
+				Class<?> entryClass = getClassForType(entryType);
+				return new CopyList(copyEntry, entryClass);
+			case MAP:
+				TypeDescription keyType = schema.getChildren().get(0);
+				TypeDescription valueType = schema.getChildren().get(1);
+				Function<Object, Object> copyKey = getCopyFunction(keyType);
+				Function<Object, Object> copyValue = getCopyFunction(valueType);
+				return new CopyMap(copyKey, copyValue);
+			case UNION:
+				throw new UnsupportedOperationException("UNION type not supported yet");
+			default:
+				throw new IllegalArgumentException("Unknown type " + schema);
+		}
+	}
+
+	private static Object returnImmutable(Object o) {
+		return o;
+	}
+
+	private static Date copyDate(Object o) {
+		if (o == null) {
+			return null;
+		} else {
+			long date = ((Date) o).getTime();
+			return new Date(date);
+		}
+	}
+
+	private static Timestamp copyTimestamp(Object o) {
+		if (o == null) {
+			return null;
+		} else {
+			long millis = ((Timestamp) o).getTime();
+			int nanos = ((Timestamp) o).getNanos();
+			Timestamp copy = new Timestamp(millis);
+			copy.setNanos(nanos);
+			return copy;
+		}
+	}
+
+	private static byte[] copyBinary(Object o) {
+		if (o == null) {
+			return null;
+		} else {
+			int length = ((byte[]) o).length;
+			return Arrays.copyOf((byte[]) o, length);
+		}
+	}
+
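+	/** Copies a Row by applying a copy function to each of its fields. */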
+	private static class CopyStruct implements Function<Object, Object> {
+
+		private final Function<Object, Object>[] copyFields;
+
+		CopyStruct(Function<Object, Object>[] copyFields) {
+			this.copyFields = copyFields;
+		}
+
+		@Override
+		public Object apply(Object o) {
+			if (o == null) {
+				return null;
+			} else {
+				Row r = (Row) o;
+				Row copy = new Row(copyFields.length);
+				for (int i = 0; i < copyFields.length; i++) {
+					copy.setField(i, copyFields[i].apply(r.getField(i)));
+				}
+				return copy;
+			}
+		}
+	}
+
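+	/** Copies an object array by applying a copy function to each entry. */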
+	private static class CopyList implements Function<Object, Object> {
+
+		private final Function<Object, Object> copyEntry;
+		private final Class<?> entryClass;
+
+		CopyList(Function<Object, Object> copyEntry, Class<?> entryClass) {
+			this.copyEntry = copyEntry;
+			this.entryClass = entryClass;
+		}
+
+		@Override
+		public Object apply(Object o) {
+			if (o == null) {
+				return null;
+			} else {
+				Object[] l = (Object[]) o;
+				Object[] copy = (Object[]) Array.newInstance(entryClass, l.length);
+				for (int i = 0; i < l.length; i++) {
+					copy[i] = copyEntry.apply(l[i]);
+				}
+				return copy;
+			}
+		}
+	}
+
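+	/** Copies a map by applying copy functions to its keys and values. */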
+	@SuppressWarnings("unchecked")
+	private static class CopyMap implements Function<Object, Object> {
+
+		private final Function<Object, Object> copyKey;
+		private final Function<Object, Object> copyValue;
+
+		CopyMap(Function<Object, Object> copyKey, Function<Object, Object> copyValue) {
+			this.copyKey = copyKey;
+			this.copyValue = copyValue;
+		}
+
+		@Override
+		public Object apply(Object o) {
+			if (o == null) {
+				return null;
+			} else {
+				Map<Object, Object> m = (Map<Object, Object>) o;
+				HashMap<Object, Object> copy = new HashMap<>(m.size());
+
+				for (Map.Entry<Object, Object> e : m.entrySet()) {
+					Object keyCopy = copyKey.apply(e.getKey());
+					Object valueCopy = copyValue.apply(e.getValue());
+					copy.put(keyCopy, valueCopy);
+				}
+				return copy;
+			}
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/bcead3be/flink-connectors/flink-orc/src/main/java/org/apache/flink/orc/OrcRowInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-orc/src/main/java/org/apache/flink/orc/OrcRowInputFormat.java b/flink-connectors/flink-orc/src/main/java/org/apache/flink/orc/OrcRowInputFormat.java
index 4353cbc..61575ad 100644
--- a/flink-connectors/flink-orc/src/main/java/org/apache/flink/orc/OrcRowInputFormat.java
+++ b/flink-connectors/flink-orc/src/main/java/org/apache/flink/orc/OrcRowInputFormat.java
@@ -55,7 +55,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 
-import static org.apache.flink.orc.OrcUtils.fillRows;
+import static org.apache.flink.orc.OrcBatchReader.fillRows;
 
 /**
  * InputFormat to read ORC files.
@@ -128,7 +128,7 @@ public class OrcRowInputFormat extends FileInputFormat<Row> implements ResultTyp
 
 		// configure OrcRowInputFormat
 		this.schema = orcSchema;
-		this.rowType = (RowTypeInfo) OrcUtils.schemaToTypeInfo(schema);
+		this.rowType = (RowTypeInfo) OrcBatchReader.schemaToTypeInfo(schema);
 		this.conf = orcConfig;
 		this.batchSize = batchSize;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/bcead3be/flink-connectors/flink-orc/src/main/java/org/apache/flink/orc/OrcTableSource.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-orc/src/main/java/org/apache/flink/orc/OrcTableSource.java b/flink-connectors/flink-orc/src/main/java/org/apache/flink/orc/OrcTableSource.java
index 31a9303..0eab4a0 100644
--- a/flink-connectors/flink-orc/src/main/java/org/apache/flink/orc/OrcTableSource.java
+++ b/flink-connectors/flink-orc/src/main/java/org/apache/flink/orc/OrcTableSource.java
@@ -127,7 +127,7 @@ public class OrcTableSource
 		this.predicates = predicates;
 
 		// determine the type information from the ORC schema
-		RowTypeInfo typeInfoFromSchema = (RowTypeInfo) OrcUtils.schemaToTypeInfo(this.orcSchema);
+		RowTypeInfo typeInfoFromSchema = (RowTypeInfo) OrcBatchReader.schemaToTypeInfo(this.orcSchema);
 
 		// set return type info
 		if (selectedFields == null) {