You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2017/11/22 22:10:45 UTC

[8/9] flink git commit: [FLINK-2170] [connectors] Add OrcRowInputFormat and OrcTableSource.

http://git-wip-us.apache.org/repos/asf/flink/blob/200612ee/flink-connectors/flink-orc/src/main/java/org/apache/flink/orc/OrcUtils.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-orc/src/main/java/org/apache/flink/orc/OrcUtils.java b/flink-connectors/flink-orc/src/main/java/org/apache/flink/orc/OrcUtils.java
index c7557c7..cfb4e0e 100644
--- a/flink-connectors/flink-orc/src/main/java/org/apache/flink/orc/OrcUtils.java
+++ b/flink-connectors/flink-orc/src/main/java/org/apache/flink/orc/OrcUtils.java
@@ -39,29 +39,36 @@ import org.apache.hadoop.hive.ql.exec.vector.StructColumnVector;
 import org.apache.hadoop.hive.ql.exec.vector.TimestampColumnVector;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
 import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
-
 import org.apache.orc.TypeDescription;
 
 import java.lang.reflect.Array;
 import java.math.BigDecimal;
+import java.nio.charset.StandardCharsets;
 import java.sql.Date;
 import java.sql.Timestamp;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
+import java.util.TimeZone;
+import java.util.function.DoubleFunction;
+import java.util.function.IntFunction;
+import java.util.function.LongFunction;
 
 /**
  * A class that provides utility methods for orc file reading.
  */
-public class OrcUtils {
+class OrcUtils {
+
+	private static final long MILLIS_PER_DAY = 86400000; // = 24 * 60 * 60 * 1000
+	private static final TimeZone LOCAL_TZ = TimeZone.getDefault();
 
 	/**
-	 * Convert ORC schema types to Flink types.
-	 *
-	 * @param schema schema of orc file
+	 * Converts an ORC schema to a Flink TypeInformation.
 	 *
+	 * @param schema The ORC schema.
+	 * @return The TypeInformation that corresponds to the ORC schema.
 	 */
-	public static TypeInformation schemaToTypeInfo(TypeDescription schema) {
+	static TypeInformation schemaToTypeInfo(TypeDescription schema) {
 		switch (schema.getCategory()) {
 			case BOOLEAN:
 				return BasicTypeInfo.BOOLEAN_TYPE_INFO;
@@ -77,6 +84,8 @@ public class OrcUtils {
 				return BasicTypeInfo.FLOAT_TYPE_INFO;
 			case DOUBLE:
 				return BasicTypeInfo.DOUBLE_TYPE_INFO;
+			case DECIMAL:
+				return BasicTypeInfo.BIG_DEC_TYPE_INFO;
 			case STRING:
 			case CHAR:
 			case VARCHAR:
@@ -97,154 +106,164 @@ public class OrcUtils {
 				return new RowTypeInfo(fieldTypes, fieldNames);
 			case LIST:
 				TypeDescription elementSchema = schema.getChildren().get(0);
-				TypeInformation elementType = schemaToTypeInfo(elementSchema);
+				TypeInformation<?> elementType = schemaToTypeInfo(elementSchema);
+				// arrays of primitive types are handled as object arrays to support null values
 				return ObjectArrayTypeInfo.getInfoFor(elementType);
 			case MAP:
 				TypeDescription keySchema = schema.getChildren().get(0);
 				TypeDescription valSchema = schema.getChildren().get(1);
-				TypeInformation keyType = schemaToTypeInfo(keySchema);
-				TypeInformation valType = schemaToTypeInfo(valSchema);
-				return new MapTypeInfo(keyType, valType);
-			case DECIMAL:
-				return BasicTypeInfo.BIG_DEC_TYPE_INFO;
+				TypeInformation<?> keyType = schemaToTypeInfo(keySchema);
+				TypeInformation<?> valType = schemaToTypeInfo(valSchema);
+				return new MapTypeInfo<>(keyType, valType);
 			case UNION:
-				throw new UnsupportedOperationException("UNION type not supported yet.");
+				throw new UnsupportedOperationException("UNION type is not supported yet.");
 			default:
 				throw new IllegalArgumentException("Unknown type " + schema);
 		}
 	}
 
 	/**
-	 * Fill rows from orc batch.
-	 *
-	 * @param rows the batch of rows need to be filled
-	 * @param schema schema of orc file
-	 * @param batch current orc batch data used to fill the rows
-	 * @param fieldMapping field mapping
+	 * Fills an ORC batch into an array of Row.
 	 *
+	 * @param rows The batch of rows need to be filled.
+	 * @param schema The schema of the ORC data.
+	 * @param batch The ORC data.
+	 * @param selectedFields The list of selected ORC fields.
+	 * @return The number of rows that were filled.
 	 */
-	public static void fillRows(Object[] rows, TypeDescription schema, VectorizedRowBatch batch, int[] fieldMapping) {
+	static int fillRows(Row[] rows, TypeDescription schema, VectorizedRowBatch batch, int[] selectedFields) {
 
-		int totalRowsInBatch = (int) batch.count();
+		int rowsToRead = Math.min((int) batch.count(), rows.length);
 
 		List<TypeDescription> fieldTypes = schema.getChildren();
-		for (int outIdx = 0; outIdx < fieldMapping.length; outIdx++) {
-			int inIdx = fieldMapping[outIdx];
-			readField(rows, outIdx, fieldTypes.get(inIdx), batch.cols[inIdx], null, Math.min((int) totalRowsInBatch, rows.length));
+		// read each selected field
+		for (int rowIdx = 0; rowIdx < selectedFields.length; rowIdx++) {
+			int orcIdx = selectedFields[rowIdx];
+			readField(rows, rowIdx, fieldTypes.get(orcIdx), batch.cols[orcIdx], null, rowsToRead);
 		}
+		return rowsToRead;
 	}
 
-	private static void readField(Object[] rows, int fieldIdx, TypeDescription schema, ColumnVector vector, long[] lengthVector, int childCount) {
+	/**
+	 * Reads a vector of data into an array of objects.
+	 *
+	 * @param vals The array that needs to be filled.
+	 * @param fieldIdx If the vals array is an array of Row, the index of the field that needs to be filled.
+	 *                 Otherwise a -1 must be passed and the data is directly filled into the array.
+	 * @param schema The schema of the vector to read.
+	 * @param vector The vector to read.
+	 * @param lengthVector If the vector is of type List or Map, the number of sub-elements to read for each field. Otherwise, it must be null.
+	 * @param childCount The number of vector entries to read.
+	 */
+	private static void readField(Object[] vals, int fieldIdx, TypeDescription schema, ColumnVector vector, long[] lengthVector, int childCount) {
 
+		// check the type of the vector to decide how to read it.
 		switch (schema.getCategory()) {
 			case BOOLEAN:
 				if (vector.noNulls) {
-					readNonNullBooleanColumn(rows, fieldIdx, (LongColumnVector) vector, lengthVector, childCount);
+					readNonNullLongColumn(vals, fieldIdx, (LongColumnVector) vector, lengthVector, childCount, OrcUtils::readBoolean, OrcUtils::boolArray);
 				} else {
-					readBooleanColumn(rows, fieldIdx, (LongColumnVector) vector, lengthVector, childCount);
+					readLongColumn(vals, fieldIdx, (LongColumnVector) vector, lengthVector, childCount, OrcUtils::readBoolean, OrcUtils::boolArray);
 				}
 				break;
 			case BYTE:
 				if (vector.noNulls) {
-					readNonNullByteColumn(rows, fieldIdx, (LongColumnVector) vector, lengthVector, childCount);
+					readNonNullLongColumn(vals, fieldIdx, (LongColumnVector) vector, lengthVector, childCount, OrcUtils::readByte, OrcUtils::byteArray);
 				} else {
-					readByteColumn(rows, fieldIdx, (LongColumnVector) vector, lengthVector, childCount);
+					readLongColumn(vals, fieldIdx, (LongColumnVector) vector, lengthVector, childCount, OrcUtils::readByte, OrcUtils::byteArray);
 				}
 				break;
 			case SHORT:
 				if (vector.noNulls) {
-					readNonNullShortColumn(rows, fieldIdx, (LongColumnVector) vector, lengthVector, childCount);
+					readNonNullLongColumn(vals, fieldIdx, (LongColumnVector) vector, lengthVector, childCount, OrcUtils::readShort, OrcUtils::shortArray);
 				} else {
-					readShortColumn(rows, fieldIdx, (LongColumnVector) vector, lengthVector, childCount);
+					readLongColumn(vals, fieldIdx, (LongColumnVector) vector, lengthVector, childCount, OrcUtils::readShort, OrcUtils::shortArray);
 				}
 				break;
 			case INT:
 				if (vector.noNulls) {
-					readNonNullIntColumn(rows, fieldIdx, (LongColumnVector) vector, lengthVector, childCount);
+					readNonNullLongColumn(vals, fieldIdx, (LongColumnVector) vector, lengthVector, childCount, OrcUtils::readInt, OrcUtils::intArray);
 				} else {
-					readIntColumn(rows, fieldIdx, (LongColumnVector) vector, lengthVector, childCount);
+					readLongColumn(vals, fieldIdx, (LongColumnVector) vector, lengthVector, childCount, OrcUtils::readInt, OrcUtils::intArray);
 				}
 				break;
 			case LONG:
 				if (vector.noNulls) {
-					readNonNullLongColumn(rows, fieldIdx, (LongColumnVector) vector, lengthVector, childCount);
+					readNonNullLongColumn(vals, fieldIdx, (LongColumnVector) vector, lengthVector, childCount, OrcUtils::readLong, OrcUtils::longArray);
 				} else {
-					readLongColumn(rows, fieldIdx, (LongColumnVector) vector, lengthVector, childCount);
+					readLongColumn(vals, fieldIdx, (LongColumnVector) vector, lengthVector, childCount, OrcUtils::readLong, OrcUtils::longArray);
 				}
 				break;
 			case FLOAT:
 				if (vector.noNulls) {
-					readNonNullFloatColumn(rows, fieldIdx, (DoubleColumnVector) vector, lengthVector, childCount);
+					readNonNullDoubleColumn(vals, fieldIdx, (DoubleColumnVector) vector, lengthVector, childCount, OrcUtils::readFloat, OrcUtils::floatArray);
 				} else {
-					readFloatColumn(rows, fieldIdx, (DoubleColumnVector) vector, lengthVector, childCount);
+					readDoubleColumn(vals, fieldIdx, (DoubleColumnVector) vector, lengthVector, childCount, OrcUtils::readFloat, OrcUtils::floatArray);
 				}
 				break;
 			case DOUBLE:
 				if (vector.noNulls) {
-					readNonNullDoubleColumn(rows, fieldIdx, (DoubleColumnVector) vector, lengthVector, childCount);
+					readNonNullDoubleColumn(vals, fieldIdx, (DoubleColumnVector) vector, lengthVector, childCount, OrcUtils::readDouble, OrcUtils::doubleArray);
 				} else {
-					readDoubleColumn(rows, fieldIdx, (DoubleColumnVector) vector, lengthVector, childCount);
+					readDoubleColumn(vals, fieldIdx, (DoubleColumnVector) vector, lengthVector, childCount, OrcUtils::readDouble, OrcUtils::doubleArray);
 				}
 				break;
 			case CHAR:
 			case VARCHAR:
 			case STRING:
 				if (vector.noNulls) {
-					readNonNullStringColumn(rows, fieldIdx, (BytesColumnVector) vector, lengthVector, childCount);
+					readNonNullBytesColumnAsString(vals, fieldIdx, (BytesColumnVector) vector, lengthVector, childCount);
 				} else {
-					readStringColumn(rows, fieldIdx, (BytesColumnVector) vector, lengthVector, childCount);
+					readBytesColumnAsString(vals, fieldIdx, (BytesColumnVector) vector, lengthVector, childCount);
 				}
 				break;
 			case DATE:
 				if (vector.noNulls) {
-					readNonNullDateColumn(rows, fieldIdx, (LongColumnVector) vector, lengthVector, childCount);
+					readNonNullLongColumnAsDate(vals, fieldIdx, (LongColumnVector) vector, lengthVector, childCount);
 				} else {
-					readDateColumn(rows, fieldIdx, (LongColumnVector) vector, lengthVector, childCount);
+					readLongColumnAsDate(vals, fieldIdx, (LongColumnVector) vector, lengthVector, childCount);
 				}
 				break;
 			case TIMESTAMP:
 				if (vector.noNulls) {
-					readNonNullTimestampColumn(rows, fieldIdx, (TimestampColumnVector) vector, lengthVector, childCount);
+					readNonNullTimestampColumn(vals, fieldIdx, (TimestampColumnVector) vector, lengthVector, childCount);
 				} else {
-					readTimestampColumn(rows, fieldIdx, (TimestampColumnVector) vector, lengthVector, childCount);
+					readTimestampColumn(vals, fieldIdx, (TimestampColumnVector) vector, lengthVector, childCount);
 				}
 				break;
 			case BINARY:
 				if (vector.noNulls) {
-					readNonNullBinaryColumn(rows, fieldIdx, (BytesColumnVector) vector, lengthVector, childCount);
+					readNonNullBytesColumnAsBinary(vals, fieldIdx, (BytesColumnVector) vector, lengthVector, childCount);
 				} else {
-					readBinaryColumn(rows, fieldIdx, (BytesColumnVector) vector, lengthVector, childCount);
+					readBytesColumnAsBinary(vals, fieldIdx, (BytesColumnVector) vector, lengthVector, childCount);
 				}
 				break;
 			case DECIMAL:
 				if (vector.noNulls) {
-					readNonNullDecimalColumn(rows, fieldIdx, (DecimalColumnVector) vector, lengthVector, childCount);
-				}
-				else {
-					readDecimalColumn(rows, fieldIdx, (DecimalColumnVector) vector, lengthVector, childCount);
+					readNonNullDecimalColumn(vals, fieldIdx, (DecimalColumnVector) vector, lengthVector, childCount);
+				} else {
+					readDecimalColumn(vals, fieldIdx, (DecimalColumnVector) vector, lengthVector, childCount);
 				}
 				break;
 			case STRUCT:
 				if (vector.noNulls) {
-					readNonNullStructColumn(rows, fieldIdx, (StructColumnVector) vector, schema, lengthVector, childCount);
+					readNonNullStructColumn(vals, fieldIdx, (StructColumnVector) vector, schema, lengthVector, childCount);
 				} else {
-					readStructColumn(rows, fieldIdx, (StructColumnVector) vector, schema, lengthVector, childCount);
+					readStructColumn(vals, fieldIdx, (StructColumnVector) vector, schema, lengthVector, childCount);
 				}
 				break;
 			case LIST:
 				if (vector.noNulls) {
-					readNonNullListColumn(rows, fieldIdx, (ListColumnVector) vector, schema, lengthVector, childCount);
-				}
-				else {
-					readListColumn(rows, fieldIdx, (ListColumnVector) vector, schema, lengthVector, childCount);
+					readNonNullListColumn(vals, fieldIdx, (ListColumnVector) vector, schema, lengthVector, childCount);
+				} else {
+					readListColumn(vals, fieldIdx, (ListColumnVector) vector, schema, lengthVector, childCount);
 				}
 				break;
 			case MAP:
 				if (vector.noNulls) {
-					readNonNullMapColumn(rows, fieldIdx, (MapColumnVector) vector, schema, lengthVector, childCount);
-				}
-				else {
-					readMapColumn(rows, fieldIdx, (MapColumnVector) vector, schema, lengthVector, childCount);
+					readNonNullMapColumn(vals, fieldIdx, (MapColumnVector) vector, schema, lengthVector, childCount);
+				} else {
+					readMapColumn(vals, fieldIdx, (MapColumnVector) vector, schema, lengthVector, childCount);
 				}
 				break;
 			case UNION:
@@ -254,1870 +273,1013 @@ public class OrcUtils {
 		}
 	}
 
-	private static void readNonNullBooleanColumn(Object[] rows, int fieldIdx, LongColumnVector vector, long[] lengthVector, int childCount) {
+	private static <T> void readNonNullLongColumn(Object[] vals, int fieldIdx, LongColumnVector vector, long[] lengthVector, int childCount,
+													LongFunction<T> reader, IntFunction<T[]> array) {
 
-		// check if boolean is directly in a list or not, e.g, array<boolean>
+		// check if the values need to be read into lists or as single values
 		if (lengthVector == null) {
 			if (vector.isRepeating) { // fill complete column with first value
-				boolean repeatingValue = vector.vector[0] != 0;
-				fillColumnWithRepeatingValue(rows, fieldIdx, repeatingValue, childCount);
+				T repeatingValue = reader.apply(vector.vector[0]);
+				fillColumnWithRepeatingValue(vals, fieldIdx, repeatingValue, childCount);
 			} else {
 				if (fieldIdx == -1) { // set as an object
 					for (int i = 0; i < childCount; i++) {
-						rows[i] = vector.vector[i] != 0;
+						vals[i] = reader.apply(vector.vector[i]);
 					}
 				} else { // set as a field of Row
+					Row[] rows = (Row[]) vals;
 					for (int i = 0; i < childCount; i++) {
-						((Row) rows[i]).setField(fieldIdx, vector.vector[i] != 0);
+						rows[i].setField(fieldIdx, reader.apply(vector.vector[i]));
 					}
 				}
 			}
 		} else { // in a list
-			boolean[] temp;
+			T[] temp;
 			int offset = 0;
 			if (vector.isRepeating) { // fill complete list with first value
-				boolean repeatingValue = vector.vector[0] != 0;
-				if (fieldIdx == -1) { // set list as an object
-					for (int i = 0; offset < childCount; i++) {
-						temp = new boolean[(int) lengthVector[i]];
-						Arrays.fill(temp, repeatingValue);
-						rows[i] = temp;
-						offset += temp.length;
-					}
-				} else { // set list as a field of Row
-					for (int i = 0; offset < childCount; i++) {
-						temp = new boolean[(int) lengthVector[i]];
-						Arrays.fill(temp, repeatingValue);
-						((Row) rows[i]).setField(fieldIdx, temp);
-						offset += temp.length;
+				T repeatingValue = reader.apply(vector.vector[0]);
+				for (int i = 0; offset < childCount; i++) {
+					temp = array.apply((int) lengthVector[i]);
+					Arrays.fill(temp, repeatingValue);
+					offset += temp.length;
+					if (fieldIdx == -1) {
+						vals[i] = temp;
+					} else {
+						((Row) vals[i]).setField(fieldIdx, temp);
 					}
 				}
 			} else {
-				if (fieldIdx == -1) { // set list as an object
-					for (int i = 0; offset < childCount; i++) {
-						temp = new boolean[(int) lengthVector[i]];
-						for (int j = 0; j < temp.length; j++) {
-							temp[j] = vector.vector[offset++] != 0;
-						}
-						rows[i] = temp;
+				for (int i = 0; offset < childCount; i++) {
+					temp = array.apply((int) lengthVector[i]);
+					for (int j = 0; j < temp.length; j++) {
+						temp[j] = reader.apply(vector.vector[offset++]);
 					}
-				} else { // set list as a field of Row
-					for (int i = 0; offset < childCount; i++) {
-						temp = new boolean[(int) lengthVector[i]];
-						for (int j = 0; j < temp.length; j++) {
-							temp[j] = vector.vector[offset++] != 0;
-						}
-						((Row) rows[i]).setField(fieldIdx, temp);
+					if (fieldIdx == -1) {
+						vals[i] = temp;
+					} else {
+						((Row) vals[i]).setField(fieldIdx, temp);
 					}
 				}
 			}
 		}
 	}
 
-	private static void readNonNullByteColumn(Object[] rows, int fieldIdx, LongColumnVector vector, long[] lengthVector, int childCount) {
+	private static <T> void readNonNullDoubleColumn(Object[] vals, int fieldIdx, DoubleColumnVector vector, long[] lengthVector, int childCount,
+													DoubleFunction<T> reader, IntFunction<T[]> array) {
 
-		// check if byte is directly in a list or not, e.g, array<byte>
+		// check if the values need to be read into lists or as single values
 		if (lengthVector == null) {
 			if (vector.isRepeating) { // fill complete column with first value
-				byte repeatingValue = (byte) vector.vector[0];
-				fillColumnWithRepeatingValue(rows, fieldIdx, repeatingValue, childCount);
+				T repeatingValue = reader.apply(vector.vector[0]);
+				fillColumnWithRepeatingValue(vals, fieldIdx, repeatingValue, childCount);
 			} else {
 				if (fieldIdx == -1) { // set as an object
 					for (int i = 0; i < childCount; i++) {
-						rows[i] = (byte) vector.vector[i];
+						vals[i] = reader.apply(vector.vector[i]);
 					}
 				} else { // set as a field of Row
+					Row[] rows = (Row[]) vals;
 					for (int i = 0; i < childCount; i++) {
-						((Row) rows[i]).setField(fieldIdx, (byte) vector.vector[i]);
+						rows[i].setField(fieldIdx, reader.apply(vector.vector[i]));
 					}
 				}
 			}
 		} else { // in a list
-			byte[] temp;
+			T[] temp;
 			int offset = 0;
 			if (vector.isRepeating) { // fill complete list with first value
-				byte repeatingValue = (byte) vector.vector[0];
-				if (fieldIdx == -1) { // set list as an object
-					for (int i = 0; offset < childCount; i++) {
-						temp = new byte[(int) lengthVector[i]];
-						Arrays.fill(temp, repeatingValue);
-						rows[i] = temp;
-						offset += temp.length;
-					}
-				} else { // set list as a field of Row
-					for (int i = 0; offset < childCount; i++) {
-						temp = new byte[(int) lengthVector[i]];
-						Arrays.fill(temp, repeatingValue);
-						((Row) rows[i]).setField(fieldIdx, temp);
-						offset += temp.length;
+				T repeatingValue = reader.apply(vector.vector[0]);
+				for (int i = 0; offset < childCount; i++) {
+					temp = array.apply((int) lengthVector[i]);
+					Arrays.fill(temp, repeatingValue);
+					offset += temp.length;
+					if (fieldIdx == -1) {
+						vals[i] = temp;
+					} else {
+						((Row) vals[i]).setField(fieldIdx, temp);
 					}
 				}
 			} else {
-				if (fieldIdx == -1) { // set list as an object
-					for (int i = 0; offset < childCount; i++) {
-						temp = new byte[(int) lengthVector[i]];
-						for (int j = 0; j < temp.length; j++) {
-							temp[j] = (byte) vector.vector[offset++];
-						}
-						rows[i] = temp;
+				for (int i = 0; offset < childCount; i++) {
+					temp = array.apply((int) lengthVector[i]);
+					for (int j = 0; j < temp.length; j++) {
+						temp[j] = reader.apply(vector.vector[offset++]);
 					}
-				} else { // set list as a field of Row
-					for (int i = 0; offset < childCount; i++) {
-						temp = new byte[(int) lengthVector[i]];
-						for (int j = 0; j < temp.length; j++) {
-							temp[j] = (byte) vector.vector[offset++];
-						}
-						((Row) rows[i]).setField(fieldIdx, temp);
+					if (fieldIdx == -1) {
+						vals[i] = temp;
+					} else {
+						((Row) vals[i]).setField(fieldIdx, temp);
 					}
 				}
 			}
 		}
 	}
 
-	private static void readNonNullShortColumn(Object[] rows, int fieldIdx, LongColumnVector vector, long[] lengthVector, int childCount) {
-
-		// check if short is directly in a list or not, e.g, array<short>
+	private static void readNonNullBytesColumnAsString(Object[] vals, int fieldIdx, BytesColumnVector bytes, long[] lengthVector, int childCount) {
+		// check if the values need to be read into lists or as single values
 		if (lengthVector == null) {
-			if (vector.isRepeating) { // fill complete column with first value
-				short repeatingValue = (short) vector.vector[0];
-				fillColumnWithRepeatingValue(rows, fieldIdx, repeatingValue, childCount);
+			if (bytes.isRepeating) { // fill complete column with first value
+				String repeatingValue = new String(bytes.vector[0], bytes.start[0], bytes.length[0]);
+				fillColumnWithRepeatingValue(vals, fieldIdx, repeatingValue, childCount);
 			} else {
 				if (fieldIdx == -1) { // set as an object
 					for (int i = 0; i < childCount; i++) {
-						rows[i] = (short) vector.vector[i];
+						vals[i] = new String(bytes.vector[i], bytes.start[i], bytes.length[i], StandardCharsets.UTF_8);
 					}
 				} else { // set as a field of Row
+					Row[] rows = (Row[]) vals;
 					for (int i = 0; i < childCount; i++) {
-						((Row) rows[i]).setField(fieldIdx, (short) vector.vector[i]);
+						rows[i].setField(fieldIdx, new String(bytes.vector[i], bytes.start[i], bytes.length[i], StandardCharsets.UTF_8));
 					}
 				}
 			}
-		} else { // in a list
-			short[] temp;
+		} else {
+			String[] temp;
 			int offset = 0;
-			if (vector.isRepeating) { // fill complete list with first value
-				short repeatingValue = (short) vector.vector[0];
-				if (fieldIdx == -1) { // set list as an object
-					for (int i = 0; offset < childCount; i++) {
-						temp = new short[(int) lengthVector[i]];
-						Arrays.fill(temp, repeatingValue);
-						rows[i] = temp;
-						offset += temp.length;
-					}
-				} else { // set list as a field of Row
-					for (int i = 0; offset < childCount; i++) {
-						temp = new short[(int) lengthVector[i]];
-						Arrays.fill(temp, repeatingValue);
-						((Row) rows[i]).setField(fieldIdx, temp);
-						offset += temp.length;
+			if (bytes.isRepeating) { // fill complete list with first value
+				String repeatingValue = new String(bytes.vector[0], bytes.start[0], bytes.length[0], StandardCharsets.UTF_8);
+				for (int i = 0; offset < childCount; i++) {
+					temp = new String[(int) lengthVector[i]];
+					Arrays.fill(temp, repeatingValue);
+					offset += temp.length;
+					if (fieldIdx == -1) {
+						vals[i] = temp;
+					} else {
+						((Row) vals[i]).setField(fieldIdx, temp);
 					}
 				}
 			} else {
-				if (fieldIdx == -1) { // set list as an object
-					for (int i = 0; offset < childCount; i++) {
-						temp = new short[(int) lengthVector[i]];
-						for (int j = 0; j < temp.length; j++) {
-							temp[j] = (short) vector.vector[offset++];
-						}
-						rows[i] = temp;
+				for (int i = 0; offset < childCount; i++) {
+					temp = new String[(int) lengthVector[i]];
+					for (int j = 0; j < temp.length; j++) {
+						temp[j] = new String(bytes.vector[offset], bytes.start[offset], bytes.length[offset], StandardCharsets.UTF_8);
+						offset++;
 					}
-				} else { // set list as a field of Row
-					for (int i = 0; offset < childCount; i++) {
-						temp = new short[(int) lengthVector[i]];
-						for (int j = 0; j < temp.length; j++) {
-							temp[j] = (short) vector.vector[offset++];
-						}
-						((Row) rows[i]).setField(fieldIdx, temp);
+					if (fieldIdx == -1) {
+						vals[i] = temp;
+					} else {
+						((Row) vals[i]).setField(fieldIdx, temp);
 					}
 				}
 			}
 		}
 	}
 
-	private static void readNonNullIntColumn(Object[] rows, int fieldIdx, LongColumnVector vector, long[] lengthVector, int childCount) {
-
-		// check if int is directly in a list or not, e.g, array<int>
+	private static void readNonNullBytesColumnAsBinary(Object[] vals, int fieldIdx, BytesColumnVector bytes, long[] lengthVector, int childCount) {
+		// check if the values need to be read into lists or as single values
 		if (lengthVector == null) {
-			if (vector.isRepeating) { // fill complete column with first value
-				int repeatingValue = (int) vector.vector[0];
-				fillColumnWithRepeatingValue(rows, fieldIdx, repeatingValue, childCount);
+			if (bytes.isRepeating) { // fill complete column with first value
+				if (fieldIdx == -1) { // set as an object
+					for (int i = 0; i < childCount; i++) {
+						// don't reuse repeating val to avoid object mutation
+						vals[i] = readBinary(bytes.vector[0], bytes.start[0], bytes.length[0]);
+					}
+				} else { // set as a field of Row
+					Row[] rows = (Row[]) vals;
+					for (int i = 0; i < childCount; i++) {
+						// don't reuse repeating val to avoid object mutation
+						rows[i].setField(fieldIdx, readBinary(bytes.vector[0], bytes.start[0], bytes.length[0]));
+					}
+				}
 			} else {
 				if (fieldIdx == -1) { // set as an object
 					for (int i = 0; i < childCount; i++) {
-						rows[i] = (int) vector.vector[i];
+						vals[i] = readBinary(bytes.vector[i], bytes.start[i], bytes.length[i]);
 					}
 				} else { // set as a field of Row
+					Row[] rows = (Row[]) vals;
 					for (int i = 0; i < childCount; i++) {
-						((Row) rows[i]).setField(fieldIdx, (int) vector.vector[i]);
+						rows[i].setField(fieldIdx, readBinary(bytes.vector[i], bytes.start[i], bytes.length[i]));
 					}
 				}
 			}
-		} else { // in a list
-			int[] temp;
+		} else {
+			byte[][] temp;
 			int offset = 0;
-			if (vector.isRepeating) { // fill complete list with first value
-				int repeatingValue = (int) vector.vector[0];
-				if (fieldIdx == -1) { // set list as an object
-					for (int i = 0; offset < childCount; i++) {
-						temp = new int[(int) lengthVector[i]];
-						Arrays.fill(temp, repeatingValue);
-						rows[i] = temp;
-						offset += temp.length;
-					}
-				} else { // set list as a field of Row
-					for (int i = 0; offset < childCount; i++) {
-						temp = new int[(int) lengthVector[i]];
-						Arrays.fill(temp, repeatingValue);
-						((Row) rows[i]).setField(fieldIdx, temp);
-						offset += temp.length;
+			if (bytes.isRepeating) { // fill complete list with first value
+				for (int i = 0; offset < childCount; i++) {
+					temp = new byte[(int) lengthVector[i]][];
+					for (int j = 0; j < temp.length; j++) {
+						temp[j] = readBinary(bytes.vector[0], bytes.start[0], bytes.length[0]);
+					}
+					offset += temp.length;
+					if (fieldIdx == -1) {
+						vals[i] = temp;
+					} else {
+						((Row) vals[i]).setField(fieldIdx, temp);
 					}
 				}
 			} else {
-				if (fieldIdx == -1) { // set list as an object
-					for (int i = 0; offset < childCount; i++) {
-						temp = new int[(int) lengthVector[i]];
-						for (int j = 0; j < temp.length; j++) {
-							temp[j] = (int) vector.vector[offset++];
-						}
-						rows[i] = temp;
+				for (int i = 0; offset < childCount; i++) {
+					temp = new byte[(int) lengthVector[i]][];
+					for (int j = 0; j < temp.length; j++) {
+						temp[j] = readBinary(bytes.vector[offset], bytes.start[offset], bytes.length[offset]);
+						offset++;
 					}
-				} else { // set list as a field of Row
-					for (int i = 0; offset < childCount; i++) {
-						temp = new int[(int) lengthVector[i]];
-						for (int j = 0; j < temp.length; j++) {
-							temp[j] = (int) vector.vector[offset++];
-						}
-						((Row) rows[i]).setField(fieldIdx, temp);
+					if (fieldIdx == -1) {
+						vals[i] = temp;
+					} else {
+						((Row) vals[i]).setField(fieldIdx, temp);
 					}
 				}
 			}
 		}
 	}
 
-	private static void readNonNullLongColumn(Object[] rows, int fieldIdx, LongColumnVector vector, long[] lengthVector, int childCount) {
+	private static void readNonNullLongColumnAsDate(Object[] vals, int fieldIdx, LongColumnVector vector, long[] lengthVector, int childCount) {
 
-		// check if long is directly in a list or not, e.g, array<long>
+		// check if the values need to be read into lists or as single values
 		if (lengthVector == null) {
 			if (vector.isRepeating) { // fill complete column with first value
-				long repeatingValue = vector.vector[0];
-				fillColumnWithRepeatingValue(rows, fieldIdx, repeatingValue, childCount);
+				if (fieldIdx == -1) { // set as an object
+					for (int i = 0; i < childCount; i++) {
+						// do not reuse repeated value due to mutability of Date
+						vals[i] = readDate(vector.vector[0]);
+					}
+				} else { // set as a field of Row
+					Row[] rows = (Row[]) vals;
+					for (int i = 0; i < childCount; i++) {
+						// do not reuse repeated value due to mutability of Date
+						rows[i].setField(fieldIdx, readDate(vector.vector[0]));
+					}
+				}
 			} else {
 				if (fieldIdx == -1) { // set as an object
 					for (int i = 0; i < childCount; i++) {
-						rows[i] = vector.vector[i];
+						vals[i] = readDate(vector.vector[i]);
 					}
 				} else { // set as a field of Row
+					Row[] rows = (Row[]) vals;
 					for (int i = 0; i < childCount; i++) {
-						((Row) rows[i]).setField(fieldIdx, (Long) vector.vector[i]);
+						rows[i].setField(fieldIdx, readDate(vector.vector[i]));
 					}
 				}
 			}
 		} else { // in a list
-			long[] temp;
+			Date[] temp;
 			int offset = 0;
 			if (vector.isRepeating) { // fill complete list with first value
-				long repeatingValue = vector.vector[0];
-				if (fieldIdx == -1) { // set list as an object
-					for (int i = 0; offset < childCount; i++) {
-						temp = new long[(int) lengthVector[i]];
-						Arrays.fill(temp, repeatingValue);
-						rows[i] = temp;
-						offset += temp.length;
-					}
-				} else { // set list as a field of Row
-					for (int i = 0; offset < childCount; i++) {
-						temp = new long[(int) lengthVector[i]];
-						Arrays.fill(temp, repeatingValue);
-						((Row) rows[i]).setField(fieldIdx, temp);
-						offset += temp.length;
+				for (int i = 0; offset < childCount; i++) {
+					temp = new Date[(int) lengthVector[i]];
+					for (int j = 0; j < temp.length; j++) {
+						temp[j] = readDate(vector.vector[0]);
+					}
+					offset += temp.length;
+					if (fieldIdx == -1) {
+						vals[i] = temp;
+					} else {
+						((Row) vals[i]).setField(fieldIdx, temp);
 					}
 				}
 			} else {
-				if (fieldIdx == -1) { // set list as an object
-					for (int i = 0; offset < childCount; i++) {
-						temp = new long[(int) lengthVector[i]];
-						for (int j = 0; j < temp.length; j++) {
-							temp[j] = vector.vector[offset++];
-						}
-						rows[i] = temp;
+				for (int i = 0; offset < childCount; i++) {
+					temp = new Date[(int) lengthVector[i]];
+					for (int j = 0; j < temp.length; j++) {
+						temp[j] = readDate(vector.vector[offset++]);
 					}
-				} else { // set list as a field of Row
-					for (int i = 0; offset < childCount; i++) {
-						temp = new long[(int) lengthVector[i]];
-						for (int j = 0; j < temp.length; j++) {
-							temp[j] = vector.vector[offset++];
-						}
-						((Row) rows[i]).setField(fieldIdx, temp);
+					if (fieldIdx == -1) {
+						vals[i] = temp;
+					} else {
+						((Row) vals[i]).setField(fieldIdx, temp);
 					}
 				}
 			}
 		}
 	}
 
-	private static void readNonNullFloatColumn(Object[] rows, int fieldIdx, DoubleColumnVector vector, long[] lengthVector, int childCount) {
+	private static void readNonNullTimestampColumn(Object[] vals, int fieldIdx, TimestampColumnVector vector, long[] lengthVector, int childCount) {
 
-		// check if float is directly in a list or not, e.g, array<float>
+		// check if the timestamps need to be read into lists or as single values
 		if (lengthVector == null) {
 			if (vector.isRepeating) { // fill complete column with first value
-				float repeatingValue = (float) vector.vector[0];
-				fillColumnWithRepeatingValue(rows, fieldIdx, repeatingValue, childCount);
+				if (fieldIdx == -1) { // set as an object
+					for (int i = 0; i < childCount; i++) {
+						// do not reuse value to prevent object mutation
+						vals[i] = readTimestamp(vector.time[0], vector.nanos[0]);
+					}
+				} else { // set as a field of Row
+					Row[] rows = (Row[]) vals;
+					for (int i = 0; i < childCount; i++) {
+						// do not reuse value to prevent object mutation
+						rows[i].setField(fieldIdx, readTimestamp(vector.time[0], vector.nanos[0]));
+					}
+				}
 			} else {
 				if (fieldIdx == -1) { // set as an object
 					for (int i = 0; i < childCount; i++) {
-						rows[i] = (float) vector.vector[i];
+						vals[i] = readTimestamp(vector.time[i], vector.nanos[i]);
 					}
 				} else { // set as a field of Row
+					Row[] rows = (Row[]) vals;
 					for (int i = 0; i < childCount; i++) {
-						((Row) rows[i]).setField(fieldIdx, (float) vector.vector[i]);
+						rows[i].setField(fieldIdx, readTimestamp(vector.time[i], vector.nanos[i]));
 					}
 				}
 			}
-		} else { // in a list
-			float[] temp;
+		} else {
+			Timestamp[] temp;
 			int offset = 0;
 			if (vector.isRepeating) { // fill complete list with first value
-				float repeatingValue = (float) vector.vector[0];
-				if (fieldIdx == -1) { // set list as an object
-					for (int i = 0; offset < childCount; i++) {
-						temp = new float[(int) lengthVector[i]];
-						Arrays.fill(temp, repeatingValue);
-						rows[i] = temp;
-						offset += temp.length;
-					}
-				} else { // set list as a field of Row
-					for (int i = 0; offset < childCount; i++) {
-						temp = new float[(int) lengthVector[i]];
-						Arrays.fill(temp, repeatingValue);
-						((Row) rows[i]).setField(fieldIdx, temp);
-						offset += temp.length;
+				for (int i = 0; offset < childCount; i++) {
+					temp = new Timestamp[(int) lengthVector[i]];
+					for (int j = 0; j < temp.length; j++) {
+						// do not reuse value to prevent object mutation
+						temp[j] = readTimestamp(vector.time[0], vector.nanos[0]);
+					}
+					offset += temp.length;
+					if (fieldIdx == -1) {
+						vals[i] = temp;
+					} else {
+						((Row) vals[i]).setField(fieldIdx, temp);
 					}
 				}
 			} else {
-				if (fieldIdx == -1) { // set list as an object
-					for (int i = 0; offset < childCount; i++) {
-						temp = new float[(int) lengthVector[i]];
-						for (int j = 0; j < temp.length; j++) {
-							temp[j] = (float) vector.vector[offset++];
-						}
-						rows[i] = temp;
+				for (int i = 0; offset < childCount; i++) {
+					temp = new Timestamp[(int) lengthVector[i]];
+					for (int j = 0; j < temp.length; j++) {
+						temp[j] = readTimestamp(vector.time[offset], vector.nanos[offset]);
+						offset++;
 					}
-				} else { // set list as a field of Row
-					for (int i = 0; offset < childCount; i++) {
-						temp = new float[(int) lengthVector[i]];
-						for (int j = 0; j < temp.length; j++) {
-							temp[j] = (float) vector.vector[offset++];
-						}
-						((Row) rows[i]).setField(fieldIdx, temp);
+					if (fieldIdx == -1) {
+						vals[i] = temp;
+					} else {
+						((Row) vals[i]).setField(fieldIdx, temp);
 					}
 				}
 			}
 		}
 	}
 
-	private static void readNonNullDoubleColumn(Object[] rows, int fieldIdx, DoubleColumnVector vector, long[] lengthVector, int childCount) {
+	private static void readNonNullDecimalColumn(Object[] vals, int fieldIdx, DecimalColumnVector vector, long[] lengthVector, int childCount) {
 
-		// check if double is directly in a list or not, e.g, array<double>
+		// check if the decimals need to be read into lists or as single values
 		if (lengthVector == null) {
 			if (vector.isRepeating) { // fill complete column with first value
-				double repeatingValue = vector.vector[0];
-				fillColumnWithRepeatingValue(rows, fieldIdx, repeatingValue, childCount);
+				fillColumnWithRepeatingValue(vals, fieldIdx, readBigDecimal(vector.vector[0]), childCount);
 			} else {
 				if (fieldIdx == -1) { // set as an object
 					for (int i = 0; i < childCount; i++) {
-						rows[i] = vector.vector[i];
+						vals[i] = readBigDecimal(vector.vector[i]);
 					}
 				} else { // set as a field of Row
+					Row[] rows = (Row[]) vals;
 					for (int i = 0; i < childCount; i++) {
-						((Row) rows[i]).setField(fieldIdx, vector.vector[i]);
+						rows[i].setField(fieldIdx, readBigDecimal(vector.vector[i]));
 					}
 				}
 			}
-		} else { // in a list
-			double[] temp;
+		} else {
+			BigDecimal[] temp;
 			int offset = 0;
 			if (vector.isRepeating) { // fill complete list with first value
-				double repeatingValue = vector.vector[0];
-				if (fieldIdx == -1) { // set list as an object
-					for (int i = 0; offset < childCount; i++) {
-						temp = new double[(int) lengthVector[i]];
-						Arrays.fill(temp, repeatingValue);
-						rows[i] = temp;
-						offset += temp.length;
-					}
-				} else { // set list as a field of Row
-					for (int i = 0; offset < childCount; i++) {
-						temp = new double[(int) lengthVector[i]];
-						Arrays.fill(temp, repeatingValue);
-						((Row) rows[i]).setField(fieldIdx, temp);
-						offset += temp.length;
+				BigDecimal repeatingValue = readBigDecimal(vector.vector[0]);
+				for (int i = 0; offset < childCount; i++) {
+					temp = new BigDecimal[(int) lengthVector[i]];
+					Arrays.fill(temp, repeatingValue);
+					offset += temp.length;
+					if (fieldIdx == -1) {
+						vals[i] = temp;
+					} else {
+						((Row) vals[i]).setField(fieldIdx, temp);
 					}
 				}
 			} else {
-				if (fieldIdx == -1) { // set list as an object
-					for (int i = 0; offset < childCount; i++) {
-						temp = new double[(int) lengthVector[i]];
-						for (int j = 0; j < temp.length; j++) {
-							temp[j] = vector.vector[offset++];
-						}
-						rows[i] = temp;
+				for (int i = 0; offset < childCount; i++) {
+					temp = new BigDecimal[(int) lengthVector[i]];
+					for (int j = 0; j < temp.length; j++) {
+						temp[j] = readBigDecimal(vector.vector[offset++]);
 					}
-				} else { // set list as a field of Row
-					for (int i = 0; offset < childCount; i++) {
-						temp = new double[(int) lengthVector[i]];
-						for (int j = 0; j < temp.length; j++) {
-							temp[j] = vector.vector[offset++];
-						}
-						((Row) rows[i]).setField(fieldIdx, temp);
+					if (fieldIdx == -1) {
+						vals[i] = temp;
+					} else {
+						((Row) vals[i]).setField(fieldIdx, temp);
 					}
 				}
 			}
 		}
+
 	}
 
-	private static void readNonNullStringColumn(Object[] rows, int fieldIdx, BytesColumnVector bytes, long[] lengthVector, int childCount) {
+	private static void readNonNullStructColumn(Object[] vals, int fieldIdx, StructColumnVector structVector, TypeDescription schema, long[] lengthVector, int childCount) {
+
+		List<TypeDescription> childrenTypes = schema.getChildren();
+
+		int numFields = childrenTypes.size();
+		// create a batch of Rows to read the structs
+		Row[] structs = new Row[childCount];
+		// TODO: possible improvement: reuse existing Row objects
+		for (int i = 0; i < childCount; i++) {
+			structs[i] = new Row(numFields);
+		}
+
+		// read struct fields
+		for (int i = 0; i < numFields; i++) {
+			readField(structs, i, childrenTypes.get(i), structVector.fields[i], null, childCount);
+		}
 
-		// check if string is directly in a list or not, e.g, array<string>
+		// check if the structs need to be read into lists or as single values
 		if (lengthVector == null) {
-			if (bytes.isRepeating) { // fill complete column with first value
-				String repeatingValue = new String(bytes.vector[0], bytes.start[0], bytes.length[0]);
-				fillColumnWithRepeatingValue(rows, fieldIdx, repeatingValue, childCount);
-			} else {
-				if (fieldIdx == -1) { // set as an object
-					for (int i = 0; i < childCount; i++) {
-						rows[i] = new String(bytes.vector[i], bytes.start[i], bytes.length[i]);
-					}
-				} else { // set as a field of Row
-					for (int i = 0; i < childCount; i++) {
-						((Row) rows[i]).setField(fieldIdx, new String(bytes.vector[i], bytes.start[i], bytes.length[i]));
-					}
+			if (fieldIdx == -1) { // set struct as an object
+				System.arraycopy(structs, 0, vals, 0, childCount);
+			} else { // set struct as a field of Row
+				Row[] rows = (Row[]) vals;
+				for (int i = 0; i < childCount; i++) {
+					rows[i].setField(fieldIdx, structs[i]);
 				}
 			}
-		}
-		else { // in a list
-			String[] temp;
+		} else { // struct in a list
 			int offset = 0;
-			if (bytes.isRepeating) { // fill list with first value
-				String repeatingValue = new String(bytes.vector[0], bytes.start[0], bytes.length[0]);
-				if (fieldIdx == -1) { // set list as an object
-					for (int i = 0; i < childCount; i++) {
-						temp = new String[(int) lengthVector[i]];
-						Arrays.fill(temp, repeatingValue);
-						rows[i] = temp;
-						offset += temp.length;
-					}
-				} else { // set list as a field
-					for (int i = 0; i < childCount; i++) {
-						temp = new String[(int) lengthVector[i]];
-						Arrays.fill(temp, repeatingValue);
-						((Row) rows[i]).setField(fieldIdx, temp);
-						offset += temp.length;
-					}
+			Row[] temp;
+			for (int i = 0; offset < childCount; i++) {
+				temp = new Row[(int) lengthVector[i]];
+				System.arraycopy(structs, offset, temp, 0, temp.length);
+				offset = offset + temp.length;
+				if (fieldIdx == -1) {
+					vals[i] = temp;
+				} else {
+					((Row) vals[i]).setField(fieldIdx, temp);
 				}
-			} else {
-				if (fieldIdx == -1) { // set list as an object
-					for (int i = 0; offset < childCount; i++) {
-						temp = new String[(int) lengthVector[i]];
-						for (int j = 0; j < temp.length; j++) {
-							temp[j] = new String(bytes.vector[offset], bytes.start[offset], bytes.length[offset]);
-							offset++;
-						}
-						rows[i] = temp;
-					}
-				} else { // set list as a field
-					for (int i = 0; offset < childCount; i++) {
-						temp = new String[(int) lengthVector[i]];
-						for (int j = 0; j < temp.length; j++) {
-							temp[j] = new String(bytes.vector[offset], bytes.start[offset], bytes.length[offset]);
-							offset++;
-						}
-						((Row) rows[i]).setField(fieldIdx, temp);
-					}
+			}
+		}
+	}
+
+	private static void readNonNullListColumn(Object[] vals, int fieldIdx, ListColumnVector list, TypeDescription schema, long[] lengthVector, int childCount) {
+
+		TypeDescription fieldType = schema.getChildren().get(0);
+		// check if the list need to be read into lists or as single values
+		if (lengthVector == null) {
+			long[] lengthVectorNested = list.lengths;
+			readField(vals, fieldIdx, fieldType, list.child, lengthVectorNested, list.childCount);
+		} else { // list in a list
+			Object[] nestedLists = new Object[childCount];
+			// length vector for nested list
+			long[] lengthVectorNested = list.lengths;
+			// read nested list
+			readField(nestedLists, -1, fieldType, list.child, lengthVectorNested, list.childCount);
+			// get type of nestedList
+			Class<?> classType = nestedLists[0].getClass();
+
+			// fill outer list with nested list
+			int offset = 0;
+			int length;
+			for (int i = 0; offset < childCount; i++) {
+				length = (int) lengthVector[i];
+				Object[] temp = (Object[]) Array.newInstance(classType, length);
+				System.arraycopy(nestedLists, offset, temp, 0, length);
+				offset = offset + length;
+				if (fieldIdx == -1) {
+					vals[i] = temp;
+				} else {
+					((Row) vals[i]).setField(fieldIdx, temp);
 				}
 			}
 		}
+	}
+
+	private static void readNonNullMapColumn(Object[] vals, int fieldIdx, MapColumnVector mapsVector, TypeDescription schema, long[] lengthVector, int childCount) {
+
+		List<TypeDescription> fieldType = schema.getChildren();
+		TypeDescription keyType = fieldType.get(0);
+		TypeDescription valueType = fieldType.get(1);
+
+		ColumnVector keys = mapsVector.keys;
+		ColumnVector values = mapsVector.values;
+		Object[] keyRows = new Object[mapsVector.childCount];
+		Object[] valueRows = new Object[mapsVector.childCount];
+
+		// read map keys and values
+		readField(keyRows, -1, keyType, keys, null, keyRows.length);
+		readField(valueRows, -1, valueType, values, null, valueRows.length);
+
+		// check if the maps need to be read into lists or as single values
+		if (lengthVector == null) {
+			long[] lengthVectorMap = mapsVector.lengths;
+			int offset = 0;
+
+			for (int i = 0; i < childCount; i++) {
+				long numMapEntries = lengthVectorMap[i];
+				HashMap map = readHashMap(keyRows, valueRows, offset, numMapEntries);
+				offset += numMapEntries;
+
+				if (fieldIdx == -1) {
+					vals[i] = map;
+				} else {
+					((Row) vals[i]).setField(fieldIdx, map);
+				}
+			}
+		} else { // list of map
+
+			long[] lengthVectorMap = mapsVector.lengths;
+			int mapOffset = 0; // offset of map element
+			int offset = 0; // offset of map
+			HashMap[] temp;
 
+			for (int i = 0; offset < childCount; i++) {
+				temp = new HashMap[(int) lengthVector[i]];
+				for (int j = 0; j < temp.length; j++) {
+					long numMapEntries = lengthVectorMap[offset];
+					temp[j] = readHashMap(keyRows, valueRows, mapOffset, numMapEntries);
+					mapOffset += numMapEntries;
+					offset++;
+				}
+				if (fieldIdx == 1) {
+					vals[i] = temp;
+				} else {
+					((Row) vals[i]).setField(fieldIdx, temp);
+				}
+			}
+		}
 	}
 
-	private static void readNonNullDateColumn(Object[] rows, int fieldIdx, LongColumnVector vector, long[] lengthVector, int childCount) {
+	private static <T> void readLongColumn(Object[] vals, int fieldIdx, LongColumnVector vector, long[] lengthVector, int childCount,
+											LongFunction<T> reader, IntFunction<T[]> array) {
 
-		// check if date is directly in a list or not, e.g, array<date>
+		// check if the values need to be read into lists or as single values
 		if (lengthVector == null) {
 			if (vector.isRepeating) { // fill complete column with first value
-				if (fieldIdx == -1) { // set as an object
-					for (int i = 0; i < childCount; i++) {
-						rows[i] = readDate(vector.vector[0]);
-					}
-				} else { // set as a field of Row
-					for (int i = 0; i < childCount; i++) {
-						((Row) rows[i]).setField(fieldIdx, readDate(vector.vector[0]));
-					}
-				}
+				// since the column contains null values and has just one distinct value, the repeated value is null
+				fillColumnWithRepeatingValue(vals, fieldIdx, null, childCount);
 			} else {
+				boolean[] isNullVector = vector.isNull;
 				if (fieldIdx == -1) { // set as an object
 					for (int i = 0; i < childCount; i++) {
-						rows[i] = readDate(vector.vector[i]);
+						if (isNullVector[i]) {
+							vals[i] = null;
+						} else {
+							vals[i] = reader.apply(vector.vector[i]);
+						}
 					}
 				} else { // set as a field of Row
+					Row[] rows = (Row[]) vals;
 					for (int i = 0; i < childCount; i++) {
-						((Row) rows[i]).setField(fieldIdx, readDate(vector.vector[i]));
-					}
-				}
-			}
-		} else {
-			Date[] temp;
-			int offset = 0;
-			if (vector.isRepeating) { // fill complete list with first value
-				if (fieldIdx == -1) { // set list as an object
-					for (int i = 0; offset < childCount; i++) {
-						temp = new Date[(int) lengthVector[i]];
-						for (int j = 0; j < temp.length; j++) {
-							temp[j] = readDate(vector.vector[0]);
-						}
-						rows[i] = temp;
-						offset += temp.length;
-					}
-				} else { // set list as a field of Row
-					for (int i = 0; offset < childCount; i++) {
-						temp = new Date[(int) lengthVector[i]];
-						for (int j = 0; j < temp.length; j++) {
-							temp[j] = readDate(vector.vector[0]);
+						if (isNullVector[i]) {
+							rows[i].setField(fieldIdx, null);
+						} else {
+							rows[i].setField(fieldIdx, reader.apply(vector.vector[i]));
 						}
-						((Row) rows[i]).setField(fieldIdx, temp);
-						offset += temp.length;
 					}
 				}
+			}
+		} else { // in a list
+			if (vector.isRepeating) { // // fill complete list with first value
+				// since the column contains null values and has just one distinct value, the repeated value is null
+				fillListWithRepeatingNull(vals, fieldIdx, lengthVector, childCount, array);
 			} else {
-				if (fieldIdx == -1) { // set list as an object
-					for (int i = 0; offset < childCount; i++) {
-						temp = new Date[(int) lengthVector[i]];
-						for (int j = 0; j < temp.length; j++) {
-							temp[j] = readDate(vector.vector[offset++]);
+				// column contain null values
+				int offset = 0;
+				T[] temp;
+				boolean[] isNullVector = vector.isNull;
+				for (int i = 0; offset < childCount; i++) {
+					temp = array.apply((int) lengthVector[i]);
+					for (int j = 0; j < temp.length; j++) {
+						if (isNullVector[offset]) {
+							offset++;
+						} else {
+							temp[j] = reader.apply(vector.vector[offset++]);
 						}
-						rows[i] = temp;
 					}
-				} else { // set list as a field of Row
-					for (int i = 0; offset < childCount; i++) {
-						temp = new Date[(int) lengthVector[i]];
-						for (int j = 0; j < temp.length; j++) {
-							temp[j] = readDate(vector.vector[offset++]);
-						}
-						((Row) rows[i]).setField(fieldIdx, temp);
+					if (fieldIdx == -1) {
+						vals[i] = temp;
+					} else {
+						((Row) vals[i]).setField(fieldIdx, temp);
 					}
 				}
 			}
 		}
 	}
 
-	private static void readNonNullTimestampColumn(Object[] rows, int fieldIdx, TimestampColumnVector vector, long[] lengthVector, int childCount) {
+	private static <T> void readDoubleColumn(Object[] vals, int fieldIdx, DoubleColumnVector vector, long[] lengthVector, int childCount,
+												DoubleFunction<T> reader, IntFunction<T[]> array) {
 
-		// check if timestamp is directly in a list or not, e.g, array<timestamp>
+		// check if the values need to be read into lists or as single values
 		if (lengthVector == null) {
 			if (vector.isRepeating) { // fill complete column with first value
-				if (fieldIdx == -1) { // set as an object
-					for (int i = 0; i < childCount; i++) {
-						rows[i] = readTimeStamp(vector.time[0], vector.nanos[0]);
-					}
-				} else { // set as a field of Row
-					for (int i = 0; i < childCount; i++) {
-						((Row) rows[i]).setField(fieldIdx, readTimeStamp(vector.time[0], vector.nanos[0]));
-					}
-				}
+				// since the column contains null values and has just one distinct value, the repeated value is null
+				fillColumnWithRepeatingValue(vals, fieldIdx, null, childCount);
 			} else {
+				boolean[] isNullVector = vector.isNull;
 				if (fieldIdx == -1) { // set as an object
 					for (int i = 0; i < childCount; i++) {
-						rows[i] = readTimeStamp(vector.time[i], vector.nanos[i]);
+						if (isNullVector[i]) {
+							vals[i] = null;
+						} else {
+							vals[i] = reader.apply(vector.vector[i]);
+						}
 					}
 				} else { // set as a field of Row
+					Row[] rows = (Row[]) vals;
 					for (int i = 0; i < childCount; i++) {
-						((Row) rows[i]).setField(fieldIdx, readTimeStamp(vector.time[i], vector.nanos[i]));
-					}
-				}
-			}
-		} else {
-			Timestamp[] temp;
-			int offset = 0;
-			if (vector.isRepeating) { // fill complete list with first value
-				if (fieldIdx == -1) { // set list as an object
-					for (int i = 0; offset < childCount; i++) {
-						temp = new Timestamp[(int) lengthVector[i]];
-						for (int j = 0; j < temp.length; j++) {
-							temp[j] = readTimeStamp(vector.time[0], vector.nanos[0]);
-						}
-						rows[i] = temp;
-						offset += temp.length;
-					}
-				} else { // set list as a field of Row
-					for (int i = 0; offset < childCount; i++) {
-						temp = new Timestamp[(int) lengthVector[i]];
-						for (int j = 0; j < temp.length; j++) {
-							temp[j] = readTimeStamp(vector.time[0], vector.nanos[0]);
+						if (isNullVector[i]) {
+							rows[i].setField(fieldIdx, null);
+						} else {
+							rows[i].setField(fieldIdx, reader.apply(vector.vector[i]));
 						}
-						((Row) rows[i]).setField(fieldIdx, temp);
-						offset += temp.length;
 					}
 				}
+			}
+		} else { // in a list
+			if (vector.isRepeating) { // // fill complete list with first value
+				// since the column contains null values and has just one distinct value, the repeated value is null
+				fillListWithRepeatingNull(vals, fieldIdx, lengthVector, childCount, array);
 			} else {
-				if (fieldIdx == -1) { // set list as an object
-					for (int i = 0; offset < childCount; i++) {
-						temp = new Timestamp[(int) lengthVector[i]];
-						for (int j = 0; j < temp.length; j++) {
-							temp[j] = readTimeStamp(vector.time[offset], vector.nanos[offset]);
+				// column contain null values
+				int offset = 0;
+				T[] temp;
+				boolean[] isNullVector = vector.isNull;
+				for (int i = 0; offset < childCount; i++) {
+					temp = array.apply((int) lengthVector[i]);
+					for (int j = 0; j < temp.length; j++) {
+						if (isNullVector[offset]) {
 							offset++;
+						} else {
+							temp[j] = reader.apply(vector.vector[offset++]);
 						}
-						rows[i] = temp;
 					}
-				} else { // set list as a field of Row
-					for (int i = 0; offset < childCount; i++) {
-						temp = new Timestamp[(int) lengthVector[i]];
-						for (int j = 0; j < temp.length; j++) {
-							temp[j] = readTimeStamp(vector.time[offset], vector.nanos[offset]);
-							offset++;
-						}
-						((Row) rows[i]).setField(fieldIdx, temp);
+					if (fieldIdx == -1) {
+						vals[i] = temp;
+					} else {
+						((Row) vals[i]).setField(fieldIdx, temp);
 					}
 				}
 			}
 		}
 	}
 
-	private static void readNonNullBinaryColumn(Object[] rows, int fieldIdx, BytesColumnVector bytes, long[] lengthVector, int childCount) {
+	private static void readBytesColumnAsString(Object[] vals, int fieldIdx, BytesColumnVector bytes, long[] lengthVector, int childCount) {
 
-		// check if string is directly in a list or not, e.g, array<string>
+		// check if the values need to be read into lists or as single values
 		if (lengthVector == null) {
 			if (bytes.isRepeating) { // fill complete column with first value
-				if (fieldIdx == -1) { // set as an object
-					for (int i = 0; i < childCount; i++) {
-						rows[i] = readBinary(bytes.vector[0], bytes.start[0], bytes.length[0]);
-					}
-				} else { // set as a field of Row
-					for (int i = 0; i < childCount; i++) {
-						((Row) rows[i]).setField(fieldIdx, readBinary(bytes.vector[0], bytes.start[0], bytes.length[0]));
-					}
-				}
-			} else {
-				if (fieldIdx == -1) { // set as an object
-					for (int i = 0; i < childCount; i++) {
-						rows[i] = readBinary(bytes.vector[i], bytes.start[i], bytes.length[i]);
-					}
-				} else { // set as a field of Row
-					for (int i = 0; i < childCount; i++) {
-						((Row) rows[i]).setField(fieldIdx, readBinary(bytes.vector[i], bytes.start[i], bytes.length[i]));
-					}
-				}
-			}
-		} else {
-			byte[][] temp;
-			int offset = 0;
-			if (bytes.isRepeating) { // fill complete list with first value
-				if (fieldIdx == -1) { // set list as an object
-					for (int i = 0; offset < childCount; i++) {
-						temp = new byte[(int) lengthVector[i]][];
-						for (int j = 0; j < temp.length; j++) {
-							temp[j] = readBinary(bytes.vector[0], bytes.start[0], bytes.length[0]);
-						}
-						rows[i] = temp;
-						offset += temp.length;
-					}
-				} else { // set list as a field
-					for (int i = 0; offset < childCount; i++) {
-						temp = new byte[(int) lengthVector[i]][];
-						for (int j = 0; j < temp.length; j++) {
-							temp[j] = readBinary(bytes.vector[0], bytes.start[0], bytes.length[0]);
-						}
-						((Row) rows[i]).setField(fieldIdx, temp);
-						offset += temp.length;
-					}
-				}
-			} else {
-				if (fieldIdx == -1) { // set list as an object
-					for (int i = 0; offset < childCount; i++) {
-						temp = new byte[(int) lengthVector[i]][];
-						for (int j = 0; j < temp.length; j++) {
-							temp[j] = readBinary(bytes.vector[offset], bytes.start[offset], bytes.length[offset]);
-							offset++;
-						}
-						rows[i] = temp;
-					}
-				} else { // set list as a field
-					for (int i = 0; offset < childCount; i++) {
-						temp = new byte[(int) lengthVector[i]][];
-						for (int j = 0; j < temp.length; j++) {
-							temp[j] = readBinary(bytes.vector[offset], bytes.start[offset], bytes.length[offset]);
-							offset++;
-						}
-						((Row) rows[i]).setField(fieldIdx, temp);
-					}
-				}
-			}
-		}
-
-	}
-
-	private static void readNonNullDecimalColumn(Object[] rows, int fieldIdx, DecimalColumnVector vector, long[] lengthVector, int childCount) {
-
-		// check if decimal is directly in a list or not, e.g, array<decimal>
-		if (lengthVector == null) {
-			if (vector.isRepeating) { // fill complete column with first value
-				fillColumnWithRepeatingValue(rows, fieldIdx, readBigDecimal(vector.vector[0]), childCount);
-			} else {
-				if (fieldIdx == -1) { // set as an object
-					for (int i = 0; i < childCount; i++) {
-						rows[i] = readBigDecimal(vector.vector[i]);
-					}
-				} else { // set as a field of Row
-					for (int i = 0; i < childCount; i++) {
-						((Row) rows[i]).setField(fieldIdx, readBigDecimal(vector.vector[i]));
-					}
-				}
-			}
-		} else {
-			BigDecimal[] temp;
-			int offset = 0;
-			if (vector.isRepeating) { // fill complete list with first value
-				BigDecimal repeatingValue = readBigDecimal(vector.vector[0]);
-				if (fieldIdx == -1) { // set list as an object
-					for (int i = 0; offset < childCount; i++) {
-						temp = new BigDecimal[(int) lengthVector[i]];
-						Arrays.fill(temp, repeatingValue);
-						rows[i] = temp;
-						offset += temp.length;
-					}
-				} else { // set list as a field of Row
-					for (int i = 0; offset < childCount; i++) {
-						temp = new BigDecimal[(int) lengthVector[i]];
-						Arrays.fill(temp, repeatingValue);
-						((Row) rows[i]).setField(fieldIdx, temp);
-						offset += temp.length;
-					}
-				}
-			} else {
-				if (fieldIdx == -1) { // set list as an object
-					for (int i = 0; offset < childCount; i++) {
-						temp = new BigDecimal[(int) lengthVector[i]];
-						for (int j = 0; j < temp.length; j++) {
-							temp[j] = readBigDecimal(vector.vector[offset++]);
-						}
-						rows[i] = temp;
-					}
-				} else { // set list as a field of Row
-					for (int i = 0; offset < childCount; i++) {
-						temp = new BigDecimal[(int) lengthVector[i]];
-						for (int j = 0; j < temp.length; j++) {
-							temp[j] = readBigDecimal(vector.vector[offset++]);
-						}
-						((Row) rows[i]).setField(fieldIdx, temp);
-					}
-				}
-			}
-		}
-
-	}
-
-	private static void readNonNullStructColumn(Object[] rows, int fieldIdx, StructColumnVector struct, TypeDescription schema, long[] lengthVector, int childCount) {
-
-		List<TypeDescription> childrenTypes = schema.getChildren();
-
-		int numChildren = childrenTypes.size();
-		Row[] nestedFields = new Row[childCount];
-		for (int i = 0; i < childCount; i++) {
-			nestedFields[i] = new Row(numChildren);
-		}
-		for (int i = 0; i < numChildren; i++) {
-			readField(nestedFields, i, childrenTypes.get(i), struct.fields[i], null, childCount);
-		}
-
-		// check if struct is directly in a list or not, e.g, array<struct<dt>>
-		if (lengthVector == null) {
-			if (fieldIdx == -1) { // set struct as an object
-				System.arraycopy(nestedFields, 0, rows, 0, childCount);
-			}
-			else { // set struct as a field of Row
-				for (int i = 0; i < childCount; i++) {
-					((Row) rows[i]).setField(fieldIdx, nestedFields[i]);
-				}
-			}
-		}
-		else { // struct in a list
-			int offset = 0;
-			Row[] temp;
-			if (fieldIdx == -1) { // set list of struct as an object
-				for (int i = 0; offset < childCount; i++) {
-					temp = new Row[(int) lengthVector[i]];
-					System.arraycopy(nestedFields, offset, temp, 0, temp.length);
-					offset = offset + temp.length;
-					rows[i] = temp;
-				}
-			}
-			else { // set list of struct as a field of Row
-				for (int i = 0; offset < childCount; i++) {
-					temp = new Row[(int) lengthVector[i]];
-					System.arraycopy(nestedFields, offset, temp, 0, temp.length);
-					offset = offset + temp.length;
-					((Row) rows[i]).setField(fieldIdx, temp);
-				}
-			}
-		}
-	}
-
-	private static void readNonNullListColumn(Object[] rows, int fieldIdx, ListColumnVector list, TypeDescription schema, long[] lengthVector, int childCount) {
-
-		TypeDescription fieldType = schema.getChildren().get(0);
-		if (lengthVector == null) {
-			long[] lengthVectorNested = list.lengths;
-			readField(rows, fieldIdx, fieldType, list.child, lengthVectorNested, list.childCount);
-		}
-		else { // list in a list
-
-			Object[] nestedList = new Object[childCount];
-
-			// length vector for nested list
-			long[] lengthVectorNested = list.lengths;
-
-			// read nested list
-			readField(nestedList, -1, fieldType, list.child, lengthVectorNested, list.childCount);
-
-			// get type of nestedList
-			Class<?> classType = nestedList[0].getClass();
-
-			// fill outer list with nested list
-			int offset = 0;
-			int length;
-			if (fieldIdx == -1) { // set list of list as an object
-				for (int i = 0; offset < childCount; i++) {
-					length = (int) lengthVector[i];
-					Object temp = Array.newInstance(classType, length);
-					System.arraycopy(nestedList, offset, temp, 0, length);
-					offset = offset + length;
-					rows[i] = temp;
-
-				}
-			} else { // set list of list as an field on Row
-				for (int i = 0; offset < childCount; i++) {
-					length = (int) lengthVector[i];
-					Object temp = Array.newInstance(classType, length);
-					System.arraycopy(nestedList, offset, temp, 0, length);
-					offset = offset + length;
-					((Row) rows[i]).setField(fieldIdx, temp);
-				}
-			}
-		}
-
-	}
-
-	private static void readNonNullMapColumn(Object[] rows, int fieldIdx, MapColumnVector map, TypeDescription schema, long[] lengthVector, int childCount) {
-
-		List<TypeDescription> fieldType = schema.getChildren();
-		TypeDescription keyType = fieldType.get(0);
-		TypeDescription valueType = fieldType.get(1);
-
-		ColumnVector keys = map.keys;
-		ColumnVector values = map.values;
-		Object[] keyRows = new Object[map.childCount];
-		Object[] valueRows = new Object[map.childCount];
-
-		// read map kes and values
-		readField(keyRows, -1, keyType, keys, null, keyRows.length);
-		readField(valueRows, -1, valueType, values, null, valueRows.length);
-
-		// check if map is directly in a list or not, e.g, array<map<k,v>>
-		if (lengthVector == null) {
-			long[] lengthVectorMap = map.lengths;
-			int offset = 0;
-			if (fieldIdx == -1) {
-				for (int i = 0; i < childCount; i++) {
-					rows[i] = readHashMap(keyRows, valueRows, offset, lengthVectorMap[i]);
-					offset += lengthVectorMap[i];
-				}
-			} else {
-				for (int i = 0; i < childCount; i++) {
-					((Row) rows[i]).setField(fieldIdx, readHashMap(keyRows, valueRows, offset, lengthVectorMap[i]));
-					offset += lengthVectorMap[i];
-				}
-			}
-		} else { // list of map
-
-			long[] lengthVectorMap = map.lengths;
-			int mapOffset = 0; // offset of map element
-			int offset = 0; // offset of map
-			HashMap[] temp;
-			if (fieldIdx == -1) { // set map list as an object
-				for (int i = 0; offset < childCount; i++) {
-					temp = new HashMap[(int) lengthVector[i]];
-					for (int j = 0; j < temp.length; j++) {
-						temp[j] = readHashMap(keyRows, valueRows, mapOffset, lengthVectorMap[offset]);
-						mapOffset += lengthVectorMap[offset];
-						offset++;
-					}
-					rows[i] = temp;
-				}
-			} else { // set map list as a field of Row
-				for (int i = 0; offset < childCount; i++) {
-					temp = new HashMap[(int) lengthVector[i]];
-					for (int j = 0; j < temp.length; j++) {
-						temp[j] = readHashMap(keyRows, valueRows, mapOffset, lengthVectorMap[offset]);
-						mapOffset += lengthVectorMap[offset];
-						offset++;
-					}
-					((Row) rows[i]).setField(fieldIdx, temp);
-				}
-			}
-		}
-	}
-
-	private static void fillColumnWithRepeatingValue(Object[] rows, int fieldIdx, Object repeatingValue, int childCount) {
-
-		if (fieldIdx == -1) { // set as an object
-			for (int i = 0; i < childCount; i++) {
-				rows[i] = repeatingValue;
-			}
-		} else { // set as a field of Row
-			for (int i = 0; i < childCount; i++) {
-				((Row) rows[i]).setField(fieldIdx, repeatingValue);
-			}
-		}
-	}
-
-	private static void fillListWithRepeatingNull(Object[] rows, int fieldIdx, Class<?> classType, long[] lengthVector, int childCount) {
-
-		int length;
-		if (fieldIdx == -1) {
-			for (int i = 0; i < childCount; i++) {
-				length = (int) lengthVector[i];
-				Object temp = Array.newInstance(classType, length);
-				rows[i] = temp;
-			}
-		} else {
-			for (int i = 0; i < childCount; i++) {
-				length = (int) lengthVector[i];
-				Object temp = Array.newInstance(classType, length);
-				((Row) rows[i]).setField(fieldIdx, temp);
-			}
-		}
-	}
-
-	private static void readBooleanColumn(Object[] rows, int fieldIdx, LongColumnVector vector, long[] lengthVector, int childCount) {
-
-		// check if data type(dt) is directly in list or not, e.g, array<dt>
-		if (lengthVector == null) {
-			if (vector.isRepeating) { // fill complete column with first value
-				// Also column contains null value and it's repeating
-				fillColumnWithRepeatingValue(rows, fieldIdx, null, childCount);
-			} else {
-				boolean[] isNullVector = vector.isNull;
-				if (fieldIdx == -1) { // set as an object
-					for (int i = 0; i < childCount; i++) {
-						if (isNullVector[i]) {
-							rows[i] = null;
-							continue;
-						}
-						rows[i] = vector.vector[i] != 0;
-					}
-				} else { // set as a field of Row
-					for (int i = 0; i < childCount; i++) {
-						if (isNullVector[i]) {
-							((Row) rows[i]).setField(fieldIdx, null);
-							continue;
-						}
-						((Row) rows[i]).setField(fieldIdx, vector.vector[i] != 0);
-					}
-				}
-			}
-		} else { // in a list
-			if (vector.isRepeating) { // // fill complete list with first value
-				// Also column contains null value and it's repeating
-				// so all values are null, but we need to set list with null values
-				fillListWithRepeatingNull(rows, fieldIdx, boolean[].class, lengthVector, childCount);
-			} else {
-				// column contain null values
-				int offset = 0;
-				boolean[] temp;
-				boolean[] isNullVector = vector.isNull;
-				if (fieldIdx == -1) { // set list as an object
-					for (int i = 0; offset < childCount; i++) {
-						temp = new boolean[(int) lengthVector[i]];
-						for (int j = 0; j < temp.length; j++) {
-							if (isNullVector[offset]) {
-								offset++;
-								continue;
-							}
-							temp[j] = vector.vector[offset++] != 0;
-						}
-						rows[i] = temp;
-					}
-				} else { // set list as a field of Row
-					for (int i = 0; offset < childCount; i++) {
-						temp = new boolean[(int) lengthVector[i]];
-						for (int j = 0; j < temp.length; j++) {
-							if (isNullVector[offset]) {
-								offset++;
-								continue;
-							}
-							temp[j] = vector.vector[offset++] != 0;
-						}
-						((Row) rows[i]).setField(fieldIdx, temp);
-					}
-				}
-			}
-		}
-	}
-
-	private static void readByteColumn(Object[] rows, int fieldIdx, LongColumnVector vector, long[] lengthVector, int childCount) {
-
-		// check if data type(dt) is directly in list or not, e.g, array<dt>
-		if (lengthVector == null) {
-			if (vector.isRepeating) { // fill complete column with first value
-				// Also column contains null value and it's repeating
-				fillColumnWithRepeatingValue(rows, fieldIdx, null, childCount);
-			} else {
-				boolean[] isNullVector = vector.isNull;
-				if (fieldIdx == -1) { // set as an object
-					for (int i = 0; i < childCount; i++) {
-						if (isNullVector[i]) {
-							rows[i] = null;
-							continue;
-						}
-						rows[i] = (byte) vector.vector[i];
-					}
-				} else { // set as a field of Row
-					for (int i = 0; i < childCount; i++) {
-						if (isNullVector[i]) {
-							((Row) rows[i]).setField(fieldIdx, null);
-							continue;
-						}
-						((Row) rows[i]).setField(fieldIdx, (byte) vector.vector[i]);
-					}
-				}
-			}
-		} else { // in a list
-			if (vector.isRepeating) { // // fill complete list with first value
-				// Also column contains null value and it's repeating
-				// so all values are null, but we need to set list with null values
-				fillListWithRepeatingNull(rows, fieldIdx, byte[].class, lengthVector, childCount);
-			} else {
-				// column contain null values
-				int offset = 0;
-				byte[] temp;
-				boolean[] isNullVector = vector.isNull;
-				if (fieldIdx == -1) { // set list as an object
-					for (int i = 0; offset < childCount; i++) {
-						temp = new byte[(int) lengthVector[i]];
-						for (int j = 0; j < temp.length; j++) {
-							if (isNullVector[offset]) {
-								offset++;
-								continue;
-							}
-							temp[j] = (byte) vector.vector[offset++];
-						}
-						rows[i] = temp;
-					}
-				} else { // set list as a field of Row
-					for (int i = 0; offset < childCount; i++) {
-						temp = new byte[(int) lengthVector[i]];
-						for (int j = 0; j < temp.length; j++) {
-							if (isNullVector[offset]) {
-								offset++;
-								continue;
-							}
-							temp[j] = (byte) vector.vector[offset++];
-						}
-						((Row) rows[i]).setField(fieldIdx, temp);
-					}
-				}
-			}
-		}
-	}
-
-	private static void readShortColumn(Object[] rows, int fieldIdx, LongColumnVector vector, long[] lengthVector, int childCount) {
-
-		// check if data type(dt) is directly in list or not, e.g, array<dt>
-		if (lengthVector == null) {
-			if (vector.isRepeating) { // fill complete column with first value
-				// Also column contains null value and it's repeating
-				fillColumnWithRepeatingValue(rows, fieldIdx, null, childCount);
-			} else {
-				boolean[] isNullVector = vector.isNull;
-				if (fieldIdx == -1) { // set as an object
-					for (int i = 0; i < childCount; i++) {
-						if (isNullVector[i]) {
-							rows[i] = null;
-							continue;
-						}
-						rows[i] = (short) vector.vector[i];
-					}
-				} else { // set as field of Row
-					for (int i = 0; i < childCount; i++) {
-						if (isNullVector[i]) {
-							((Row) rows[i]).setField(fieldIdx, null);
-							continue;
-						}
-						((Row) rows[i]).setField(fieldIdx, (short) vector.vector[i]);
-					}
-				}
-			}
-		} else { // in a list
-			if (vector.isRepeating) { // // fill complete list with first value
-				// Also column contains null value and it's repeating
-				// so all values are null, but we need to set list with null values
-				fillListWithRepeatingNull(rows, fieldIdx, short[].class, lengthVector, childCount);
-			} else {
-				// column contain null values
-				int offset = 0;
-				short[] temp;
-				boolean[] isNullVector = vector.isNull;
-				if (fieldIdx == -1) { // set list as an object
-					for (int i = 0; offset < childCount; i++) {
-						temp = new short[(int) lengthVector[i]];
-						for (int j = 0; j < temp.length; j++) {
-							if (isNullVector[offset]) {
-								offset++;
-								continue;
-							}
-							temp[j] = (short) vector.vector[offset++];
-						}
-						rows[i] = temp;
-					}
-				} else { // set list as a field of Row
-					for (int i = 0; offset < childCount; i++) {
-						temp = new short[(int) lengthVector[i]];
-						for (int j = 0; j < temp.length; j++) {
-							if (isNullVector[offset]) {
-								offset++;
-								continue;
-							}
-							temp[j] = (short) vector.vector[offset++];
-						}
-						((Row) rows[i]).setField(fieldIdx, temp);
-					}
-				}
-			}
-		}
-	}
-
-	private static void readIntColumn(Object[] rows, int fieldIdx, LongColumnVector vector, long[] lengthVector, int childCount) {
-
-		// check if data type(dt) is directly in list or not, e.g, array<dt>
-		if (lengthVector == null) {
-			if (vector.isRepeating) { // fill complete column with first value
-				// Also column contains null value and it's repeating
-				fillColumnWithRepeatingValue(rows, fieldIdx, null, childCount);
-			} else {
-				boolean[] isNullVector = vector.isNull;
-				if (fieldIdx == -1) { // set as an object
-					for (int i = 0; i < childCount; i++) {
-						if (isNullVector[i]) {
-							rows[i] = null;
-							continue;
-						}
-						rows[i] = (int) vector.vector[i];
-					}
-				} else { // set as a field of Row
-					for (int i = 0; i < childCount; i++) {
-						if (isNullVector[i]) {
-							((Row) rows[i]).setField(fieldIdx, null);
-							continue;
-						}
-						((Row) rows[i]).setField(fieldIdx, (int) vector.vector[i]);
-					}
-				}
-			}
-		} else { // in a list
-			if (vector.isRepeating) { // // fill complete list with first value
-				// Also column contains null value and it's repeating
-				// so all values are null, but we need to set list with null values
-				fillListWithRepeatingNull(rows, fieldIdx, int[].class, lengthVector, childCount);
-			} else {
-				// column contain null values
-				int offset = 0;
-				int[] temp;
-				boolean[] isNullVector = vector.isNull;
-				if (fieldIdx == -1) { // set list as an object
-					for (int i = 0; offset < childCount; i++) {
-						temp = new int[(int) lengthVector[i]];
-						for (int j = 0; j < temp.length; j++) {
-							if (isNullVector[offset]) {
-								offset++;
-								continue;
-							}
-							temp[j] = (int) vector.vector[offset++];
-						}
-						rows[i] = temp;
-					}
-				} else { // set list as a field of Row
-					for (int i = 0; offset < childCount; i++) {
-						temp = new int[(int) lengthVector[i]];
-						for (int j = 0; j < temp.length; j++) {
-							if (isNullVector[offset]) {
-								offset++;
-								continue;
-							}
-							temp[j] = (int) vector.vector[offset++];
-						}
-						((Row) rows[i]).setField(fieldIdx, temp);
-					}
-				}
-			}
-		}
-	}
-
-	private static void readLongColumn(Object[] rows, int fieldIdx, LongColumnVector vector, long[] lengthVector, int childCount) {
-
-		// check if data type(dt) is directly in list or not, e.g, array<dt>
-		if (lengthVector == null) {
-			if (vector.isRepeating) { // fill complete column with first value
-				// Also column contains null value and it's repeating
-				fillColumnWithRepeatingValue(rows, fieldIdx, null, childCount);
-			} else {
-				boolean[] isNullVector = vector.isNull;
-				if (fieldIdx == -1) { // set as an object
-					for (int i = 0; i < childCount; i++) {
-						if (isNullVector[i]) {
-							rows[i] = null;
-							continue;
-						}
-						rows[i] = vector.vector[i];
-					}
-				} else { // set as a field of Row
-					for (int i = 0; i < childCount; i++) {
-						if (isNullVector[i]) {
-							((Row) rows[i]).setField(fieldIdx, null);
-							continue;
-						}
-						((Row) rows[i]).setField(fieldIdx, vector.vector[i]);
-					}
-				}
-			}
-		} else { // in a list
-			if (vector.isRepeating) { // // fill complete list with first value
-				// Also column contains null value and it's repeating
-				// so all values are null, but we need to set list with null values
-				fillListWithRepeatingNull(rows, fieldIdx, long[].class, lengthVector, childCount);
-			} else {
-				// column contain null values
-				int offset = 0;
-				long[] temp;
-				boolean[] isNullVector = vector.isNull;
-				if (fieldIdx == -1) { // set list as an object
-					for (int i = 0; offset < childCount; i++) {
-						temp = new long[(int) lengthVector[i]];
-						for (int j = 0; j < temp.length; j++) {
-							if (isNullVector[offset]) {
-								offset++;
-								continue;
-							}
-							temp[j] = vector.vector[offset++];
-						}
-						rows[i] = temp;
-					}
-				} else { // set list as a field of Row
-					for (int i = 0; offset < childCount; i++) {
-						temp = new long[(int) lengthVector[i]];
-						for (int j = 0; j < temp.length; j++) {
-							if (isNullVector[offset]) {
-								offset++;
-								continue;
-							}
-							temp[j] = vector.vector[offset++];
-						}
-						((Row) rows[i]).setField(fieldIdx, temp);
-					}
-				}
-			}
-		}
-	}
-
-	private static void readFloatColumn(Object[] rows, int fieldIdx, DoubleColumnVector vector, long[] lengthVector, int childCount) {
-
-		// check if data type(dt) is directly in list or not, e.g, array<dt>
-		if (lengthVector == null) {
-			if (vector.isRepeating) { // fill complete column with first value
-				// Also column contains null value and it's repeating
-				fillColumnWithRepeatingValue(rows, fieldIdx, null, childCount);
+				// since the column contains null values and has just one distinct value, the repeated value is null
+				fillColumnWithRepeatingValue(vals, fieldIdx, null, childCount);
 			} else {
-				boolean[] isNullVector = vector.isNull;
+				boolean[] isNullVector = bytes.isNull;
 				if (fieldIdx == -1) { // set as an object
 					for (int i = 0; i < childCount; i++) {
 						if (isNullVector[i]) {
-							rows[i] = null;
-							continue;
+							vals[i] = null;
+						} else {
+							vals[i] = new String(bytes.vector[i], bytes.start[i], bytes.length[i]);
 						}
-						rows[i] = (float) vector.vector[i];
 					}
 				} else { // set as a field of Row
+					Row[] rows = (Row[]) vals;
 					for (int i = 0; i < childCount; i++) {
 						if (isNullVector[i]) {
-							((Row) rows[i]).setField(fieldIdx, null);
-							continue;
+							rows[i].setField(fieldIdx, null);
+						} else {
+							rows[i].setField(fieldIdx, new String(bytes.vector[i], bytes.start[i], bytes.length[i]));
 						}
-						((Row) rows[i]).setField(fieldIdx, (float) vector.vector[i]);
 					}
 				}
 			}
 		} else { // in a list
-			if (vector.isRepeating) { // // fill complete list with first value
-				// Also column contains null value and it's repeating
-				// so all values are null, but we need to set list with null values
-				fillListWithRepeatingNull(rows, fieldIdx, float[].class, lengthVector, childCount);
+			if (bytes.isRepeating) { // fill list with first value
+				// since the column contains null values and has just one distinct value, the repeated value is null
+				fillListWithRepeatingNull(vals, fieldIdx, lengthVector, childCount, OrcUtils::stringArray);
 			} else {
-				// column contain null values
 				int offset = 0;
-				float[] temp;
-				boolean[] isNullVector = vector.isNull;
-				if (fieldIdx == -1) { // set list as an object
-					for (int i = 0; offset < childCount; i++) {
-						temp = new float[(int) lengthVector[i]];
-						for (int j = 0; j < temp.length; j++) {
-							if (isNullVector[offset]) {
-								offset++;
-								continue;
-							}
-							temp[j] = (float) vector.vector[offset++];
-						}
-						rows[i] = temp;
-					}
-				} else { // set list as a field of Row
-					for (int i = 0; i < childCount; i++) {
-						temp = new float[(int) lengthVector[i]];
-						for (int j = 0; j < temp.length; j++) {
-							if (isNullVector[offset]) {
-								offset++;
-								continue;
-							}
-							temp[j] = (float) vector.vector[offset++];
-						}
-						((Row) rows[i]).setField(fieldIdx, temp);
-					}
-				}
-			}
-		}
-	}
-
-	private static void readDoubleColumn(Object[] rows, int fieldIdx, DoubleColumnVector vector, long[] lengthVector, int childCount) {
-
-		// check if data type(dt) is directly in list or not, e.g, array<dt>
-		if (lengthVector == null) {
-			if (vector.isRepeating) { // fill complete column with first value
-				// Also column contains null value and it's repeating
-				fillColumnWithRepeatingValue(rows, fieldIdx, null, childCount);
-			} else {
-				boolean[] isNullVector = vector.isNull;
-				if (fieldIdx == -1) { // set as an object
-					for (int i = 0; i < childCount; i++) {
-						if (isNullVector[i]) {
-							rows[i] = null;
-							continue;
-						}
-						rows[i] = vector.vector[i];
-					}
-				} else { // set as field of Row
-					for (int i = 0; i < childCount; i++) {
-						if (isNullVector[i]) {
-							((Row) rows[i]).setField(fieldIdx, null);
-							continue;
+				String[] temp;
+				boolean[] isNullVector = bytes.isNull;
+				for (int i = 0; offset < childCount; i++) {
+					temp = new String[(int) lengthVector[i]];
+					for (int j = 0; j < temp.length; j++) {
+						if (isNullVector[offset]) {
+							offset++;
+						} else {
+							temp[j] = new String(bytes.vector[offset], bytes.start[offset], bytes.length[offset]);
+							offset++;
 						}
-						((Row) rows[i]).setField(fieldIdx, vector.vector[i]);
 					}
-				}
-			}
-		} else { // in a list
-			if (vector.isRepeating) { // // fill complete list with first value
-				// Also column contains null value and it's repeating
-				// so all values are null, but we need to set list with null values
-				fillListWithRepeatingNull(rows, fieldIdx, double[].class, lengthVector, childCount);
-			} else {
-				// column contain null values
-				int offset = 0;
-				double[] temp;
-				boolean[] isNullVector = vector.isNull;
-				if (fieldIdx == -1) { // set list as an object
-					for (int i = 0; offset < childCount; i++) {
-						temp = new double[(int) lengthVector[i]];
-						for (int j = 0; j < temp.length; j++) {
-							if (isNullVector[offset]) {
-								offset++;
-								continue;
-							}
-							temp[j] = vector.vector[offset++];
-						}
-						rows[i] = temp;
-					}
-				} else { // set list as a field of Row
-					for (int i = 0; offset < childCount; i++) {
-						temp = new double[(int) lengthVector[i]];
-						for (int j = 0; j < temp.length; j++) {
-							if (isNullVector[offset]) {
-								offset++;
-								continue;
-							}
-							temp[j] = vector.vector[offset++];
-						}
-						((Row) rows[i]).setField(fieldIdx, temp);
+					if (fieldIdx == -1) {
+						vals[i] = temp;
+					} else {
+						((Row) vals[i]).setField(fieldIdx, temp);
 					}
 				}
 			}
 		}
 	}
 
-	private static void readStringColumn(Object[] rows, int fieldIdx, BytesColumnVector bytes, long[] lengthVector, int childCount) {
+	private static void readBytesColumnAsBinary(Object[] vals, int fieldIdx, BytesColumnVector bytes, long[] lengthVector, int childCount) {
 
-		// check if string is directly in a list or not, e.g, array<string>
+		// check if the binary need to be read into lists or as single values
 		if (lengthVector == null) {
 			if (bytes.isRepeating) { // fill complete column with first value
-				// Also column contains null value and it's repeating
-				fillColumnWithRepeatingValue(rows, fieldIdx, null, childCount);
+				// since the column contains null values and has just one distinct value, the repeated value is null
+				fillColumnWithRepeatingValue(vals, fieldIdx, null, childCount);
 			} else {
 				boolean[] isNullVector = bytes.isNull;
 				if (fieldIdx == -1) { // set as an object
 					for (int i = 0; i < childCount; i++) {
 						if (isNullVector[i]) {
-							rows[i] = null;
-							continue;
+							vals[i] = null;
+						} else {
+							vals[i] = readBinary(bytes.vector[i], bytes.start[i], bytes.length[i]);
 						}
-						rows[i] = new String(bytes.vector[i], bytes.start[i], bytes.length[i]);
 					}
 				} else { // set as a field of Row
+					Row[] rows = (Row[]) vals;
 					for (int i = 0; i < childCount; i++) {
 						if (isNullVector[i]) {
-							((Row) rows[i]).setField(fieldIdx, null);
-							continue;
+							rows[i].setField(fieldIdx, null);
+						} else {
+							rows[i].setField(fieldIdx, readBinary(bytes.vector[i], bytes.start[i], bytes.length[i]));
 						}
-						((Row) rows[i]).setField(fieldIdx, new String(bytes.vector[i], bytes.start[i], bytes.length[i]));
 					}
 				}
 			}
-		} else { // in a list
-			if (bytes.isRepeating) { // fill list with first value
-				// Also column contains null value and it's repeating
-				// so all values are null, but we need to set list with null values
-				fillListWithRepeatingNull(rows, fieldIdx, String[].class, lengthVector, childCount);
+		} else {
+			if (bytes.isRepeating) { // fill complete list with first value
+				// since the column contains null values and has just one distinct value, the repeated value is null
+				fillListWithRepeatingNull(vals, fieldIdx, lengthVector, childCount, OrcUtils::binaryArray);
 			} else {
 				int offset = 0;
-				String[] temp;
+				byte[][] temp;
 				boolean[] isNullVector = bytes.isNull;
-				if (fieldIdx == -1) { // set list as an object
-					for (int i = 0; offset < childCount; i++) {
-						temp = new String[(int) lengthVector[i]];
-						for (int j = 0; j < temp.length; j++) {
-							if (isNullVector[offset]) {
-								offset++;
-								temp[j] = null;
-								continue; // skip null value
-							}
-							temp[j] = new String(bytes.vector[offset], bytes.start[offset], bytes.length[offset]);
+				for (int i = 0; offset < childCount; i++) {
+					temp = new byte[(int) lengthVector[i]][];
+					for (int j = 0; j < temp.length; j++) {
+						if (isNullVector[offset]) {
 							offset++;
-						}
-						rows[i] = temp;
-					}
-				} else { // set list as a field
-					for (int i = 0; offset < childCount; i++) {
-						temp = new String[(int) lengthVector[i]];
-						for (int j = 0; j < temp.length; j++) {
-							if (isNullVector[offset]) {
-								offset++;
-								temp[j] = null;
-								continue; // skip null value
-							}
-							temp[j] = new String(bytes.vector[offset], bytes.start[offset], bytes.length[offset]);
+						} else {
+							temp[j] = readBinary(bytes.vector[offset], bytes.start[offset], bytes.length[offset]);
 							offset++;
 						}
-						((Row) rows[i]).setField(fieldIdx, temp);
+					}
+					if (fieldIdx == -1) {
+						vals[i] = temp;
+					} else {
+						((Row) vals[i]).setField(fieldIdx, temp);
 					}
 				}
 			}
 		}
-
 	}
 
-	private static void readDateColumn(Object[] rows, int fieldIdx, LongColumnVector vector, long[] lengthVector, int childCount) {
+	private static void readLongColumnAsDate(Object[] vals, int fieldIdx, LongColumnVector vector, long[] lengthVector, int childCount) {
 
-		// check if date is directly in a list or not, e.g, array<date>
+		// check if the values need to be read into lists or as single values
 		if (lengthVector == null) {
 			if (vector.isRepeating) { // fill complete column with first value
-				// Also column contains null value and it's repeating
-				fillColumnWithRepeatingValue(rows, fieldIdx, null, childCount);
+				// since the column contains null values and has just one distinct value, the repeated value is null
+				fillColumnWithRepeatingValue(vals, fieldIdx, null, childCount);
 			} else {
 				boolean[] isNullVector = vector.isNull;
 				if (fieldIdx == -1) { // set as an object
 					for (int i = 0; i < childCount; i++) {
 						if (isNullVector[i]) {
-							rows[i] = null;
-							continue;
+							vals[i] = null;
+						} else {
+							vals[i] = readDate(vector.vector[i]);
 						}
-						rows[i] = readDate(vector.vector[i]);
 					}
 				} else { // set as a field of Row
+					Row[] rows = (Row[]) vals;
 					for (int i = 0; i < childCount; i++) {
 						if (isNullVector[i]) {
-							((Row) rows[i]).setField(fieldIdx, null);
-							continue;
+							rows[i].setField(fieldIdx, null);
+						} else {
+							rows[i].setField(fieldIdx, readDate(vector.vector[i]));
 						}
-						((Row) rows[i]).setField(fieldIdx, readDate(vector.vector[i]));
 					}
 				}
 			}
-		} else {
-			if (vector.isRepeating) { // fill complete list with first value
-				// Also column contains null value and it's repeating
-				// so all values are null, but we need to set list with null values
-				fillListWithRepeatingNull(rows, fieldIdx, Date[].class, lengthVector, childCount);
+		} else { // in a list
+			if (vector.isRepeating) { // // fill complete list with first value
+				// since the column contains null values and has just one distinct value, the repeated value is null
+				fillListWithRepeatingNull(vals, fieldIdx, lengthVector, childCount, OrcUtils::dateArray);
 			} else {
+				// column contain null values
 				int offset = 0;
 				Date[] temp;
 				boolean[] isNullVector = vector.isNull;
-				if (fieldIdx == -1) { // set list as an object
-					for (int i = 0; offset < childCount; i++) {
-						temp = new Date[(int) lengthVector[i]];
-						for (int j = 0; j < temp.length; j++) {
-							if (isNullVector[offset]) {
-								offset++;
-								temp[j] = null;
-								continue;
-							}
-							temp[j] = readDate(vector.vector[offset++]);
-						}
-						rows[i] = temp;
-					}
-				} else { // set list as a field of Row
-					for (int i = 0; offset < childCount; i++) {
-						temp = new Date[(int) lengthVector[i]];
-						for (int j = 0; j < temp.length; j++) {
-							if (isNullVector[offset]) {
-								offset++;
-								temp[j] = null;
-								continue;
-							}
+				for (int i = 0; offset < childCount; i++) {
+					temp = new Date[(int) lengthVector[i]];
+					for (int j = 0; j < temp.length; j++) {
+						if (isNullVector[offset]) {
+							offset++;
+						} else {
 							temp[j] = readDate(vector.vector[offset++]);
 						}
-						((Row) rows[i]).setField(fieldIdx, temp);
+					}
+					if (fieldIdx == -1) {
+						vals[i] = temp;
+					} else {
+						((Row) vals[i]).setField(fieldIdx, temp);
 					}
 				}
 			}
 		}
-
 	}
 
-

<TRUNCATED>