Posted to commits@drill.apache.org by vo...@apache.org on 2019/06/07 13:24:59 UTC

[drill] branch master updated: DRILL-7251: Read Hive array w/o nulls

This is an automated email from the ASF dual-hosted git repository.

volodymyr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git


The following commit(s) were added to refs/heads/master by this push:
     new de0aec7  DRILL-7251: Read Hive array w/o nulls
de0aec7 is described below

commit de0aec7951254949ae9206d6f63b5077684dac8a
Author: Igor Guzenko <ih...@gmail.com>
AuthorDate: Tue May 14 20:16:46 2019 +0300

    DRILL-7251: Read Hive array w/o nulls
    
    1. HiveFieldConverter replaced by Hive writers for primitives
    2. Created HiveValueWriterFactory and HiveListWriter to implement array support (illustrated by the sketch below)
    3. Reader generation replaced by HiveDefaultRecordReader and HiveTextRecordReader
    4. Several reader initializers replaced by a single one
    5. Added method to repeated vardecimal writer
    6. Minor fix for array column in View
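    
    For illustration only, here is a minimal, self-contained Java sketch of the writer pattern this
    change moves to: a factory resolves one writer per column type, and an array (LIST) writer
    delegates each element to its element writer. The names below (ColumnWriterSketch, IntWriterSketch,
    ListWriterSketch, WriterFactorySketch) are simplified placeholders, not the classes added by this
    commit; see HiveValueWriter, HiveValueWriterFactory and HiveListWriter in the diff for the real
    implementation.
    
        import java.util.List;
        
        // One writer per column; the real Drill writers target value vectors instead of stdout.
        interface ColumnWriterSketch {
          void write(Object hiveValue);
        }
        
        // Primitive writer: converts the Hive value and "writes" it (stand-in for a vector write).
        final class IntWriterSketch implements ColumnWriterSketch {
          @Override
          public void write(Object hiveValue) {
            int v = (Integer) hiveValue;        // the real reader extracts the value via an ObjectInspector
            System.out.println("INT -> " + v);  // stand-in for writing into an int vector
          }
        }
        
        // List writer for ARRAY<T>: reuses the element writer for every element of the row value.
        final class ListWriterSketch implements ColumnWriterSketch {
          private final ColumnWriterSketch elementWriter;
        
          ListWriterSketch(ColumnWriterSketch elementWriter) {
            this.elementWriter = elementWriter;
          }
        
          @Override
          public void write(Object hiveValue) {
            for (Object element : (List<?>) hiveValue) {  // Hive hands ARRAY values over as a java.util.List
              elementWriter.write(element);
            }
          }
        }
        
        // Factory resolving a writer from a Hive type string, recursing into array element types.
        final class WriterFactorySketch {
          static ColumnWriterSketch createWriter(String hiveType) {
            if (hiveType.equals("int")) {
              return new IntWriterSketch();
            }
            if (hiveType.startsWith("array<") && hiveType.endsWith(">")) {
              String elementType = hiveType.substring("array<".length(), hiveType.length() - 1);
              return new ListWriterSketch(createWriter(elementType));
            }
            throw new UnsupportedOperationException("Type not covered by this sketch: " + hiveType);
          }
        }
    
    A hypothetical usage: WriterFactorySketch.createWriter("array<int>").write(java.util.Arrays.asList(1, 2, 3))
    resolves an int element writer wrapped in a list writer and writes each element in turn.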
---
 contrib/storage-hive/core/pom.xml                  |   38 +
 .../storage-hive/core/src/main/codegen/config.fmpp |    1 -
 .../core/src/main/codegen/data/HiveFormats.tdd     |   50 -
 .../main/codegen/templates/HiveRecordReaders.java  |  178 --
 .../drill/exec/store/hive/HiveFieldConverter.java  |  241 ---
 .../exec/store/hive/HiveScanBatchCreator.java      |    6 +-
 .../drill/exec/store/hive/HiveUtilities.java       |    2 +-
 .../store/hive/readers/HiveAbstractReader.java     |  439 -----
 .../hive/readers/HiveDefaultRecordReader.java      |  530 ++++++
 .../store/hive/readers/HiveTextRecordReader.java   |  125 ++
 .../store/hive/readers/ReadersInitializer.java     |  127 ++
 .../initilializers/AbstractReadersInitializer.java |   78 -
 .../initilializers/DefaultReadersInitializer.java  |   53 -
 .../initilializers/EmptyReadersInitializer.java    |   46 -
 .../readers/initilializers/ReadersInitializer.java |   87 -
 .../inspectors/AbstractRecordsInspector.java       |    4 +-
 .../HiveValueWriter.java}                          |   34 +-
 .../store/hive/writers/HiveValueWriterFactory.java |  235 +++
 .../store/hive/writers/complex/HiveListWriter.java |   52 +
 .../package-info.java}                             |   26 +-
 .../primitive/AbstractSingleValueWriter.java}      |   31 +-
 .../primitive/HiveBinaryWriter.java}               |   29 +-
 .../primitive/HiveBooleanWriter.java}              |   24 +-
 .../primitive/HiveByteWriter.java}                 |   24 +-
 .../hive/writers/primitive/HiveCharWriter.java     |   43 +
 .../primitive/HiveDateWriter.java}                 |   28 +-
 .../primitive/HiveDecimalWriter.java}              |   30 +-
 .../primitive/HiveDoubleWriter.java}               |   24 +-
 .../primitive/HiveFloatWriter.java}                |   24 +-
 .../primitive/HiveIntWriter.java}                  |   24 +-
 .../primitive/HiveLongWriter.java}                 |   24 +-
 .../primitive/HiveShortWriter.java}                |   24 +-
 .../hive/writers/primitive/HiveStringWriter.java   |   43 +
 .../primitive/HiveTimestampWriter.java}            |   28 +-
 .../hive/writers/primitive/HiveVarCharWriter.java  |   43 +
 .../apache/drill/exec/hive/HiveTestUtilities.java  |   28 +
 .../exec/hive/complex_types/TestHiveArrays.java    | 1778 ++++++++++++++++++++
 .../complex_types/array/bigint_array.json          |    3 +
 .../complex_types/array/boolean_array.json         |    3 +
 .../resources/complex_types/array/char_array.json  |    3 +
 .../resources/complex_types/array/date_array.json  |    3 +
 .../complex_types/array/decimal_array.json         |    3 +
 .../complex_types/array/double_array.json          |    3 +
 .../resources/complex_types/array/float_array.json |    3 +
 .../resources/complex_types/array/int_array.json   |    3 +
 .../complex_types/array/smallint_array.json        |    3 +
 .../complex_types/array/string_array.json          |    3 +
 .../complex_types/array/timestamp_array.json       |    3 +
 .../complex_types/array/tinyint_array.json         |    3 +
 .../complex_types/array/varchar_array.json         |    3 +
 .../java/org/apache/drill/exec/dotdrill/View.java  |    6 +
 .../org/apache/drill/test/DrillTestWrapper.java    |   14 +-
 .../src/main/codegen/templates/ComplexWriters.java |    7 +
 53 files changed, 3251 insertions(+), 1416 deletions(-)

diff --git a/contrib/storage-hive/core/pom.xml b/contrib/storage-hive/core/pom.xml
index b4acdac..9912b83 100644
--- a/contrib/storage-hive/core/pom.xml
+++ b/contrib/storage-hive/core/pom.xml
@@ -142,6 +142,44 @@
         </exclusion>
       </exclusions>
     </dependency>
+    <!-- Used by complex types tests for loading nested data -->
+    <dependency>
+      <groupId>org.apache.hive.hcatalog</groupId>
+      <artifactId>hive-hcatalog-core</artifactId>
+      <version>2.3.2</version>
+      <scope>test</scope>
+      <exclusions>
+        <exclusion>
+          <groupId>commons-logging</groupId>
+          <artifactId>commons-logging</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>javax.servlet</groupId>
+          <artifactId>servlet-api</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>log4j</groupId>
+          <artifactId>log4j</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.slf4j</groupId>
+          <artifactId>slf4j-log4j12</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.apache.logging.log4j</groupId>
+          <artifactId>log4j-slf4j-impl</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.apache.logging.log4j</groupId>
+          <artifactId>log4j-1.2-api</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.apache.logging.log4j</groupId>
+          <artifactId>log4j-web</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+
   </dependencies>
 
   <build>
diff --git a/contrib/storage-hive/core/src/main/codegen/config.fmpp b/contrib/storage-hive/core/src/main/codegen/config.fmpp
index d8ca3fa..cd36891 100644
--- a/contrib/storage-hive/core/src/main/codegen/config.fmpp
+++ b/contrib/storage-hive/core/src/main/codegen/config.fmpp
@@ -16,7 +16,6 @@
 
 data: {
     drillOI:tdd(../data/HiveTypes.tdd)
-    hiveFormat:tdd(../data/HiveFormats.tdd)
 }
 freemarkerLinks: {
     includes: includes/
diff --git a/contrib/storage-hive/core/src/main/codegen/data/HiveFormats.tdd b/contrib/storage-hive/core/src/main/codegen/data/HiveFormats.tdd
deleted file mode 100644
index 5200e4a..0000000
--- a/contrib/storage-hive/core/src/main/codegen/data/HiveFormats.tdd
+++ /dev/null
@@ -1,50 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-# http:# www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-{
-  map: [
-    {
-      hiveFormat: "HiveAvro",
-      hiveReader: "Avro",
-      hasHeaderFooter: false,
-    },
-    {
-      hiveFormat: "HiveParquet",
-      hiveReader: "Parquet",
-      hasHeaderFooter: false,
-    },
-    {
-      hiveFormat: "HiveText",
-      hiveReader: "Text",
-      hasHeaderFooter: true,
-    },
-    {
-      hiveFormat: "HiveOrc",
-      hiveReader: "Orc",
-      hasHeaderFooter: false,
-    },
-    {
-       hiveFormat: "HiveRCFile",
-       hiveReader: "RCFile",
-       hasHeaderFooter: false,
-    },
-    {
-      hiveFormat: "HiveDefault",
-      hiveReader: "Default",
-      hasHeaderFooter: false,
-    }
-  ]
-}
diff --git a/contrib/storage-hive/core/src/main/codegen/templates/HiveRecordReaders.java b/contrib/storage-hive/core/src/main/codegen/templates/HiveRecordReaders.java
deleted file mode 100644
index 7448464..0000000
--- a/contrib/storage-hive/core/src/main/codegen/templates/HiveRecordReaders.java
+++ /dev/null
@@ -1,178 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-/*
- * This template is used to generate different Hive record reader classes for different data formats
- * to avoid JIT profile pollution. These readers are derived from HiveAbstractReader, which implements
- * code for the init and setup stages, but the repeated - and performance-critical - next() method is
- * implemented separately in the classes generated from this template. The internal SkipRecordReader
- * class is also separated for the same reason.
- *
- * As to the performance gain with this change, please refer to:
- * https://issues.apache.org/jira/browse/DRILL-4982
- *
- */
-<@pp.dropOutputFile />
-<#list hiveFormat.map as entry>
-<@pp.changeOutputFile name="/org/apache/drill/exec/store/hive/Hive${entry.hiveReader}Reader.java" />
-<#include "/@includes/license.ftl" />
-
-package org.apache.drill.exec.store.hive.readers;
-
-import java.util.Collection;
-import java.util.List;
-import java.util.Properties;
-import org.apache.drill.common.exceptions.DrillRuntimeException;
-import org.apache.drill.common.exceptions.ExecutionSetupException;
-import org.apache.drill.common.expression.SchemaPath;
-import org.apache.drill.exec.store.hive.HivePartition;
-import org.apache.drill.exec.store.hive.HiveTableWithColumnCache;
-import org.apache.drill.exec.ops.FragmentContext;
-import org.apache.drill.exec.vector.AllocationHelper;
-import org.apache.drill.exec.vector.ValueVector;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.mapred.InputSplit;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.hive.conf.HiveConf;
-
-import org.apache.hadoop.hive.serde2.SerDeException;
-
-import org.apache.hadoop.mapred.RecordReader;
-
-<#if entry.hasHeaderFooter == true>
-import org.apache.drill.exec.store.hive.readers.inspectors.AbstractRecordsInspector;
-import org.apache.drill.exec.store.hive.readers.inspectors.DefaultRecordsInspector;
-import org.apache.drill.exec.store.hive.readers.inspectors.SkipFooterRecordsInspector;
-import org.apache.drill.exec.store.hive.HiveUtilities;
-import org.apache.hadoop.hive.serde.serdeConstants;
-</#if>
-
-public class Hive${entry.hiveReader}Reader extends HiveAbstractReader {
-
-<#if entry.hasHeaderFooter == true>
-  AbstractRecordsInspector recordsInspector;
-<#else>
-  Object value;
-</#if>
-
-  public Hive${entry.hiveReader}Reader(HiveTableWithColumnCache table, HivePartition partition, Collection<InputSplit> inputSplit, List<SchemaPath> projectedColumns,
-                      FragmentContext context, final HiveConf hiveConf,
-                      UserGroupInformation proxyUgi) throws ExecutionSetupException {
-    super(table, partition, inputSplit, projectedColumns, context, hiveConf, proxyUgi);
-  }
-
-  public  void internalInit(Properties tableProperties, RecordReader<Object, Object> reader) {
-
-    key = reader.createKey();
-<#if entry.hasHeaderFooter == true>
-    int skipHeaderCount = HiveUtilities.retrieveIntProperty(tableProperties, serdeConstants.HEADER_COUNT, -1);
-
-    // skip first N records to apply skip header policy
-    Object value = reader.createValue();
-    for (int i = 0; i < skipHeaderCount; i++) {
-      if (!hasNextValue(value)) {
-        // no more records to skip, we drained the table
-        empty = true;
-        break;
-      }
-    }
-
-    // if table was drained while skipping first N records, there is no need to check for skip footer logic
-    if (!empty) {
-      int skipFooterCount = HiveUtilities.retrieveIntProperty(tableProperties, serdeConstants.FOOTER_COUNT, -1);
-
-      // if we need to skip N last records, use records inspector which will buffer records while reading
-      if (skipFooterCount > 0) {
-        recordsInspector = new SkipFooterRecordsInspector(reader, skipFooterCount);
-      } else {
-        recordsInspector = new DefaultRecordsInspector(reader.createValue());
-      }
-    }
-<#else>
-    value = reader.createValue();
-</#if>
-
-  }
-
-<#if entry.hasHeaderFooter == true>
-
-  @Override
-  public int next() {
-    for (ValueVector vv : vectors) {
-      AllocationHelper.allocateNew(vv, TARGET_RECORD_COUNT);
-    }
-
-    if (empty) {
-      setValueCountAndPopulatePartitionVectors(0);
-      return 0;
-    }
-
-    try {
-      // starting new batch, reset processed records count
-      recordsInspector.reset();
-
-      // process records till batch is full or all records were processed
-      while (!recordsInspector.isBatchFull() && hasNextValue(recordsInspector.getValueHolder())) {
-        Object value = recordsInspector.getNextValue();
-        if (value != null) {
-          Object deSerializedValue = partitionDeserializer.deserialize((Writable) value);
-          if (partTblObjectInspectorConverter != null) {
-            deSerializedValue = partTblObjectInspectorConverter.convert(deSerializedValue);
-          }
-          readHiveRecordAndInsertIntoRecordBatch(deSerializedValue, recordsInspector.getProcessedRecordCount());
-          recordsInspector.incrementProcessedRecordCount();
-        }
-      }
-      setValueCountAndPopulatePartitionVectors(recordsInspector.getProcessedRecordCount());
-      return recordsInspector.getProcessedRecordCount();
-    } catch (SerDeException e) {
-      throw new DrillRuntimeException(e);
-    }
-  }
-
-<#else>
-  @Override
-  public int next() {
-    for (ValueVector vv : vectors) {
-      AllocationHelper.allocateNew(vv, TARGET_RECORD_COUNT);
-    }
-    if (empty) {
-      setValueCountAndPopulatePartitionVectors(0);
-      return 0;
-    }
-
-    try {
-      int recordCount = 0;
-      while (recordCount < TARGET_RECORD_COUNT && hasNextValue(value)) {
-        Object deSerializedValue = partitionDeserializer.deserialize((Writable) value);
-        if (partTblObjectInspectorConverter != null) {
-          deSerializedValue = partTblObjectInspectorConverter.convert(deSerializedValue);
-        }
-        readHiveRecordAndInsertIntoRecordBatch(deSerializedValue, recordCount);
-        recordCount++;
-      }
-
-      setValueCountAndPopulatePartitionVectors(recordCount);
-      return recordCount;
-    } catch (SerDeException e) {
-      throw new DrillRuntimeException(e);
-    }
-  }
-</#if>
-
-}
-</#list>
\ No newline at end of file
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveFieldConverter.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveFieldConverter.java
deleted file mode 100644
index 83286d7..0000000
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveFieldConverter.java
+++ /dev/null
@@ -1,241 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.store.hive;
-
-import java.math.RoundingMode;
-import java.util.Map;
-
-import org.apache.drill.exec.vector.NullableBigIntVector;
-import org.apache.drill.exec.vector.NullableBitVector;
-import org.apache.drill.exec.vector.NullableDateVector;
-import org.apache.drill.exec.vector.NullableFloat4Vector;
-import org.apache.drill.exec.vector.NullableFloat8Vector;
-import org.apache.drill.exec.vector.NullableIntVector;
-import org.apache.drill.exec.vector.NullableTimeStampVector;
-import org.apache.drill.exec.vector.NullableVarBinaryVector;
-import org.apache.drill.exec.vector.NullableVarCharVector;
-import org.apache.drill.exec.vector.NullableVarDecimalVector;
-import org.apache.drill.exec.vector.ValueVector;
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector.PrimitiveCategory;
-import org.apache.hadoop.hive.serde2.objectinspector.primitive.BinaryObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.primitive.BooleanObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.primitive.ByteObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.primitive.DateObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.primitive.DoubleObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.primitive.FloatObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.primitive.HiveCharObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.primitive.HiveDecimalObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.primitive.HiveVarcharObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.primitive.IntObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.primitive.LongObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.primitive.ShortObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.primitive.StringObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.primitive.TimestampObjectInspector;
-import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
-import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
-import org.apache.hadoop.io.Text;
-import org.joda.time.DateTime;
-import org.joda.time.DateTimeZone;
-
-import org.apache.drill.shaded.guava.com.google.common.collect.Maps;
-
-import static org.apache.drill.exec.store.hive.HiveUtilities.throwUnsupportedHiveDataTypeError;
-
-public abstract class HiveFieldConverter {
-
-  public abstract void setSafeValue(ObjectInspector oi, Object hiveFieldValue, ValueVector outputVV, int outputIndex);
-
-  private static Map<PrimitiveCategory, Class< ? extends HiveFieldConverter>> primMap = Maps.newHashMap();
-
-  // TODO (DRILL-2470)
-  // Byte and short (tinyint and smallint in SQL types) are currently read as integers
-  // as these smaller integer types are not fully supported in Drill today.
-  // Here the same types are used, as we have to read out of the correct typed converter
-  // from the hive side, in the FieldConverter classes below for Byte and Short we convert
-  // to integer when writing into Drill's vectors.
-  static {
-    primMap.put(PrimitiveCategory.BINARY, Binary.class);
-    primMap.put(PrimitiveCategory.BOOLEAN, Boolean.class);
-    primMap.put(PrimitiveCategory.BYTE, Byte.class);
-    primMap.put(PrimitiveCategory.DOUBLE, Double.class);
-    primMap.put(PrimitiveCategory.FLOAT, Float.class);
-    primMap.put(PrimitiveCategory.INT, Int.class);
-    primMap.put(PrimitiveCategory.LONG, Long.class);
-    primMap.put(PrimitiveCategory.SHORT, Short.class);
-    primMap.put(PrimitiveCategory.STRING, String.class);
-    primMap.put(PrimitiveCategory.VARCHAR, VarChar.class);
-    primMap.put(PrimitiveCategory.TIMESTAMP, Timestamp.class);
-    primMap.put(PrimitiveCategory.DATE, Date.class);
-    primMap.put(PrimitiveCategory.CHAR, Char.class);
-    primMap.put(PrimitiveCategory.DECIMAL, VarDecimal.class);
-  }
-
-
-  public static HiveFieldConverter create(TypeInfo typeInfo)
-      throws IllegalAccessException, InstantiationException {
-    switch (typeInfo.getCategory()) {
-      case PRIMITIVE:
-        final PrimitiveCategory pCat = ((PrimitiveTypeInfo) typeInfo).getPrimitiveCategory();
-        Class<? extends HiveFieldConverter> clazz = primMap.get(pCat);
-        if (clazz != null) {
-          return clazz.newInstance();
-        }
-
-        throwUnsupportedHiveDataTypeError(pCat.toString());
-        break;
-
-      case LIST:
-      case MAP:
-      case STRUCT:
-      case UNION:
-      default:
-        throwUnsupportedHiveDataTypeError(typeInfo.getCategory().toString());
-    }
-
-    return null;
-  }
-
-  public static class Binary extends HiveFieldConverter {
-    @Override
-    public void setSafeValue(ObjectInspector oi, Object hiveFieldValue, ValueVector outputVV, int outputIndex) {
-      final byte[] value = ((BinaryObjectInspector)oi).getPrimitiveJavaObject(hiveFieldValue);
-      ((NullableVarBinaryVector) outputVV).getMutator().setSafe(outputIndex, value, 0, value.length);
-    }
-  }
-
-  public static class Boolean extends HiveFieldConverter {
-    @Override
-    public void setSafeValue(ObjectInspector oi, Object hiveFieldValue, ValueVector outputVV, int outputIndex) {
-      final boolean value = (boolean) ((BooleanObjectInspector)oi).getPrimitiveJavaObject(hiveFieldValue);
-      ((NullableBitVector) outputVV).getMutator().setSafe(outputIndex, value ? 1 : 0);
-    }
-  }
-
-  public static class VarDecimal extends HiveFieldConverter {
-    @Override
-    public void setSafeValue(ObjectInspector oi, Object hiveFieldValue, ValueVector outputVV, int outputIndex) {
-      ((NullableVarDecimalVector) outputVV).getMutator()
-          .setSafe(
-              outputIndex,
-              ((HiveDecimalObjectInspector) oi)
-                  .getPrimitiveJavaObject(hiveFieldValue).bigDecimalValue()
-                  .setScale(outputVV.getField().getScale(), RoundingMode.HALF_UP));
-    }
-  }
-
-  public static class Double extends HiveFieldConverter {
-    @Override
-    public void setSafeValue(ObjectInspector oi, Object hiveFieldValue, ValueVector outputVV, int outputIndex) {
-      final double value = (double) ((DoubleObjectInspector)oi).getPrimitiveJavaObject(hiveFieldValue);
-      ((NullableFloat8Vector) outputVV).getMutator().setSafe(outputIndex, value);
-    }
-  }
-
-  public static class Float extends HiveFieldConverter {
-    @Override
-    public void setSafeValue(ObjectInspector oi, Object hiveFieldValue, ValueVector outputVV, int outputIndex) {
-      final float value = (float) ((FloatObjectInspector)oi).getPrimitiveJavaObject(hiveFieldValue);
-      ((NullableFloat4Vector) outputVV).getMutator().setSafe(outputIndex, value);
-    }
-  }
-
-  public static class Int extends HiveFieldConverter {
-    @Override
-    public void setSafeValue(ObjectInspector oi, Object hiveFieldValue, ValueVector outputVV, int outputIndex) {
-      final int value = (int) ((IntObjectInspector)oi).getPrimitiveJavaObject(hiveFieldValue);
-      ((NullableIntVector) outputVV).getMutator().setSafe(outputIndex, value);
-    }
-  }
-
-  // TODO (DRILL-2470)
-  // Byte and short (tinyint and smallint in SQL types) are currently read as integers
-  // as these smaller integer types are not fully supported in Drill today.
-  public static class Short extends HiveFieldConverter {
-    @Override
-    public void setSafeValue(ObjectInspector oi, Object hiveFieldValue, ValueVector outputVV, int outputIndex) {
-      final int value = (short) ((ShortObjectInspector)oi).getPrimitiveJavaObject(hiveFieldValue);
-      ((NullableIntVector) outputVV).getMutator().setSafe(outputIndex, value);
-    }
-  }
-
-  public static class Byte extends HiveFieldConverter {
-    @Override
-    public void setSafeValue(ObjectInspector oi, Object hiveFieldValue, ValueVector outputVV, int outputIndex) {
-      final int value = (byte)((ByteObjectInspector)oi).getPrimitiveJavaObject(hiveFieldValue);
-      ((NullableIntVector) outputVV).getMutator().setSafe(outputIndex, value);
-    }
-  }
-
-  public static class Long extends HiveFieldConverter {
-    @Override
-    public void setSafeValue(ObjectInspector oi, Object hiveFieldValue, ValueVector outputVV, int outputIndex) {
-      final long value = (long) ((LongObjectInspector)oi).getPrimitiveJavaObject(hiveFieldValue);
-      ((NullableBigIntVector) outputVV).getMutator().setSafe(outputIndex, value);
-    }
-  }
-
-  public static class String extends HiveFieldConverter {
-    @Override
-    public void setSafeValue(ObjectInspector oi, Object hiveFieldValue, ValueVector outputVV, int outputIndex) {
-      final Text value = ((StringObjectInspector)oi).getPrimitiveWritableObject(hiveFieldValue);
-      final byte[] valueBytes = value.getBytes();
-      final int len = value.getLength();
-      ((NullableVarCharVector) outputVV).getMutator().setSafe(outputIndex, valueBytes, 0, len);
-    }
-  }
-
-  public static class VarChar extends HiveFieldConverter {
-    @Override
-    public void setSafeValue(ObjectInspector oi, Object hiveFieldValue, ValueVector outputVV, int outputIndex) {
-      final Text value = ((HiveVarcharObjectInspector)oi).getPrimitiveWritableObject(hiveFieldValue).getTextValue();
-      final byte[] valueBytes = value.getBytes();
-      final int valueLen = value.getLength();
-      ((NullableVarCharVector) outputVV).getMutator().setSafe(outputIndex, valueBytes, 0, valueLen);
-    }
-  }
-
-  public static class Timestamp extends HiveFieldConverter {
-    @Override
-    public void setSafeValue(ObjectInspector oi, Object hiveFieldValue, ValueVector outputVV, int outputIndex) {
-      final java.sql.Timestamp value = ((TimestampObjectInspector)oi).getPrimitiveJavaObject(hiveFieldValue);
-      final DateTime ts = new DateTime(value.getTime()).withZoneRetainFields(DateTimeZone.UTC);
-      ((NullableTimeStampVector) outputVV).getMutator().setSafe(outputIndex, ts.getMillis());
-    }
-  }
-
-  public static class Date extends HiveFieldConverter {
-    @Override
-    public void setSafeValue(ObjectInspector oi, Object hiveFieldValue, ValueVector outputVV, int outputIndex) {
-      final java.sql.Date value = ((DateObjectInspector)oi).getPrimitiveJavaObject(hiveFieldValue);
-      final DateTime date = new DateTime(value.getTime()).withZoneRetainFields(DateTimeZone.UTC);
-      ((NullableDateVector) outputVV).getMutator().setSafe(outputIndex, date.getMillis());
-    }
-  }
-
-  public static class Char extends HiveFieldConverter {
-    @Override
-    public void setSafeValue(ObjectInspector oi, Object hiveFieldValue, ValueVector outputVV, int outputIndex) {
-      final Text value = ((HiveCharObjectInspector)oi).getPrimitiveWritableObject(hiveFieldValue).getStrippedValue();
-      final byte[] valueBytes = value.getBytes();
-      final int valueLen = value.getLength();
-      ((NullableVarCharVector) outputVV).getMutator().setSafe(outputIndex, valueBytes, 0, valueLen);
-    }
-  }
-
-}
\ No newline at end of file
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveScanBatchCreator.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveScanBatchCreator.java
index 9f1e29e..bf19150 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveScanBatchCreator.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveScanBatchCreator.java
@@ -24,8 +24,7 @@ import org.apache.drill.exec.ops.ExecutorFragmentContext;
 import org.apache.drill.exec.physical.impl.BatchCreator;
 import org.apache.drill.exec.physical.impl.ScanBatch;
 import org.apache.drill.exec.record.RecordBatch;
-import org.apache.drill.exec.store.hive.readers.initilializers.AbstractReadersInitializer;
-import org.apache.drill.exec.store.hive.readers.initilializers.ReadersInitializer;
+import org.apache.drill.exec.store.hive.readers.ReadersInitializer;
 
 @SuppressWarnings("unused")
 public class HiveScanBatchCreator implements BatchCreator<HiveSubScan> {
@@ -34,7 +33,6 @@ public class HiveScanBatchCreator implements BatchCreator<HiveSubScan> {
   @Override
   public ScanBatch getBatch(ExecutorFragmentContext context, HiveSubScan config, List<RecordBatch> children)
       throws ExecutionSetupException {
-    AbstractReadersInitializer readersInitializer = ReadersInitializer.getInitializer(context, config);
-    return new ScanBatch(config, context, readersInitializer.init());
+    return new ScanBatch(config, context, ReadersInitializer.init(context, config));
   }
 }
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveUtilities.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveUtilities.java
index 3eb134d..f2b5a28 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveUtilities.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveUtilities.java
@@ -517,7 +517,7 @@ public class HiveUtilities {
         .append("Unsupported Hive data type ").append(unsupportedType).append(". ")
         .append(System.lineSeparator())
         .append("Following Hive data types are supported in Drill for querying: ")
-        .append("BOOLEAN, TINYINT, SMALLINT, INT, BIGINT, FLOAT, DOUBLE, DATE, TIMESTAMP, BINARY, DECIMAL, STRING, VARCHAR and CHAR");
+        .append("BOOLEAN, TINYINT, SMALLINT, INT, BIGINT, FLOAT, DOUBLE, DATE, TIMESTAMP, BINARY, DECIMAL, STRING, VARCHAR, CHAR, ARRAY.");
 
     throw UserException.unsupportedError()
         .message(errMsg.toString())
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/readers/HiveAbstractReader.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/readers/HiveAbstractReader.java
deleted file mode 100644
index cc774d9..0000000
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/readers/HiveAbstractReader.java
+++ /dev/null
@@ -1,439 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.store.hive.readers;
-
-import java.io.IOException;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Properties;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
-import java.util.stream.Collectors;
-
-import org.apache.drill.shaded.guava.com.google.common.util.concurrent.ListenableFuture;
-import io.netty.buffer.DrillBuf;
-import org.apache.drill.common.exceptions.DrillRuntimeException;
-import org.apache.drill.common.exceptions.ExecutionSetupException;
-import org.apache.drill.common.expression.SchemaPath;
-import org.apache.drill.common.types.TypeProtos.MajorType;
-import org.apache.drill.exec.exception.SchemaChangeException;
-import org.apache.drill.exec.expr.TypeHelper;
-import org.apache.drill.exec.ops.FragmentContext;
-import org.apache.drill.exec.ops.OperatorContext;
-import org.apache.drill.exec.physical.impl.OutputMutator;
-import org.apache.drill.exec.record.MaterializedField;
-import org.apache.drill.exec.server.options.OptionManager;
-import org.apache.drill.exec.store.AbstractRecordReader;
-import org.apache.drill.exec.store.hive.HiveFieldConverter;
-import org.apache.drill.exec.store.hive.HivePartition;
-import org.apache.drill.exec.store.hive.HiveTableWithColumnCache;
-import org.apache.drill.exec.store.hive.HiveUtilities;
-import org.apache.drill.exec.vector.AllocationHelper;
-import org.apache.drill.exec.vector.ValueVector;
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
-import org.apache.hadoop.hive.metastore.api.FieldSchema;
-import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
-import org.apache.hadoop.hive.serde2.Deserializer;
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorConverters;
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorConverters.Converter;
-import org.apache.hadoop.hive.serde2.objectinspector.StructField;
-import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
-import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
-import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
-import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
-import org.apache.hadoop.mapred.InputSplit;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.RecordReader;
-import org.apache.hadoop.mapred.Reporter;
-
-import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
-import org.apache.hadoop.security.UserGroupInformation;
-
-
-public abstract class HiveAbstractReader extends AbstractRecordReader {
-  protected static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HiveAbstractReader.class);
-
-  protected final DrillBuf managedBuffer;
-
-  protected HiveTableWithColumnCache table;
-  protected HivePartition partition;
-  protected Iterator<InputSplit> inputSplitsIterator;
-  protected List<String> selectedColumnNames;
-  protected List<StructField> selectedStructFieldRefs = Lists.newArrayList();
-  protected List<TypeInfo> selectedColumnTypes = Lists.newArrayList();
-  protected List<ObjectInspector> selectedColumnObjInspectors = Lists.newArrayList();
-  protected List<HiveFieldConverter> selectedColumnFieldConverters = Lists.newArrayList();
-  protected List<String> selectedPartitionNames = Lists.newArrayList();
-  protected List<TypeInfo> selectedPartitionTypes = Lists.newArrayList();
-  protected List<Object> selectedPartitionValues = Lists.newArrayList();
-
-  // Deserializer of the reading partition (or table if the table is non-partitioned)
-  protected Deserializer partitionDeserializer;
-
-  // ObjectInspector to read data from partitionDeserializer (for a non-partitioned table this is same as the table
-  // ObjectInspector).
-  protected StructObjectInspector partitionOI;
-
-  // Final ObjectInspector. We may not use the partitionOI directly if there are schema changes between the table and
-  // partition. If there are no schema changes then this is same as the partitionOI.
-  protected StructObjectInspector finalOI;
-
-  // Converter which converts data from partition schema to table schema.
-  protected Converter partTblObjectInspectorConverter;
-
-  protected Object key;
-  protected RecordReader<Object, Object> reader;
-  protected List<ValueVector> vectors = Lists.newArrayList();
-  protected List<ValueVector> pVectors = Lists.newArrayList();
-  protected boolean empty;
-  protected HiveConf hiveConf;
-  protected FragmentContext fragmentContext;
-  protected String defaultPartitionValue;
-  protected final UserGroupInformation proxyUgi;
-  protected JobConf job;
-
-
-  public static final int TARGET_RECORD_COUNT = 4000;
-
-  public HiveAbstractReader(HiveTableWithColumnCache table, HivePartition partition, Collection<InputSplit> inputSplits, List<SchemaPath> projectedColumns,
-                            FragmentContext context, final HiveConf hiveConf,
-                            UserGroupInformation proxyUgi) throws ExecutionSetupException {
-    this.table = table;
-    this.partition = partition;
-    this.empty = (inputSplits == null || inputSplits.isEmpty());
-    this.inputSplitsIterator = empty ? Collections.<InputSplit>emptyIterator() : inputSplits.iterator();
-    this.hiveConf = hiveConf;
-    this.fragmentContext = context;
-    this.proxyUgi = proxyUgi;
-    this.managedBuffer = fragmentContext.getManagedBuffer().reallocIfNeeded(256);
-    setColumns(projectedColumns);
-  }
-
-  public abstract void internalInit(Properties tableProperties, RecordReader<Object, Object> reader);
-
-  private void init() throws ExecutionSetupException {
-    job = new JobConf(hiveConf);
-
-    // Get the configured default val
-    defaultPartitionValue = hiveConf.get(ConfVars.DEFAULTPARTITIONNAME.varname);
-
-    Properties tableProperties;
-    try {
-      tableProperties = HiveUtilities.getTableMetadata(table);
-      final Properties partitionProperties =
-          (partition == null) ?  tableProperties :
-              HiveUtilities.getPartitionMetadata(partition, table);
-      HiveUtilities.addConfToJob(job, partitionProperties);
-
-      final Deserializer tableDeserializer = createDeserializer(job, table.getSd().getSerdeInfo().getSerializationLib(), tableProperties);
-      final StructObjectInspector tableOI = getStructOI(tableDeserializer);
-
-      if (partition != null) {
-        partitionDeserializer = createDeserializer(job, partition.getSd().getSerdeInfo().getSerializationLib(), partitionProperties);
-        partitionOI = getStructOI(partitionDeserializer);
-
-        finalOI = (StructObjectInspector)ObjectInspectorConverters.getConvertedOI(partitionOI, tableOI);
-        partTblObjectInspectorConverter = ObjectInspectorConverters.getConverter(partitionOI, finalOI);
-        job.setInputFormat(HiveUtilities.getInputFormatClass(job, partition.getSd(), table));
-        HiveUtilities.verifyAndAddTransactionalProperties(job, partition.getSd());
-      } else {
-        // For non-partitioned tables, there is no need to create converter as there are no schema changes expected.
-        partitionDeserializer = tableDeserializer;
-        partitionOI = tableOI;
-        partTblObjectInspectorConverter = null;
-        finalOI = tableOI;
-        job.setInputFormat(HiveUtilities.getInputFormatClass(job, table.getSd(), table));
-        HiveUtilities.verifyAndAddTransactionalProperties(job, table.getSd());
-      }
-
-      if (logger.isTraceEnabled()) {
-        for (StructField field: finalOI.getAllStructFieldRefs()) {
-          logger.trace("field in finalOI: {}", field.getClass().getName());
-        }
-        logger.trace("partitionDeserializer class is {} {}", partitionDeserializer.getClass().getName());
-      }
-      // Get list of partition column names
-      final List<String> partitionNames = Lists.newArrayList();
-      for (FieldSchema field : table.getPartitionKeys()) {
-        partitionNames.add(field.getName());
-      }
-
-      // We should always get the columns names from ObjectInspector. For some of the tables (ex. avro) metastore
-      // may not contain the schema, instead it is derived from other sources such as table properties or external file.
-      // Deserializer object knows how to get the schema with all the config and table properties passed in initialization.
-      // ObjectInspector created from the Deserializer object has the schema.
-      final StructTypeInfo sTypeInfo = (StructTypeInfo) TypeInfoUtils.getTypeInfoFromObjectInspector(finalOI);
-      final List<String> tableColumnNames = sTypeInfo.getAllStructFieldNames();
-
-      // Select list of columns for project pushdown into Hive SerDe readers.
-      final List<Integer> columnIds = Lists.newArrayList();
-      if (isStarQuery()) {
-        selectedColumnNames = tableColumnNames;
-        for(int i=0; i<selectedColumnNames.size(); i++) {
-          columnIds.add(i);
-        }
-        selectedPartitionNames = partitionNames;
-      } else {
-        selectedColumnNames = Lists.newArrayList();
-        for (SchemaPath field : getColumns()) {
-          String columnName = field.getRootSegment().getPath();
-          if (partitionNames.contains(columnName)) {
-            selectedPartitionNames.add(columnName);
-          } else {
-            columnIds.add(tableColumnNames.indexOf(columnName));
-            selectedColumnNames.add(columnName);
-          }
-        }
-      }
-      List<String> paths = getColumns().stream()
-          .map(SchemaPath::getRootSegmentPath)
-          .collect(Collectors.toList());
-      ColumnProjectionUtils.appendReadColumns(job, columnIds, selectedColumnNames, paths);
-
-      for (String columnName : selectedColumnNames) {
-        StructField fieldRef = finalOI.getStructFieldRef(columnName);
-        selectedStructFieldRefs.add(fieldRef);
-        ObjectInspector fieldOI = fieldRef.getFieldObjectInspector();
-
-        TypeInfo typeInfo = TypeInfoUtils.getTypeInfoFromTypeString(fieldOI.getTypeName());
-
-        selectedColumnObjInspectors.add(fieldOI);
-        selectedColumnTypes.add(typeInfo);
-        selectedColumnFieldConverters.add(HiveFieldConverter.create(typeInfo));
-      }
-
-      for(int i=0; i<selectedColumnNames.size(); ++i){
-        logger.trace("inspector:typeName={}, className={}, TypeInfo: {}, converter:{}",
-            selectedColumnObjInspectors.get(i).getTypeName(),
-            selectedColumnObjInspectors.get(i).getClass().getName(),
-            selectedColumnTypes.get(i).toString(),
-            selectedColumnFieldConverters.get(i).getClass().getName());
-      }
-
-      for (int i = 0; i < table.getPartitionKeys().size(); i++) {
-        FieldSchema field = table.getPartitionKeys().get(i);
-        if (selectedPartitionNames.contains(field.getName())) {
-          TypeInfo pType = TypeInfoUtils.getTypeInfoFromTypeString(field.getType());
-          selectedPartitionTypes.add(pType);
-
-          if (partition != null) {
-            selectedPartitionValues.add(
-                HiveUtilities.convertPartitionType(pType, partition.getValues().get(i), defaultPartitionValue));
-          }
-        }
-      }
-    } catch (Exception e) {
-      throw new ExecutionSetupException("Failure while initializing Hive Reader " + this.getClass().getName(), e);
-    }
-
-    if (!empty && initNextReader(job)) {
-      internalInit(tableProperties, reader);
-    }
-  }
-
-  /**
-   * Initializes next reader if available, will close previous reader if any.
-   *
-   * @param job map / reduce job configuration.
-   * @return true if a new reader was initialized, false if no more readers are available
-   * @throws ExecutionSetupException if could not init record reader
-   */
-  protected boolean initNextReader(JobConf job) throws ExecutionSetupException {
-    if (inputSplitsIterator.hasNext()) {
-      if (reader != null) {
-        closeReader();
-      }
-      InputSplit inputSplit = inputSplitsIterator.next();
-      try {
-        reader = (org.apache.hadoop.mapred.RecordReader<Object, Object>) job.getInputFormat().getRecordReader(inputSplit, job, Reporter.NULL);
-        logger.trace("hive reader created: {} for inputSplit {}", reader.getClass().getName(), inputSplit.toString());
-      } catch (Exception e) {
-        throw new ExecutionSetupException("Failed to get o.a.hadoop.mapred.RecordReader from Hive InputFormat", e);
-      }
-      return true;
-    }
-    return false;
-  }
-
-  /**
-   * Utility method which creates a Deserializer object for given Deserializer class name and properties.
-   * TODO: Replace Deserializer interface with AbstractSerDe, once all Hive clients are upgraded to version 2.3
-   */
-  private static Deserializer createDeserializer(final JobConf job, final String sLib, final Properties properties) throws Exception {
-    final Class<? extends Deserializer> c = Class.forName(sLib).asSubclass(Deserializer.class);
-    final Deserializer deserializer = c.getConstructor().newInstance();
-    deserializer.initialize(job, properties);
-
-    return deserializer;
-  }
-
-  private static StructObjectInspector getStructOI(final Deserializer deserializer) throws Exception {
-    ObjectInspector oi = deserializer.getObjectInspector();
-    if (oi.getCategory() != ObjectInspector.Category.STRUCT) {
-      throw new UnsupportedOperationException(String.format("%s category not supported", oi.getCategory()));
-    }
-    return (StructObjectInspector) oi;
-  }
-
-  @Override
-  public void setup(OperatorContext context, OutputMutator output)
-      throws ExecutionSetupException {
-    // initializes "reader"
-    final Callable<Void> readerInitializer = new Callable<Void>() {
-      @Override
-      public Void call() throws Exception {
-        init();
-        return null;
-      }
-    };
-
-    final ListenableFuture<Void> result = context.runCallableAs(proxyUgi, readerInitializer);
-    try {
-      result.get();
-    } catch (InterruptedException e) {
-      result.cancel(true);
-      // Preserve evidence that the interruption occurred so that code higher up on the call stack can learn of the
-      // interruption and respond to it if it wants to.
-      Thread.currentThread().interrupt();
-    } catch (ExecutionException e) {
-      throw ExecutionSetupException.fromThrowable(e.getMessage(), e);
-    }
-    try {
-      final OptionManager options = fragmentContext.getOptions();
-      for (int i = 0; i < selectedColumnNames.size(); i++) {
-        MajorType type = HiveUtilities.getMajorTypeFromHiveTypeInfo(selectedColumnTypes.get(i), options);
-        MaterializedField field = MaterializedField.create(selectedColumnNames.get(i), type);
-        Class<? extends ValueVector> vvClass = TypeHelper.getValueVectorClass(type.getMinorType(), type.getMode());
-        vectors.add(output.addField(field, vvClass));
-      }
-
-      for (int i = 0; i < selectedPartitionNames.size(); i++) {
-        MajorType type = HiveUtilities.getMajorTypeFromHiveTypeInfo(selectedPartitionTypes.get(i), options);
-        MaterializedField field = MaterializedField.create(selectedPartitionNames.get(i), type);
-        Class<? extends ValueVector> vvClass = TypeHelper.getValueVectorClass(field.getType().getMinorType(), field.getDataMode());
-        pVectors.add(output.addField(field, vvClass));
-      }
-    } catch(SchemaChangeException e) {
-      throw new ExecutionSetupException(e);
-    }
-  }
-
-  @Override
-  public abstract int next();
-
-  protected void setValueCountAndPopulatePartitionVectors(int recordCount) {
-    for (ValueVector v : vectors) {
-      v.getMutator().setValueCount(recordCount);
-    }
-
-    if (partition != null) {
-      populatePartitionVectors(recordCount);
-    }
-  }
-
-  protected void readHiveRecordAndInsertIntoRecordBatch(Object deSerializedValue, int outputRecordIndex) {
-    for (int i = 0; i < selectedStructFieldRefs.size(); i++) {
-      Object hiveValue = finalOI.getStructFieldData(deSerializedValue, selectedStructFieldRefs.get(i));
-      if (hiveValue != null) {
-        selectedColumnFieldConverters.get(i).setSafeValue(selectedColumnObjInspectors.get(i), hiveValue,
-            vectors.get(i), outputRecordIndex);
-      }
-    }
-  }
-
-  @Override
-  public void close() {
-    closeReader();
-  }
-
-  /**
-   * Will close record reader if any. Any exception will be logged as warning.
-   */
-  private void closeReader() {
-    try {
-      if (reader != null) {
-        reader.close();
-        reader = null;
-      }
-    } catch (Exception e) {
-      logger.warn("Failure while closing Hive Record reader.", e);
-    }
-  }
-
-  protected void populatePartitionVectors(int recordCount) {
-    for (int i = 0; i < pVectors.size(); i++) {
-      final ValueVector vector = pVectors.get(i);
-      final Object val = selectedPartitionValues.get(i);
-
-      AllocationHelper.allocateNew(vector, recordCount);
-
-      if (val != null) {
-        HiveUtilities.populateVector(vector, managedBuffer, val, 0, recordCount);
-      }
-
-      vector.getMutator().setValueCount(recordCount);
-    }
-  }
-
-  /**
-   * Writes the next value into the given value holder if one is available.
-   * If not, checks whether any other available readers
-   * may hold the next value and tries to obtain it from them.
-   *
-   * @param value value holder
-   * @return true if value was written, false otherwise
-   */
-  protected boolean hasNextValue(Object value) {
-    while (true) {
-      try {
-        if (reader.next(key, value)) {
-          return true;
-        }
-
-        if (initNextReader(job)) {
-          continue;
-        }
-
-        return false;
-
-      } catch (IOException | ExecutionSetupException e) {
-        throw new DrillRuntimeException(e);
-      }
-    }
-  }
-
-  @Override
-  public String toString() {
-    long position = -1;
-    try {
-      if (reader != null) {
-        position = reader.getPos();
-      }
-    } catch (IOException e) {
-      logger.trace("Unable to obtain reader position.", e);
-    }
-    return getClass().getSimpleName() + "[Database=" + table.getDbName()
-        + ", Table=" + table.getTableName()
-        + ", Position=" + position
-        + "]";
-  }
-}
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/readers/HiveDefaultRecordReader.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/readers/HiveDefaultRecordReader.java
new file mode 100644
index 0000000..d490f5d
--- /dev/null
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/readers/HiveDefaultRecordReader.java
@@ -0,0 +1,530 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.hive.readers;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import io.netty.buffer.DrillBuf;
+import org.apache.drill.common.exceptions.DrillRuntimeException;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.expression.PathSegment;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.exec.expr.TypeHelper;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.OperatorContext;
+import org.apache.drill.exec.physical.impl.OutputMutator;
+import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.server.options.OptionManager;
+import org.apache.drill.exec.store.AbstractRecordReader;
+import org.apache.drill.exec.store.hive.HivePartition;
+import org.apache.drill.exec.store.hive.HiveTableWithColumnCache;
+import org.apache.drill.exec.store.hive.HiveUtilities;
+import org.apache.drill.exec.store.hive.writers.HiveValueWriter;
+import org.apache.drill.exec.store.hive.writers.HiveValueWriterFactory;
+import org.apache.drill.exec.vector.AllocationHelper;
+import org.apache.drill.exec.vector.ValueVector;
+import org.apache.drill.exec.vector.complex.impl.VectorContainerWriter;
+import org.apache.drill.shaded.guava.com.google.common.util.concurrent.ListenableFuture;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
+import org.apache.hadoop.hive.serde2.Deserializer;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorConverters;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Reader which uses the complex writer underneath to fill value vectors with data read from Hive.
+ * At first glance the initialization code looks cumbersome, but its main aim is to prepare the list of key
+ * fields used in the next() and readHiveRecordAndInsertIntoRecordBatch(Object rowValue) methods.
+ * <p>
+ * In a nutshell, the reader is used in two stages:
+ * 1) The setup stage configures the mapredReader, partitionObjInspector, partitionDeserializer, the list of
+ * {@link HiveValueWriter}s for each column in the record batch, and the partition vectors and values.
+ * 2) The reading stage uses the previously configured objects to get rows from the InputSplits, represents each
+ * row as a struct of column values, and writes each column value into Drill's value vectors using the
+ * HiveValueWriter for that column.
+ */
+public class HiveDefaultRecordReader extends AbstractRecordReader {
+
+  protected static final Logger logger = LoggerFactory.getLogger(HiveDefaultRecordReader.class);
+
+  /**
+   * Max number of records that can be consumed by one next() method call.
+   */
+  public static final int TARGET_RECORD_COUNT = 4000;
+
+  /**
+   * Key of partition columns in grouped map.
+   */
+  private static final boolean PARTITION_COLUMNS = true;
+
+  /**
+   * Manages all writes to value vectors received using OutputMutator
+   */
+  protected VectorContainerWriter outputWriter;
+
+  /**
+   * Before creating the mapredReader, Drill creates this object, which holds
+   * metadata about the table to be read.
+   */
+  private final HiveTableWithColumnCache hiveTable;
+
+  /**
+   * Contains info about the current user and group. Used to initialize
+   * the mapredReader under that user's permissions.
+   */
+  private final UserGroupInformation proxyUserGroupInfo;
+
+  /**
+   * Config to be used for creation of JobConf instance.
+   */
+  private final HiveConf hiveConf;
+
+  /**
+   * Hive partition wrapper with index of column list in ColumnListsCache.
+   */
+  private final HivePartition partition;
+
+  /**
+   * JobConf instance created to supply configuration and metadata to the mapredReader.
+   * The job itself is never executed.
+   */
+  private JobConf job;
+
+  /**
+   * Deserializer used to deserialize a row.
+   * Depending on whether a partition is present, it may be the partition or the table deserializer.
+   */
+  protected Deserializer partitionDeserializer;
+
+  /**
+   * Used to inspect rows parsed by partitionDeserializer
+   */
+  private StructObjectInspector partitionObjInspector;
+
+  /**
+   * Converts value deserialized using partitionDeserializer
+   */
+  protected ObjectInspectorConverters.Converter partitionToTableSchemaConverter;
+
+  /**
+   * Used to inspect rowValue of each column
+   */
+  private StructObjectInspector finalObjInspector;
+
+  /**
+   * For each concrete column to be read we assign concrete writer
+   * which encapsulates writing of column values read from Hive into
+   * specific value vector
+   */
+  private HiveValueWriter[] columnValueWriters;
+
+  /**
+   * At the moment of mapredReader instantiation we can check the inputSplits;
+   * if no splits are present then there are no records to read,
+   * so the mapredReader can finish work early.
+   */
+  protected boolean empty;
+
+  /**
+   * Buffer used to populate partition vectors and to fill data into value vectors via writers.
+   */
+  private final DrillBuf drillBuf;
+
+  /**
+   * The fragmentContext holds various helper objects
+   * associated with the fragment. In the reader it's used
+   * to get options for accurate detection of partition column types.
+   */
+  private final FragmentContext fragmentContext;
+
+  /**
+   * Partition vectors and values are linked together and get filled after all records are read.
+   * These two arrays must have the same size.
+   */
+  private ValueVector[] partitionVectors;
+
+  /**
+   * Values to be written into partition vector.
+   */
+  private Object[] partitionValues;
+
+
+  /**
+   * InputSplits to be processed by mapredReader.
+   */
+  private final Iterator<InputSplit> inputSplitsIterator;
+
+  /**
+   * Reader used to get data from InputSplits.
+   */
+  protected RecordReader<Object, Object> mapredReader;
+
+  /**
+   * Helper object used together with mapredReader to get data from InputSplit.
+   */
+  private Object key;
+
+  /**
+   * Helper object used together with mapredReader to get data from InputSplit.
+   */
+  protected Object valueHolder;
+
+  /**
+   * Array of StructFields representing columns to be read by the reader.
+   * Used to extract a column's row value from the final object inspector.
+   */
+  private StructField[] selectedStructFieldRefs;
+
+
+  /**
+   * Reader constructor called by the initializer.
+   *
+   * @param table            metadata about Hive table being read
+   * @param partition        holder of metadata about table partitioning
+   * @param inputSplits      input splits for reading data from distributed storage
+   * @param projectedColumns target columns for scan
+   * @param context          fragmentContext of fragment
+   * @param hiveConf         Hive configuration
+   * @param proxyUgi         user/group info to be used for initialization
+   */
+  public HiveDefaultRecordReader(HiveTableWithColumnCache table, HivePartition partition,
+                                 Collection<InputSplit> inputSplits, List<SchemaPath> projectedColumns,
+                                 FragmentContext context, HiveConf hiveConf, UserGroupInformation proxyUgi) {
+    this.hiveTable = table;
+    this.partition = partition;
+    this.hiveConf = hiveConf;
+    this.proxyUserGroupInfo = proxyUgi;
+    this.empty = inputSplits == null || inputSplits.isEmpty();
+    this.inputSplitsIterator = empty ? Collections.emptyIterator() : inputSplits.iterator();
+    this.drillBuf = context.getManagedBuffer().reallocIfNeeded(256);
+    this.partitionVectors = new ValueVector[0];
+    this.partitionValues = new Object[0];
+    setColumns(projectedColumns);
+    this.fragmentContext = context;
+  }
+
+  @Override
+  public void setup(OperatorContext context, OutputMutator output) throws ExecutionSetupException {
+    ListenableFuture<Void> initTaskFuture = context.runCallableAs(proxyUserGroupInfo, getInitTask(output));
+    try {
+      initTaskFuture.get();
+    } catch (InterruptedException e) {
+      initTaskFuture.cancel(true);
+      Thread.currentThread().interrupt();
+    } catch (ExecutionException e) {
+      throw ExecutionSetupException.fromThrowable(e.getMessage(), e);
+    }
+  }
+
+  private Callable<Void> getInitTask(OutputMutator output) {
+    return () -> {
+      this.job = new JobConf(hiveConf);
+      Properties hiveTableProperties = HiveUtilities.getTableMetadata(hiveTable);
+      final Deserializer tableDeserializer = createDeserializer(job, hiveTable.getSd(), hiveTableProperties);
+      final StructObjectInspector tableObjInspector = getStructOI(tableDeserializer);
+
+      if (partition == null) {
+        this.partitionDeserializer = tableDeserializer;
+        this.partitionObjInspector = tableObjInspector;
+        this.partitionToTableSchemaConverter = (obj) -> obj;
+        this.finalObjInspector = tableObjInspector;
+
+        job.setInputFormat(HiveUtilities.getInputFormatClass(job, hiveTable.getSd(), hiveTable));
+        HiveUtilities.verifyAndAddTransactionalProperties(job, hiveTable.getSd());
+      } else {
+        this.partitionDeserializer = createDeserializer(job, partition.getSd(), HiveUtilities.getPartitionMetadata(partition, hiveTable));
+        this.partitionObjInspector = getStructOI(partitionDeserializer);
+
+        this.finalObjInspector = (StructObjectInspector) ObjectInspectorConverters.getConvertedOI(partitionObjInspector, tableObjInspector);
+        this.partitionToTableSchemaConverter = ObjectInspectorConverters.getConverter(partitionObjInspector, finalObjInspector);
+
+        this.job.setInputFormat(HiveUtilities.getInputFormatClass(job, partition.getSd(), hiveTable));
+        HiveUtilities.verifyAndAddTransactionalProperties(job, partition.getSd());
+      }
+
+
+      final List<FieldSchema> partitionKeyFields = hiveTable.getPartitionKeys();
+      final List<String> partitionColumnNames = partitionKeyFields.stream()
+          .map(FieldSchema::getName)
+          .collect(Collectors.toList());
+      // We should always get the column names from the ObjectInspector. For some tables (e.g. Avro) the metastore
+      // may not contain the schema; instead it is derived from other sources such as table properties or an external file.
+      // Deserializer object knows how to get the schema with all the config and table properties passed in initialization.
+      // ObjectInspector created from the Deserializer object has the schema.
+      final List<String> allTableColumnNames = ((StructTypeInfo) TypeInfoUtils.getTypeInfoFromObjectInspector(finalObjInspector)).getAllStructFieldNames();
+
+      // Determining which regular/partition column names should be selected
+      List<String> selectedColumnNames;
+      List<String> selectedPartitionColumnNames;
+      List<Integer> idsOfProjectedColumns;
+
+      if (isStarQuery()) {
+        selectedColumnNames = allTableColumnNames;
+        selectedPartitionColumnNames = partitionColumnNames;
+        idsOfProjectedColumns = IntStream.range(0, selectedColumnNames.size())
+            .boxed()
+            .collect(Collectors.toList());
+      } else {
+        Map<Boolean, List<String>> groupOfSelectedColumns = getColumns().stream()
+            .map(SchemaPath::getRootSegment)
+            .map(PathSegment.NameSegment::getPath)
+            .distinct()
+            .collect(Collectors.groupingBy(partitionColumnNames::contains));
+
+        selectedColumnNames = groupOfSelectedColumns.getOrDefault(!PARTITION_COLUMNS, Collections.emptyList());
+        selectedPartitionColumnNames = groupOfSelectedColumns.getOrDefault(PARTITION_COLUMNS, Collections.emptyList());
+        idsOfProjectedColumns = selectedColumnNames.stream()
+            .map(allTableColumnNames::indexOf)
+            .collect(Collectors.toList());
+      }
+
+      List<String> nestedColumnPaths = getColumns().stream()
+          .map(SchemaPath::getRootSegmentPath)
+          .collect(Collectors.toList());
+      ColumnProjectionUtils.appendReadColumns(job, idsOfProjectedColumns, selectedColumnNames, nestedColumnPaths);
+
+      // Initialize selectedStructFieldRefs and columnValueWriters, which are two key collections of
+      // objects used to read and save column row data into Drill's value vectors
+      this.selectedStructFieldRefs = new StructField[selectedColumnNames.size()];
+      this.columnValueWriters = new HiveValueWriter[selectedColumnNames.size()];
+      this.outputWriter = new VectorContainerWriter(output, /*enabled union*/ false);
+      HiveValueWriterFactory hiveColumnValueWriterFactory = new HiveValueWriterFactory(drillBuf, outputWriter.getWriter());
+      for (int refIdx = 0; refIdx < selectedStructFieldRefs.length; refIdx++) {
+        String columnName = selectedColumnNames.get(refIdx);
+        StructField fieldRef = finalObjInspector.getStructFieldRef(columnName);
+        this.selectedStructFieldRefs[refIdx] = fieldRef;
+        this.columnValueWriters[refIdx] = hiveColumnValueWriterFactory.createHiveColumnValueWriter(columnName, fieldRef);
+      }
+
+      // Defining selected partition vectors and values to be filled into them
+      if (partition != null && selectedPartitionColumnNames.size() > 0) {
+        List<ValueVector> partitionVectorList = new ArrayList<>(selectedPartitionColumnNames.size());
+        List<Object> partitionValueList = new ArrayList<>(selectedPartitionColumnNames.size());
+        String defaultPartitionValue = hiveConf.get(HiveConf.ConfVars.DEFAULTPARTITIONNAME.varname);
+        OptionManager options = fragmentContext.getOptions();
+        for (int i = 0; i < partitionKeyFields.size(); i++) {
+          FieldSchema field = partitionKeyFields.get(i);
+          String partitionColumnName = field.getName();
+          if (selectedPartitionColumnNames.contains(partitionColumnName)) {
+            TypeInfo partitionColumnTypeInfo = TypeInfoUtils.getTypeInfoFromTypeString(field.getType());
+            TypeProtos.MajorType majorType = HiveUtilities.getMajorTypeFromHiveTypeInfo(partitionColumnTypeInfo, options);
+            MaterializedField materializedField = MaterializedField.create(partitionColumnName, majorType);
+            Class<? extends ValueVector> partitionVectorClass = TypeHelper.getValueVectorClass(materializedField.getType().getMinorType(),
+                materializedField.getDataMode());
+
+            ValueVector partitionVector = output.addField(materializedField, partitionVectorClass);
+            partitionVectorList.add(partitionVector);
+            Object partitionValue = HiveUtilities.convertPartitionType(partitionColumnTypeInfo, partition.getValues().get(i), defaultPartitionValue);
+            partitionValueList.add(partitionValue);
+          }
+        }
+        this.partitionVectors = partitionVectorList.toArray(new ValueVector[0]);
+        this.partitionValues = partitionValueList.toArray();
+      }
+
+      if (!empty && initNextReader(job)) {
+        key = mapredReader.createKey();
+        valueHolder = mapredReader.createValue();
+        internalInit(hiveTableProperties);
+      }
+      return null;
+    };
+  }
+
+  /**
+   * Default implementation does nothing; overridden to apply skip header/footer functionality.
+   *
+   * @param hiveTableProperties hive table properties
+   */
+  protected void internalInit(Properties hiveTableProperties) {
+  }
+
+  @Override
+  public int next() {
+    outputWriter.allocate();
+    outputWriter.reset();
+    if (empty) {
+      outputWriter.setValueCount(0);
+      populatePartitionVectors(0);
+      return 0;
+    }
+
+    try {
+      int recordCount;
+      for (recordCount = 0; (recordCount < TARGET_RECORD_COUNT && hasNextValue(valueHolder)); recordCount++) {
+        Object deserializedHiveRecord = partitionToTableSchemaConverter.convert(partitionDeserializer.deserialize((Writable) valueHolder));
+        outputWriter.setPosition(recordCount);
+        readHiveRecordAndInsertIntoRecordBatch(deserializedHiveRecord);
+      }
+      outputWriter.setValueCount(recordCount);
+      populatePartitionVectors(recordCount);
+      return recordCount;
+    } catch (ExecutionSetupException | IOException | SerDeException e) {
+      throw new DrillRuntimeException(e.getMessage(), e);
+    }
+  }
+
+  protected void readHiveRecordAndInsertIntoRecordBatch(Object rowValue) {
+    for (int columnRefIdx = 0; columnRefIdx < selectedStructFieldRefs.length; columnRefIdx++) {
+      Object columnValue = finalObjInspector.getStructFieldData(rowValue, selectedStructFieldRefs[columnRefIdx]);
+      if (columnValue != null) {
+        columnValueWriters[columnRefIdx].write(columnValue);
+      }
+    }
+  }
+
+  /**
+   * Checks for and reads the next value of the input split into valueHolder.
+   * Note that if the current mapredReader has no more data to read from its
+   * InputSplit, this method will try to initialize a reader for the next InputSplit
+   * and use the new mapredReader.
+   *
+   * @param valueHolder holder for next row value data
+   * @return true if next value present and read into valueHolder
+   * @throws IOException             exception which may be thrown in case when mapredReader failed to read next value
+   * @throws ExecutionSetupException exception may be thrown when next input split is present but reader
+   *                                 initialization for it failed
+   */
+  protected boolean hasNextValue(Object valueHolder) throws IOException, ExecutionSetupException {
+    while (true) {
+      if (mapredReader.next(key, valueHolder)) {
+        return true;
+      } else if (initNextReader(job)) {
+        continue;
+      }
+      return false;
+    }
+  }
+
+
+  @Override
+  public void close() {
+    closeMapredReader();
+  }
+
+  private static Deserializer createDeserializer(JobConf job, StorageDescriptor sd, Properties properties) throws Exception {
+    final Class<? extends Deserializer> c = Class.forName(sd.getSerdeInfo().getSerializationLib()).asSubclass(Deserializer.class);
+    final Deserializer deserializer = c.getConstructor().newInstance();
+    deserializer.initialize(job, properties);
+
+    return deserializer;
+  }
+
+  /**
+   * Get and cast deserializer's objectInspector to StructObjectInspector type
+   *
+   * @param deserializer hive deserializer
+   * @return StructObjectInspector instance
+   * @throws SerDeException if the inspector can't be obtained from the deserializer
+   */
+  private static StructObjectInspector getStructOI(final Deserializer deserializer) throws SerDeException {
+    ObjectInspector oi = deserializer.getObjectInspector();
+    if (oi.getCategory() != ObjectInspector.Category.STRUCT) {
+      throw new UnsupportedOperationException(String.format("%s category not supported", oi.getCategory()));
+    }
+    return (StructObjectInspector) oi;
+  }
+
+
+  /**
+   * Helper method which fills selected partition vectors.
+   * Invoked after completion of reading other records.
+   *
+   * @param recordCount count of records that was read by reader
+   */
+  private void populatePartitionVectors(int recordCount) {
+    if (partition == null) {
+      return;
+    }
+    for (int i = 0; i < partitionVectors.length; i++) {
+      final ValueVector vector = partitionVectors[i];
+      AllocationHelper.allocateNew(vector, recordCount);
+      if (partitionValues[i] != null) {
+        HiveUtilities.populateVector(vector, drillBuf, partitionValues[i], 0, recordCount);
+      }
+      vector.getMutator().setValueCount(recordCount);
+    }
+  }
+
+  /**
+   * Closes previous mapredReader if any, then initializes mapredReader for next present InputSplit,
+   * or returns false when there are no more splits.
+   *
+   * @param job map / reduce job configuration.
+   * @return true if new mapredReader initialized
+   * @throws ExecutionSetupException if could not init record mapredReader
+   */
+  @SuppressWarnings("unchecked")
+  private boolean initNextReader(JobConf job) throws ExecutionSetupException {
+    if (inputSplitsIterator.hasNext()) {
+      closeMapredReader();
+      InputSplit inputSplit = inputSplitsIterator.next();
+      try {
+        mapredReader = job.getInputFormat().getRecordReader(inputSplit, job, Reporter.NULL);
+        logger.trace("hive mapredReader created: {} for inputSplit {}", mapredReader.getClass().getName(), inputSplit.toString());
+        return true;
+      } catch (Exception e) {
+        throw new ExecutionSetupException("Failed to get o.a.hadoop.mapred.RecordReader from Hive InputFormat", e);
+      }
+    }
+    return false;
+  }
+
+  /**
+   * Closes and sets mapredReader value to null.
+   */
+  private void closeMapredReader() {
+    if (mapredReader != null) {
+      try {
+        mapredReader.close();
+      } catch (Exception e) {
+        logger.warn("Failure while closing Hive Record mapredReader.", e);
+      } finally {
+        mapredReader = null;
+      }
+    }
+  }
+
+}
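
    The setup task above splits the projected column names into regular and partition columns with a
    single groupingBy over a boolean key (the PARTITION_COLUMNS constant). Below is a minimal,
    self-contained sketch of that idiom; the column names are invented purely for illustration.

    import java.util.Arrays;
    import java.util.Collections;
    import java.util.List;
    import java.util.Map;
    import java.util.stream.Collectors;

    public class ColumnGroupingSketch {
      // Same trick as in HiveDefaultRecordReader: a boolean key marks the partition-column group.
      private static final boolean PARTITION_COLUMNS = true;

      public static void main(String[] args) {
        List<String> partitionColumnNames = Arrays.asList("dt", "country");   // invented names
        List<String> projectedColumns = Arrays.asList("id", "name", "dt");    // invented names

        Map<Boolean, List<String>> grouped = projectedColumns.stream()
            .distinct()
            .collect(Collectors.groupingBy(partitionColumnNames::contains));

        List<String> regular = grouped.getOrDefault(!PARTITION_COLUMNS, Collections.emptyList());
        List<String> partition = grouped.getOrDefault(PARTITION_COLUMNS, Collections.emptyList());

        System.out.println("regular columns:   " + regular);    // [id, name]
        System.out.println("partition columns: " + partition);  // [dt]
      }
    }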
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/readers/HiveTextRecordReader.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/readers/HiveTextRecordReader.java
new file mode 100644
index 0000000..dadd3bd
--- /dev/null
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/readers/HiveTextRecordReader.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.hive.readers;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+import java.util.Properties;
+
+import org.apache.drill.common.exceptions.DrillRuntimeException;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.store.hive.HivePartition;
+import org.apache.drill.exec.store.hive.HiveTableWithColumnCache;
+import org.apache.drill.exec.store.hive.HiveUtilities;
+import org.apache.drill.exec.store.hive.readers.inspectors.SkipFooterRecordsInspector;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.serde.serdeConstants;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.security.UserGroupInformation;
+
+/**
+ * Reading a Hive table stored in text format may require skipping a few header/footer records.
+ * This class extends the default reader to add that functionality.
+ */
+public class HiveTextRecordReader extends HiveDefaultRecordReader {
+
+  private SkipFooterRecordsInspector skipFooterValueHolder;
+
+  /**
+   * Constructor matching super.
+   *
+   * @param table            metadata about Hive table being read
+   * @param partition        holder of metadata about table partitioning
+   * @param inputSplits      input splits for reading data from distributed storage
+   * @param projectedColumns target columns for scan
+   * @param context          fragmentContext of fragment
+   * @param hiveConf         Hive configuration
+   * @param proxyUgi         user/group info to be used for initialization
+   */
+  public HiveTextRecordReader(HiveTableWithColumnCache table, HivePartition partition,
+                              Collection<InputSplit> inputSplits, List<SchemaPath> projectedColumns,
+                              FragmentContext context, HiveConf hiveConf, UserGroupInformation proxyUgi) {
+    super(table, partition, inputSplits, projectedColumns, context, hiveConf, proxyUgi);
+  }
+
+  @Override
+  protected void internalInit(Properties hiveTableProperties) {
+    int skipHeaderCount = HiveUtilities.retrieveIntProperty(hiveTableProperties, serdeConstants.HEADER_COUNT, -1);
+
+    // skip first N records to apply skip header policy
+    try {
+      for (int i = 0; i < skipHeaderCount; i++) {
+        if (!hasNextValue(valueHolder)) {
+          // no more records to skip, we drained the table
+          empty = true;
+          break;
+        }
+      }
+    } catch (IOException | ExecutionSetupException e) {
+      throw new DrillRuntimeException(e.getMessage(), e);
+    }
+
+    // if table was drained while skipping first N records, there is no need to check for skip footer logic
+    if (!empty) {
+      int skipFooterCount = HiveUtilities.retrieveIntProperty(hiveTableProperties, serdeConstants.FOOTER_COUNT, -1);
+
+      // if we need to skip the last N records, use skipFooterValueHolder which will buffer records while reading
+      if (skipFooterCount > 0) {
+        skipFooterValueHolder = new SkipFooterRecordsInspector(mapredReader, skipFooterCount);
+      }
+    }
+  }
+
+  /**
+   * Reads batch of records skipping footer rows when necessary.
+   *
+   * @return count of read records
+   */
+  @Override
+  public int next() {
+    if (skipFooterValueHolder == null) {
+      return super.next();
+    } else {
+      try {
+        // starting new batch, reset processed records count
+        skipFooterValueHolder.reset();
+
+        while (!skipFooterValueHolder.isBatchFull() && hasNextValue(skipFooterValueHolder.getValueHolder())) {
+          Object value = skipFooterValueHolder.getNextValue();
+          if (value != null) {
+            Object deSerializedValue = partitionToTableSchemaConverter.convert(partitionDeserializer.deserialize((Writable) value));
+            outputWriter.setPosition(skipFooterValueHolder.getProcessedRecordCount());
+            readHiveRecordAndInsertIntoRecordBatch(deSerializedValue);
+            skipFooterValueHolder.incrementProcessedRecordCount();
+          }
+        }
+        outputWriter.setValueCount(skipFooterValueHolder.getProcessedRecordCount());
+
+        return skipFooterValueHolder.getProcessedRecordCount();
+      } catch (ExecutionSetupException | IOException | SerDeException e) {
+        throw new DrillRuntimeException(e.getMessage(), e);
+      }
+    }
+  }
+
+}
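
    The footer handling above relies on SkipFooterRecordsInspector to buffer rows so that the last N
    rows are never emitted. The following toy, self-contained sketch illustrates only that buffering
    idea (it is not the actual inspector implementation), assuming a plain list of invented rows.

    import java.util.ArrayDeque;
    import java.util.Arrays;
    import java.util.Deque;
    import java.util.List;

    public class SkipFooterSketch {
      public static void main(String[] args) {
        int skipFooterCount = 2;
        List<String> rows = Arrays.asList("r1", "r2", "r3", "r4", "footer1", "footer2"); // invented data

        Deque<String> buffer = new ArrayDeque<>(skipFooterCount);
        for (String row : rows) {
          buffer.addLast(row);
          if (buffer.size() > skipFooterCount) {
            // The oldest buffered row can no longer be one of the last N rows, so it is safe to emit.
            System.out.println("emit: " + buffer.removeFirst());
          }
        }
        // The rows left in the buffer (the last N rows) are dropped as footer.
      }
    }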
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/readers/ReadersInitializer.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/readers/ReadersInitializer.java
new file mode 100644
index 0000000..fc3d548
--- /dev/null
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/readers/ReadersInitializer.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.hive.readers;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.ops.ExecutorFragmentContext;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.store.RecordReader;
+import org.apache.drill.exec.store.hive.HivePartition;
+import org.apache.drill.exec.store.hive.HiveSubScan;
+import org.apache.drill.exec.store.hive.HiveTableWithColumnCache;
+import org.apache.drill.exec.util.ImpersonationUtil;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.TextInputFormat;
+import org.apache.hadoop.security.UserGroupInformation;
+
+/**
+ * Factory for creation of Hive record readers used by {@link org.apache.drill.exec.store.hive.HiveScanBatchCreator}.
+ */
+public class ReadersInitializer {
+
+  private static final String TEXT_FORMAT = TextInputFormat.class.getCanonicalName();
+
+  /**
+   * Selects a reader constructor reference as the {@link HiveReaderFactory} readerFactory.
+   * Then, if the input splits are empty, creates a single empty record reader, otherwise one reader per split.
+   *
+   * @param ctx    context related to fragment
+   * @param config context which holds different Hive configurations
+   * @return list containing one or more readers
+   */
+  public static List<RecordReader> init(ExecutorFragmentContext ctx, HiveSubScan config) {
+    final HiveReaderFactory readerFactory = getReaderFactory(config);
+    final UserGroupInformation proxyUgi = ImpersonationUtil.createProxyUgi(config.getUserName(), ctx.getQueryUserName());
+    final List<List<InputSplit>> inputSplits = config.getInputSplits();
+    final HiveConf hiveConf = config.getHiveConf();
+
+    if (inputSplits.isEmpty()) {
+      return Collections.singletonList(
+          readerFactory.createReader(config.getTable(), null /*partition*/, null /*split*/, config.getColumns(), ctx, hiveConf, proxyUgi)
+      );
+    } else {
+      IndexedPartitions partitions = getPartitions(config);
+      return IntStream.range(0, inputSplits.size())
+          .mapToObj(idx ->
+              readerFactory.createReader(
+                  config.getTable(),
+                  partitions.get(idx),
+                  inputSplits.get(idx),
+                  config.getColumns(),
+                  ctx, hiveConf, proxyUgi))
+          .collect(Collectors.toList());
+    }
+  }
+
+  /**
+   * Returns a reference to {@link HiveTextRecordReader}'s constructor if the
+   * file being read has text format, or a reference to {@link HiveDefaultRecordReader}'s
+   * constructor otherwise.
+   *
+   * @param config context which holds different Hive configurations
+   * @return reference to concrete reader constructor which is unified under type {@link HiveReaderFactory}
+   */
+  private static HiveReaderFactory getReaderFactory(HiveSubScan config) {
+    String inputFormat = config.getTable().getSd().getInputFormat();
+    return TEXT_FORMAT.equals(inputFormat) ? HiveTextRecordReader::new : HiveDefaultRecordReader::new;
+  }
+
+  /**
+   * Used to select the logic for partition retrieval by index.
+   * If the table has no partitions, get by index just returns null.
+   *
+   * @param config context which holds different Hive configurations
+   * @return get by index lambda expression unified under type {@link IndexedPartitions}
+   */
+  private static IndexedPartitions getPartitions(HiveSubScan config) {
+    return (config.getPartitions() == null || config.getPartitions().isEmpty()) ? (idx) -> null : config.getPartitions()::get;
+  }
+
+
+  /**
+   * Functional interface used to describe the parameters accepted by a concrete
+   * reader's constructor.
+   */
+  @FunctionalInterface
+  private interface HiveReaderFactory {
+
+    RecordReader createReader(HiveTableWithColumnCache table, HivePartition partition,
+                              Collection<InputSplit> inputSplits, List<SchemaPath> projectedColumns,
+                              FragmentContext context, HiveConf hiveConf, UserGroupInformation proxyUgi);
+
+  }
+
+  /**
+   * Functional interface used to represent the logic of getting
+   * a partition by index.
+   */
+  @FunctionalInterface
+  private interface IndexedPartitions {
+
+    HivePartition get(int idx);
+
+  }
+
+}
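
    ReadersInitializer picks between the two reader constructors by unifying them under one functional
    interface and binding a constructor reference. Below is a minimal sketch of that selection pattern
    with invented reader classes and a single String argument; the real factory passes table, partition,
    splits and other arguments.

    import java.util.function.Function;

    public class ReaderFactorySketch {

      interface Reader { String describe(); }

      static class DefaultReader implements Reader {
        protected final String source;
        DefaultReader(String source) { this.source = source; }
        public String describe() { return "default reader over " + source; }
      }

      static class TextReader extends DefaultReader {
        TextReader(String source) { super(source); }
        @Override
        public String describe() { return "text reader over " + source; }
      }

      public static void main(String[] args) {
        String inputFormat = "org.apache.hadoop.mapred.TextInputFormat";
        // Both constructors fit Function<String, Reader>, so the choice is just a conditional.
        Function<String, Reader> factory =
            inputFormat.endsWith("TextInputFormat") ? TextReader::new : DefaultReader::new;
        System.out.println(factory.apply("split-0").describe()); // text reader over split-0
      }
    }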
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/readers/initilializers/AbstractReadersInitializer.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/readers/initilializers/AbstractReadersInitializer.java
deleted file mode 100644
index 7f9e0c0..0000000
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/readers/initilializers/AbstractReadersInitializer.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.store.hive.readers.initilializers;
-
-import org.apache.drill.common.exceptions.DrillRuntimeException;
-import org.apache.drill.exec.ops.FragmentContext;
-import org.apache.drill.exec.store.RecordReader;
-import org.apache.drill.exec.store.hive.HivePartition;
-import org.apache.drill.exec.store.hive.HiveSubScan;
-import org.apache.drill.exec.store.hive.HiveTableWithColumnCache;
-import org.apache.drill.exec.store.hive.readers.HiveAbstractReader;
-import org.apache.drill.exec.util.ImpersonationUtil;
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.metastore.api.Partition;
-import org.apache.hadoop.security.UserGroupInformation;
-
-import java.lang.reflect.Constructor;
-import java.util.Collection;
-import java.util.List;
-
-/**
- * Parent class for reader initializers which create reader based on reader class.
- * Holds common logic how to create reader constructor and reader instance.
- * Is responsible to ensure each child class implements logic for initializing record reader.
- */
-public abstract class AbstractReadersInitializer {
-
-  protected final HiveSubScan config;
-
-  private final FragmentContext context;
-  private final Class<? extends HiveAbstractReader> readerClass;
-  private final UserGroupInformation proxyUgi;
-
-  public AbstractReadersInitializer(FragmentContext context, HiveSubScan config, Class<? extends HiveAbstractReader> readerClass) {
-    this.config = config;
-    this.context = context;
-    this.readerClass = readerClass;
-    this.proxyUgi = ImpersonationUtil.createProxyUgi(config.getUserName(), context.getQueryUserName());
-  }
-
-  protected Constructor<? extends HiveAbstractReader> createReaderConstructor() {
-    try {
-      return readerClass.getConstructor(HiveTableWithColumnCache.class, HivePartition.class,
-          Collection.class,
-          List.class, FragmentContext.class, HiveConf.class, UserGroupInformation.class);
-    } catch (ReflectiveOperationException e) {
-      throw new DrillRuntimeException(String.format("Unable to retrieve constructor for Hive reader class [%s]", readerClass), e);
-    }
-  }
-
-  protected HiveAbstractReader createReader(Constructor<? extends HiveAbstractReader> readerConstructor, Partition partition, Object split) {
-    try {
-      return readerConstructor.newInstance(config.getTable(), partition, split, config.getColumns(), context, config.getHiveConf(), proxyUgi);
-    } catch (ReflectiveOperationException e) {
-      throw new DrillRuntimeException(String.format("Unable to create instance for Hive reader [%s]", readerConstructor), e);
-    }
-  }
-
-  /**
-   * @return list of initialized records readers
-   */
-  public abstract List<RecordReader> init();
-}
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/readers/initilializers/DefaultReadersInitializer.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/readers/initilializers/DefaultReadersInitializer.java
deleted file mode 100644
index c6fbdca..0000000
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/readers/initilializers/DefaultReadersInitializer.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.store.hive.readers.initilializers;
-
-import org.apache.drill.exec.ops.FragmentContext;
-import org.apache.drill.exec.store.RecordReader;
-import org.apache.drill.exec.store.hive.HivePartition;
-import org.apache.drill.exec.store.hive.HiveSubScan;
-import org.apache.drill.exec.store.hive.readers.HiveAbstractReader;
-import org.apache.hadoop.mapred.InputSplit;
-
-import java.lang.reflect.Constructor;
-import java.util.LinkedList;
-import java.util.List;
-
-/**
- * Creates separate record reader for each given input split group.
- */
-public class DefaultReadersInitializer extends AbstractReadersInitializer {
-
-  public DefaultReadersInitializer(FragmentContext context, HiveSubScan config, Class<? extends HiveAbstractReader> readerClass) {
-    super(context, config, readerClass);
-  }
-
-  @Override
-  public List<RecordReader> init() {
-    List<List<InputSplit>> inputSplits = config.getInputSplits();
-    List<HivePartition> partitions = config.getPartitions();
-    boolean hasPartitions = partitions != null && !partitions.isEmpty();
-
-    List<RecordReader> readers = new LinkedList<>();
-    Constructor<? extends HiveAbstractReader> readerConstructor = createReaderConstructor();
-    for (int i = 0; i < inputSplits.size(); i++) {
-      readers.add(createReader(readerConstructor, hasPartitions ? partitions.get(i) : null, inputSplits.get(i)));
-    }
-    return readers;
-  }
-}
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/readers/initilializers/EmptyReadersInitializer.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/readers/initilializers/EmptyReadersInitializer.java
deleted file mode 100644
index b6620b0..0000000
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/readers/initilializers/EmptyReadersInitializer.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.store.hive.readers.initilializers;
-
-import org.apache.drill.exec.ops.FragmentContext;
-import org.apache.drill.exec.store.RecordReader;
-import org.apache.drill.exec.store.hive.HiveSubScan;
-import org.apache.drill.exec.store.hive.readers.HiveAbstractReader;
-
-import java.lang.reflect.Constructor;
-import java.util.ArrayList;
-import java.util.List;
-
-/**
- * If table is empty creates an empty record reader to output the schema.
- */
-public class EmptyReadersInitializer extends AbstractReadersInitializer {
-
-  public EmptyReadersInitializer(FragmentContext context, HiveSubScan config, Class<? extends HiveAbstractReader> readerClass) {
-    super(context, config, readerClass);
-  }
-
-  @Override
-  public List<RecordReader> init() {
-    List<RecordReader> readers = new ArrayList<>(1);
-    Constructor<? extends HiveAbstractReader> readerConstructor = createReaderConstructor();
-    readers.add(createReader(readerConstructor, null, null));
-    return readers;
-  }
-
-}
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/readers/initilializers/ReadersInitializer.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/readers/initilializers/ReadersInitializer.java
deleted file mode 100644
index 78aaf42..0000000
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/readers/initilializers/ReadersInitializer.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.store.hive.readers.initilializers;
-
-import org.apache.drill.exec.ops.FragmentContext;
-import org.apache.drill.exec.store.hive.HiveSubScan;
-import org.apache.drill.exec.store.hive.readers.HiveAbstractReader;
-import org.apache.drill.exec.store.hive.readers.HiveAvroReader;
-import org.apache.drill.exec.store.hive.readers.HiveDefaultReader;
-import org.apache.drill.exec.store.hive.readers.HiveOrcReader;
-import org.apache.drill.exec.store.hive.readers.HiveParquetReader;
-import org.apache.drill.exec.store.hive.readers.HiveRCFileReader;
-import org.apache.drill.exec.store.hive.readers.HiveTextReader;
-import org.apache.hadoop.hive.ql.io.RCFileInputFormat;
-import org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat;
-import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat;
-import org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat;
-import org.apache.hadoop.mapred.TextInputFormat;
-
-import java.util.HashMap;
-import java.util.Map;
-
-public class ReadersInitializer {
-
-  /**
-   * List of all available readers classes for a different Hive nativ formats:
-   * ORC, AVRO, RCFFile, Text and Parquet.
-   */
-  private static final Map<String, Class<? extends HiveAbstractReader>> READER_MAP = new HashMap<>();
-
-  static {
-    READER_MAP.put(OrcInputFormat.class.getCanonicalName(), HiveOrcReader.class);
-    READER_MAP.put(AvroContainerInputFormat.class.getCanonicalName(), HiveAvroReader.class);
-    READER_MAP.put(RCFileInputFormat.class.getCanonicalName(), HiveRCFileReader.class);
-    READER_MAP.put(MapredParquetInputFormat.class.getCanonicalName(), HiveParquetReader.class);
-    READER_MAP.put(TextInputFormat.class.getCanonicalName(), HiveTextReader.class);
-  }
-
-  /**
-   * Determines which reader initializer should be used got given table configuration.
-   * Decision is made based on table content and skip header / footer logic usage.
-   *
-   * @param context fragment context
-   * @param config Hive table config
-   * @return reader initializer
-   */
-  public static AbstractReadersInitializer getInitializer(FragmentContext context, HiveSubScan config) {
-    Class<? extends HiveAbstractReader> readerClass = getReaderClass(config);
-    if (config.getInputSplits().isEmpty()) {
-      return new EmptyReadersInitializer(context, config, readerClass);
-    } else {
-      return new DefaultReadersInitializer(context, config, readerClass);
-    }
-  }
-
-  /**
-   * Will try to find reader class based on Hive table input format.
-   * If reader class was not find, will use default reader class.
-   *
-   * @param config Hive table config
-   * @return reader class
-   */
-  private static Class<? extends HiveAbstractReader> getReaderClass(HiveSubScan config) {
-    final String formatName = config.getTable().getSd().getInputFormat();
-    Class<? extends HiveAbstractReader> readerClass = HiveDefaultReader.class;
-    if (READER_MAP.containsKey(formatName)) {
-      readerClass = READER_MAP.get(formatName);
-    }
-    return readerClass;
-  }
-
-}
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/readers/inspectors/AbstractRecordsInspector.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/readers/inspectors/AbstractRecordsInspector.java
index a5ab239..d291ee6 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/readers/inspectors/AbstractRecordsInspector.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/readers/inspectors/AbstractRecordsInspector.java
@@ -17,7 +17,7 @@
  */
 package org.apache.drill.exec.store.hive.readers.inspectors;
 
-import org.apache.drill.exec.store.hive.readers.HiveAbstractReader;
+import org.apache.drill.exec.store.hive.readers.HiveDefaultRecordReader;
 
 /**
  * Parent class for records inspectors which responsible for counting of processed records
@@ -33,7 +33,7 @@ public abstract class AbstractRecordsInspector {
    * @return true if reached max number of records in batch
    */
   public boolean isBatchFull() {
-    return processedRecordCount >= HiveAbstractReader.TARGET_RECORD_COUNT;
+    return processedRecordCount >= HiveDefaultRecordReader.TARGET_RECORD_COUNT;
   }
 
   /**
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/readers/inspectors/DefaultRecordsInspector.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/writers/HiveValueWriter.java
similarity index 57%
copy from contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/readers/inspectors/DefaultRecordsInspector.java
copy to contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/writers/HiveValueWriter.java
index 55a1a3d..8fd5ab2 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/readers/inspectors/DefaultRecordsInspector.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/writers/HiveValueWriter.java
@@ -15,28 +15,24 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.drill.exec.store.hive.readers.inspectors;
+package org.apache.drill.exec.store.hive.writers;
+
 
 /**
- * Default records inspector that uses the same value holder for each record.
- * Each value once written is immediately processed thus value holder can be re-used.
+ * The writer is used to abstract writing of row values or values embedded into a row
+ * (for example, elements of the List complex type).
+ * <p>
+ * {@link HiveValueWriterFactory} constructs top-level writers for columns, which are then mapped to Hive
+ * table columns and used for writing each column's row value into vectors.
  */
-public class DefaultRecordsInspector extends AbstractRecordsInspector {
-
-  private final Object value;
-
-  public DefaultRecordsInspector(Object value) {
-    this.value = value;
-  }
-
-  @Override
-  public Object getValueHolder() {
-    return value;
-  }
+public interface HiveValueWriter {
 
-  @Override
-  public Object getNextValue() {
-    return value;
-  }
+  /**
+   * Accepts a top-level or embedded row value of a concrete column and writes it into
+   * the appropriate value vector.
+   *
+   * @param value top-level or embedded row value of a Hive column
+   */
+  void write(Object value);
 
 }
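
    The interface above lets composite writers delegate element writes to other HiveValueWriters,
    which is what HiveListWriter (added below) does. The following toy, self-contained sketch shows
    only that delegation pattern, using invented stand-in types instead of Drill's writers.

    import java.util.Arrays;
    import java.util.List;

    public class ValueWriterSketch {

      interface ValueWriter { void write(Object value); }

      static class IntValueWriter implements ValueWriter {
        public void write(Object value) { System.out.println("int: " + value); }
      }

      static class ListValueWriter implements ValueWriter {
        private final ValueWriter elementWriter;
        ListValueWriter(ValueWriter elementWriter) { this.elementWriter = elementWriter; }

        public void write(Object value) {
          System.out.println("start list");
          for (Object element : (List<?>) value) {
            if (element == null) {
              // Mirrors the restriction in this commit: arrays with nulls are not supported yet.
              throw new UnsupportedOperationException("Null is not supported in Hive array!");
            }
            elementWriter.write(element);
          }
          System.out.println("end list");
        }
      }

      public static void main(String[] args) {
        ValueWriter writer = new ListValueWriter(new IntValueWriter());
        writer.write(Arrays.asList(1, 2, 3));
      }
    }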
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/writers/HiveValueWriterFactory.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/writers/HiveValueWriterFactory.java
new file mode 100644
index 0000000..c724a7a
--- /dev/null
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/writers/HiveValueWriterFactory.java
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.hive.writers;
+
+import java.util.function.BiFunction;
+import java.util.function.Function;
+
+import io.netty.buffer.DrillBuf;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.exec.store.hive.writers.complex.HiveListWriter;
+import org.apache.drill.exec.store.hive.writers.primitive.HiveBinaryWriter;
+import org.apache.drill.exec.store.hive.writers.primitive.HiveBooleanWriter;
+import org.apache.drill.exec.store.hive.writers.primitive.HiveByteWriter;
+import org.apache.drill.exec.store.hive.writers.primitive.HiveCharWriter;
+import org.apache.drill.exec.store.hive.writers.primitive.HiveDateWriter;
+import org.apache.drill.exec.store.hive.writers.primitive.HiveDecimalWriter;
+import org.apache.drill.exec.store.hive.writers.primitive.HiveDoubleWriter;
+import org.apache.drill.exec.store.hive.writers.primitive.HiveFloatWriter;
+import org.apache.drill.exec.store.hive.writers.primitive.HiveIntWriter;
+import org.apache.drill.exec.store.hive.writers.primitive.HiveLongWriter;
+import org.apache.drill.exec.store.hive.writers.primitive.HiveShortWriter;
+import org.apache.drill.exec.store.hive.writers.primitive.HiveStringWriter;
+import org.apache.drill.exec.store.hive.writers.primitive.HiveTimestampWriter;
+import org.apache.drill.exec.store.hive.writers.primitive.HiveVarCharWriter;
+import org.apache.drill.exec.vector.complex.impl.SingleMapWriter;
+import org.apache.drill.exec.vector.complex.writer.BaseWriter;
+import org.apache.drill.exec.vector.complex.writer.BaseWriter.ListWriter;
+import org.apache.drill.exec.vector.complex.writer.BaseWriter.MapWriter;
+import org.apache.drill.exec.vector.complex.writer.BigIntWriter;
+import org.apache.drill.exec.vector.complex.writer.BitWriter;
+import org.apache.drill.exec.vector.complex.writer.DateWriter;
+import org.apache.drill.exec.vector.complex.writer.Float4Writer;
+import org.apache.drill.exec.vector.complex.writer.Float8Writer;
+import org.apache.drill.exec.vector.complex.writer.IntWriter;
+import org.apache.drill.exec.vector.complex.writer.TimeStampWriter;
+import org.apache.drill.exec.vector.complex.writer.VarBinaryWriter;
+import org.apache.drill.exec.vector.complex.writer.VarCharWriter;
+import org.apache.drill.exec.vector.complex.writer.VarDecimalWriter;
+import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.BinaryObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.BooleanObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.ByteObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.DateObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.DoubleObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.FloatObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.HiveCharObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.HiveDecimalObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.HiveVarcharObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.IntObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.LongObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.ShortObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.StringObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.TimestampObjectInspector;
+import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.ListTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.drill.exec.store.hive.HiveUtilities.throwUnsupportedHiveDataTypeError;
+
+/**
+ * Factory used by reader to create Hive writers for columns.
+ */
+public final class HiveValueWriterFactory {
+
+  private static final Logger logger = LoggerFactory.getLogger(HiveValueWriterFactory.class);
+
+  /**
+   * Buffer shared across created Hive writers. A writer may first read data
+   * into the buffer and then copy it from the buffer into the vector.
+   */
+  private final DrillBuf drillBuf;
+
+  /**
+   * Used to manage and create column writers.
+   */
+  private final SingleMapWriter rootWriter;
+
+  public HiveValueWriterFactory(DrillBuf drillBuf, SingleMapWriter rootWriter) {
+    this.drillBuf = drillBuf;
+    this.rootWriter = rootWriter;
+  }
+
+  /**
+   * Method that will be called once for each column in the reader to initialize the column's writer.
+   *
+   * @param columnName name of column for writer
+   * @param fieldRef   metadata about field type
+   * @return instance of writer for column
+   */
+  public HiveValueWriter createHiveColumnValueWriter(String columnName, StructField fieldRef) {
+    ObjectInspector objectInspector = fieldRef.getFieldObjectInspector();
+    final TypeInfo typeInfo = TypeInfoUtils.getTypeInfoFromTypeString(objectInspector.getTypeName());
+    return createHiveValueWriter(columnName, typeInfo, objectInspector, rootWriter);
+  }
+
+  private HiveValueWriter createHiveValueWriter(String columnName, TypeInfo typeInfo, ObjectInspector objectInspector, BaseWriter parentWriter) {
+    switch (typeInfo.getCategory()) {
+      case PRIMITIVE:
+        return createPrimitiveHiveValueWriter(columnName, objectInspector, (PrimitiveTypeInfo) typeInfo, parentWriter);
+      case LIST: {
+        ListWriter listWriter = extractWriter(columnName, parentWriter, MapWriter::list, ListWriter::list);
+        ListObjectInspector listObjectInspector = (ListObjectInspector) objectInspector;
+        TypeInfo elemTypeInfo = ((ListTypeInfo) typeInfo).getListElementTypeInfo();
+        ObjectInspector elementInspector = listObjectInspector.getListElementObjectInspector();
+        HiveValueWriter elementValueWriter = createHiveValueWriter(null, elemTypeInfo, elementInspector, listWriter);
+        return new HiveListWriter(listObjectInspector, listWriter, elementValueWriter);
+      }
+    }
+    throwUnsupportedHiveDataTypeError(typeInfo.getCategory().toString());
+    return null;
+  }
+
+  /**
+   * Creates writer for primitive type value.
+   *
+   * @param name      column name or null if nested
+   * @param inspector inspector of column values
+   * @param typeInfo  column type used to distinguish returned writers
+   * @return appropriate instance of HiveValueWriter for column containing primitive scalar
+   */
+  private HiveValueWriter createPrimitiveHiveValueWriter(String name, ObjectInspector inspector, PrimitiveTypeInfo typeInfo, BaseWriter parentWriter) {
+    switch (typeInfo.getPrimitiveCategory()) {
+      case BINARY: {
+        VarBinaryWriter writer = extractWriter(name, parentWriter, MapWriter::varBinary, ListWriter::varBinary);
+        return new HiveBinaryWriter((BinaryObjectInspector) inspector, writer, drillBuf);
+      }
+      case BOOLEAN: {
+        BitWriter writer = extractWriter(name, parentWriter, MapWriter::bit, ListWriter::bit);
+        return new HiveBooleanWriter((BooleanObjectInspector) inspector, writer);
+      }
+      case BYTE: {
+        IntWriter writer = extractWriter(name, parentWriter, MapWriter::integer, ListWriter::integer);
+        return new HiveByteWriter((ByteObjectInspector) inspector, writer);
+      }
+      case DOUBLE: {
+        Float8Writer writer = extractWriter(name, parentWriter, MapWriter::float8, ListWriter::float8);
+        return new HiveDoubleWriter((DoubleObjectInspector) inspector, writer);
+      }
+      case FLOAT: {
+        Float4Writer writer = extractWriter(name, parentWriter, MapWriter::float4, ListWriter::float4);
+        return new HiveFloatWriter((FloatObjectInspector) inspector, writer);
+      }
+      case INT: {
+        IntWriter writer = extractWriter(name, parentWriter, MapWriter::integer, ListWriter::integer);
+        return new HiveIntWriter((IntObjectInspector) inspector, writer);
+      }
+      case LONG: {
+        BigIntWriter writer = extractWriter(name, parentWriter, MapWriter::bigInt, ListWriter::bigInt);
+        return new HiveLongWriter((LongObjectInspector) inspector, writer);
+      }
+      case SHORT: {
+        IntWriter writer = extractWriter(name, parentWriter, MapWriter::integer, ListWriter::integer);
+        return new HiveShortWriter((ShortObjectInspector) inspector, writer);
+      }
+      case STRING: {
+        VarCharWriter writer = extractWriter(name, parentWriter, MapWriter::varChar, ListWriter::varChar);
+        return new HiveStringWriter((StringObjectInspector) inspector, writer, drillBuf);
+      }
+      case VARCHAR: {
+        VarCharWriter writer = extractWriter(name, parentWriter, MapWriter::varChar, ListWriter::varChar);
+        return new HiveVarCharWriter((HiveVarcharObjectInspector) inspector, writer, drillBuf);
+      }
+      case TIMESTAMP: {
+        TimeStampWriter writer = extractWriter(name, parentWriter, MapWriter::timeStamp, ListWriter::timeStamp);
+        return new HiveTimestampWriter((TimestampObjectInspector) inspector, writer);
+      }
+      case DATE: {
+        DateWriter writer = extractWriter(name, parentWriter, MapWriter::date, ListWriter::date);
+        return new HiveDateWriter((DateObjectInspector) inspector, writer);
+      }
+      case CHAR: {
+        VarCharWriter writer = extractWriter(name, parentWriter, MapWriter::varChar, ListWriter::varChar);
+        return new HiveCharWriter((HiveCharObjectInspector) inspector, writer, drillBuf);
+      }
+      case DECIMAL: {
+        DecimalTypeInfo decimalType = (DecimalTypeInfo) typeInfo;
+        int scale = decimalType.getScale();
+        int precision = decimalType.getPrecision();
+        VarDecimalWriter writer = extractWriter(name, parentWriter,
+            (mapWriter, key) -> mapWriter.varDecimal(key, scale, precision),
+            listWriter -> listWriter.varDecimal(scale, precision));
+        return new HiveDecimalWriter((HiveDecimalObjectInspector) inspector, writer, scale);
+      }
+      default:
+        throw UserException.unsupportedError()
+            .message("Unsupported primitive data type %s")
+            .build(logger);
+    }
+  }
+
+  /**
+   * Extracts a child writer from a parent writer, which may be an instance of
+   * {@link MapWriter} or {@link ListWriter}.
+   *
+   * @param name         column or struct field name
+   * @param parentWriter parent writer used for getting child writer
+   * @param fromMap      function for extracting writer from map parent writer
+   * @param fromList     function for extracting writer from list parent writer
+   * @param <T>          type of extracted writer
+   * @return writer extracted using either fromMap or fromList function
+   */
+  private static <T> T extractWriter(String name, BaseWriter parentWriter,
+                                     BiFunction<MapWriter, String, T> fromMap,
+                                     Function<ListWriter, T> fromList) {
+    if (parentWriter instanceof MapWriter && name != null) {
+      return fromMap.apply((MapWriter) parentWriter, name);
+    } else if (parentWriter instanceof ListWriter) {
+      return fromList.apply((ListWriter) parentWriter);
+    } else {
+      throw new IllegalStateException(String.format("Parent writer with type [%s] is unsupported", parentWriter.getClass()));
+    }
+  }
+
+}
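
For illustration, here is a minimal, self-contained sketch of the runtime dispatch performed by extractWriter above. The Parent/MapLike/ListLike types are hypothetical stand-ins, not Drill classes; only the BiFunction/Function dispatch shape mirrors the real method.

    import java.util.function.BiFunction;
    import java.util.function.Function;

    public class DispatchSketch {

      // Hypothetical stand-ins for Drill's MapWriter/ListWriter hierarchy.
      interface Parent {}
      static class MapLike implements Parent {
        String child(String name) { return "map child: " + name; }
      }
      static class ListLike implements Parent {
        String child() { return "list child"; }
      }

      // Mirrors extractWriter: choose the extraction strategy from the parent's runtime type.
      static <T> T extract(String name, Parent parent,
                           BiFunction<MapLike, String, T> fromMap,
                           Function<ListLike, T> fromList) {
        if (parent instanceof MapLike && name != null) {
          return fromMap.apply((MapLike) parent, name);
        } else if (parent instanceof ListLike) {
          return fromList.apply((ListLike) parent);
        }
        throw new IllegalStateException("Unsupported parent: " + parent.getClass());
      }

      public static void main(String[] args) {
        System.out.println(extract("col", new MapLike(), MapLike::child, ListLike::child)); // map child: col
        System.out.println(extract(null, new ListLike(), MapLike::child, ListLike::child)); // list child
      }
    }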
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/writers/complex/HiveListWriter.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/writers/complex/HiveListWriter.java
new file mode 100644
index 0000000..11b265e
--- /dev/null
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/writers/complex/HiveListWriter.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.hive.writers.complex;
+
+import org.apache.drill.exec.store.hive.writers.HiveValueWriter;
+import org.apache.drill.exec.vector.complex.writer.BaseWriter;
+import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector;
+
+public class HiveListWriter implements HiveValueWriter {
+
+  private final ListObjectInspector listInspector;
+
+  private final HiveValueWriter elementWriter;
+
+  private final BaseWriter.ListWriter listWriter;
+
+  public HiveListWriter(ListObjectInspector listInspector, BaseWriter.ListWriter listWriter, HiveValueWriter elementWriter) {
+    this.listInspector = listInspector;
+    this.elementWriter = elementWriter;
+    this.listWriter = listWriter;
+  }
+
+
+  @Override
+  public void write(Object value) {
+    listWriter.startList();
+    for (final Object element : listInspector.getList(value)) {
+      if (element == null) {
+        throw new UnsupportedOperationException("Null elements are not supported in Hive arrays!");
+      } else {
+        elementWriter.write(element);
+      }
+    }
+    listWriter.endList();
+  }
+
+}
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/readers/inspectors/DefaultRecordsInspector.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/writers/package-info.java
similarity index 59%
copy from contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/readers/inspectors/DefaultRecordsInspector.java
copy to contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/writers/package-info.java
index 55a1a3d..d6babf2 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/readers/inspectors/DefaultRecordsInspector.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/writers/package-info.java
@@ -15,28 +15,10 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.drill.exec.store.hive.readers.inspectors;
 
 /**
- * Default records inspector that uses the same value holder for each record.
- * Each value once written is immediately processed thus value holder can be re-used.
+ * Package containing writers used to write data from Hive table columns into Drill's value vectors.
+ * All instances of {@link org.apache.drill.exec.store.hive.writers.HiveValueWriter} should be instantiated
+ * using {@link org.apache.drill.exec.store.hive.writers.HiveValueWriterFactory} to be configured properly.
  */
-public class DefaultRecordsInspector extends AbstractRecordsInspector {
-
-  private final Object value;
-
-  public DefaultRecordsInspector(Object value) {
-    this.value = value;
-  }
-
-  @Override
-  public Object getValueHolder() {
-    return value;
-  }
-
-  @Override
-  public Object getNextValue() {
-    return value;
-  }
-
-}
+package org.apache.drill.exec.store.hive.writers;
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/readers/inspectors/DefaultRecordsInspector.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/writers/primitive/AbstractSingleValueWriter.java
similarity index 54%
copy from contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/readers/inspectors/DefaultRecordsInspector.java
copy to contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/writers/primitive/AbstractSingleValueWriter.java
index 55a1a3d..5579204 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/readers/inspectors/DefaultRecordsInspector.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/writers/primitive/AbstractSingleValueWriter.java
@@ -15,28 +15,27 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.drill.exec.store.hive.readers.inspectors;
+package org.apache.drill.exec.store.hive.writers.primitive;
+
+import org.apache.drill.exec.store.hive.writers.HiveValueWriter;
+import org.apache.drill.exec.vector.complex.writer.BaseWriter;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 
 /**
- * Default records inspector that uses the same value holder for each record.
- * Each value once written is immediately processed thus value holder can be re-used.
+ * Parent class for all primitive value writers
+ *
+ * @param <I> type of inspector
+ * @param <W> type of underlying vector writer
  */
-public class DefaultRecordsInspector extends AbstractRecordsInspector {
+public abstract class AbstractSingleValueWriter<I extends ObjectInspector, W extends BaseWriter> implements HiveValueWriter {
 
-  private final Object value;
+  protected final I inspector;
 
-  public DefaultRecordsInspector(Object value) {
-    this.value = value;
-  }
-
-  @Override
-  public Object getValueHolder() {
-    return value;
-  }
+  protected final W writer;
 
-  @Override
-  public Object getNextValue() {
-    return value;
+  public AbstractSingleValueWriter(I inspector, W writer) {
+    this.inspector = inspector;
+    this.writer = writer;
   }
 
 }
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/readers/inspectors/DefaultRecordsInspector.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/writers/primitive/HiveBinaryWriter.java
similarity index 51%
copy from contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/readers/inspectors/DefaultRecordsInspector.java
copy to contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/writers/primitive/HiveBinaryWriter.java
index 55a1a3d..695d0fe 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/readers/inspectors/DefaultRecordsInspector.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/writers/primitive/HiveBinaryWriter.java
@@ -15,28 +15,27 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.drill.exec.store.hive.readers.inspectors;
+package org.apache.drill.exec.store.hive.writers.primitive;
 
-/**
- * Default records inspector that uses the same value holder for each record.
- * Each value once written is immediately processed thus value holder can be re-used.
- */
-public class DefaultRecordsInspector extends AbstractRecordsInspector {
+import io.netty.buffer.DrillBuf;
+import org.apache.drill.exec.vector.complex.writer.VarBinaryWriter;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.BinaryObjectInspector;
 
-  private final Object value;
+public class HiveBinaryWriter extends AbstractSingleValueWriter<BinaryObjectInspector, VarBinaryWriter> {
 
-  public DefaultRecordsInspector(Object value) {
-    this.value = value;
-  }
+  private DrillBuf drillBuf;
 
-  @Override
-  public Object getValueHolder() {
-    return value;
+  public HiveBinaryWriter(BinaryObjectInspector inspector, VarBinaryWriter writer, DrillBuf drillBuf) {
+    super(inspector, writer);
+    this.drillBuf = drillBuf;
   }
 
   @Override
-  public Object getNextValue() {
-    return value;
+  public void write(Object value) {
+    byte[] bytes = inspector.getPrimitiveJavaObject(value);
+    drillBuf = drillBuf.reallocIfNeeded(bytes.length);
+    drillBuf.setBytes(0, bytes);
+    writer.writeVarBinary(0, bytes.length, drillBuf);
   }
 
 }
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/readers/inspectors/DefaultRecordsInspector.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/writers/primitive/HiveBooleanWriter.java
similarity index 61%
copy from contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/readers/inspectors/DefaultRecordsInspector.java
copy to contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/writers/primitive/HiveBooleanWriter.java
index 55a1a3d..02d30c8 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/readers/inspectors/DefaultRecordsInspector.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/writers/primitive/HiveBooleanWriter.java
@@ -15,28 +15,20 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.drill.exec.store.hive.readers.inspectors;
+package org.apache.drill.exec.store.hive.writers.primitive;
 
-/**
- * Default records inspector that uses the same value holder for each record.
- * Each value once written is immediately processed thus value holder can be re-used.
- */
-public class DefaultRecordsInspector extends AbstractRecordsInspector {
-
-  private final Object value;
+import org.apache.drill.exec.vector.complex.writer.BitWriter;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.BooleanObjectInspector;
 
-  public DefaultRecordsInspector(Object value) {
-    this.value = value;
-  }
+public class HiveBooleanWriter extends AbstractSingleValueWriter<BooleanObjectInspector, BitWriter> {
 
-  @Override
-  public Object getValueHolder() {
-    return value;
+  public HiveBooleanWriter(BooleanObjectInspector inspector, BitWriter writer) {
+    super(inspector, writer);
   }
 
   @Override
-  public Object getNextValue() {
-    return value;
+  public void write(Object value) {
+    writer.writeBit(inspector.get(value) ? 1 : 0);
   }
 
 }
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/readers/inspectors/DefaultRecordsInspector.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/writers/primitive/HiveByteWriter.java
similarity index 61%
copy from contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/readers/inspectors/DefaultRecordsInspector.java
copy to contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/writers/primitive/HiveByteWriter.java
index 55a1a3d..a989665 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/readers/inspectors/DefaultRecordsInspector.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/writers/primitive/HiveByteWriter.java
@@ -15,28 +15,20 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.drill.exec.store.hive.readers.inspectors;
+package org.apache.drill.exec.store.hive.writers.primitive;
 
-/**
- * Default records inspector that uses the same value holder for each record.
- * Each value once written is immediately processed thus value holder can be re-used.
- */
-public class DefaultRecordsInspector extends AbstractRecordsInspector {
-
-  private final Object value;
+import org.apache.drill.exec.vector.complex.writer.IntWriter;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.ByteObjectInspector;
 
-  public DefaultRecordsInspector(Object value) {
-    this.value = value;
-  }
+public class HiveByteWriter extends AbstractSingleValueWriter<ByteObjectInspector, IntWriter> {
 
-  @Override
-  public Object getValueHolder() {
-    return value;
+  public HiveByteWriter(ByteObjectInspector inspector, IntWriter writer) {
+    super(inspector, writer);
   }
 
   @Override
-  public Object getNextValue() {
-    return value;
+  public void write(Object value) {
+    writer.writeInt(inspector.get(value));
   }
 
 }
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/writers/primitive/HiveCharWriter.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/writers/primitive/HiveCharWriter.java
new file mode 100644
index 0000000..5b9830e
--- /dev/null
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/writers/primitive/HiveCharWriter.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.hive.writers.primitive;
+
+import io.netty.buffer.DrillBuf;
+import org.apache.drill.exec.vector.complex.writer.VarCharWriter;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.HiveCharObjectInspector;
+import org.apache.hadoop.io.Text;
+
+public class HiveCharWriter extends AbstractSingleValueWriter<HiveCharObjectInspector, VarCharWriter> {
+
+  private DrillBuf drillBuf;
+
+  public HiveCharWriter(HiveCharObjectInspector inspector, VarCharWriter writer, DrillBuf drillBuf) {
+    super(inspector, writer);
+    this.drillBuf = drillBuf;
+  }
+
+  @Override
+  public void write(Object value) {
+    final Text textValue = inspector.getPrimitiveWritableObject(value).getStrippedValue();
+    byte[] bytes = textValue.getBytes();
+    drillBuf = drillBuf.reallocIfNeeded(bytes.length);
+    drillBuf.setBytes(0, bytes);
+    writer.writeVarChar(0, textValue.getLength(), drillBuf);
+  }
+
+}
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/readers/inspectors/DefaultRecordsInspector.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/writers/primitive/HiveDateWriter.java
similarity index 53%
copy from contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/readers/inspectors/DefaultRecordsInspector.java
copy to contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/writers/primitive/HiveDateWriter.java
index 55a1a3d..a1b4822 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/readers/inspectors/DefaultRecordsInspector.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/writers/primitive/HiveDateWriter.java
@@ -15,28 +15,24 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.drill.exec.store.hive.readers.inspectors;
+package org.apache.drill.exec.store.hive.writers.primitive;
 
-/**
- * Default records inspector that uses the same value holder for each record.
- * Each value once written is immediately processed thus value holder can be re-used.
- */
-public class DefaultRecordsInspector extends AbstractRecordsInspector {
-
-  private final Object value;
+import org.apache.drill.exec.vector.complex.writer.DateWriter;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.DateObjectInspector;
+import org.joda.time.DateTime;
+import org.joda.time.DateTimeZone;
 
-  public DefaultRecordsInspector(Object value) {
-    this.value = value;
-  }
+public class HiveDateWriter extends AbstractSingleValueWriter<DateObjectInspector, DateWriter> {
 
-  @Override
-  public Object getValueHolder() {
-    return value;
+  public HiveDateWriter(DateObjectInspector inspector, DateWriter writer) {
+    super(inspector, writer);
   }
 
   @Override
-  public Object getNextValue() {
-    return value;
+  public void write(Object value) {
+    final java.sql.Date dateValue = inspector.getPrimitiveJavaObject(value);
+    final DateTime date = new DateTime(dateValue.getTime()).withZoneRetainFields(DateTimeZone.UTC);
+    writer.writeDate(date.getMillis());
   }
 
 }
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/readers/inspectors/DefaultRecordsInspector.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/writers/primitive/HiveDecimalWriter.java
similarity index 51%
copy from contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/readers/inspectors/DefaultRecordsInspector.java
copy to contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/writers/primitive/HiveDecimalWriter.java
index 55a1a3d..36949a8 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/readers/inspectors/DefaultRecordsInspector.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/writers/primitive/HiveDecimalWriter.java
@@ -15,28 +15,28 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.drill.exec.store.hive.readers.inspectors;
+package org.apache.drill.exec.store.hive.writers.primitive;
 
-/**
- * Default records inspector that uses the same value holder for each record.
- * Each value once written is immediately processed thus value holder can be re-used.
- */
-public class DefaultRecordsInspector extends AbstractRecordsInspector {
+import java.math.BigDecimal;
+import java.math.RoundingMode;
 
-  private final Object value;
+import org.apache.drill.exec.vector.complex.writer.VarDecimalWriter;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.HiveDecimalObjectInspector;
 
-  public DefaultRecordsInspector(Object value) {
-    this.value = value;
-  }
+public class HiveDecimalWriter extends AbstractSingleValueWriter<HiveDecimalObjectInspector, VarDecimalWriter> {
 
-  @Override
-  public Object getValueHolder() {
-    return value;
+  private final int scale;
+
+  public HiveDecimalWriter(HiveDecimalObjectInspector inspector, VarDecimalWriter writer, int scale) {
+    super(inspector, writer);
+    this.scale = scale;
   }
 
   @Override
-  public Object getNextValue() {
-    return value;
+  public void write(Object value) {
+    BigDecimal decimalValue = inspector.getPrimitiveJavaObject(value).bigDecimalValue()
+        .setScale(scale, RoundingMode.HALF_UP);
+    writer.writeVarDecimal(decimalValue);
   }
 
 }
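
The setScale(scale, RoundingMode.HALF_UP) call above normalizes every value to the scale declared by the column type (e.g. DECIMAL(9,3)) before it is written. A small standalone illustration using only java.math:

    import java.math.BigDecimal;
    import java.math.RoundingMode;

    public class DecimalScaleDemo {
      public static void main(String[] args) {
        int scale = 3; // as for a DECIMAL(9,3) column
        // Shorter values are padded with trailing zeros, longer ones rounded half-up.
        System.out.println(new BigDecimal("-0.3").setScale(scale, RoundingMode.HALF_UP));    // -0.300
        System.out.println(new BigDecimal("1.99999").setScale(scale, RoundingMode.HALF_UP)); // 2.000
      }
    }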
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/readers/inspectors/DefaultRecordsInspector.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/writers/primitive/HiveDoubleWriter.java
similarity index 61%
copy from contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/readers/inspectors/DefaultRecordsInspector.java
copy to contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/writers/primitive/HiveDoubleWriter.java
index 55a1a3d..af29719 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/readers/inspectors/DefaultRecordsInspector.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/writers/primitive/HiveDoubleWriter.java
@@ -15,28 +15,20 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.drill.exec.store.hive.readers.inspectors;
+package org.apache.drill.exec.store.hive.writers.primitive;
 
-/**
- * Default records inspector that uses the same value holder for each record.
- * Each value once written is immediately processed thus value holder can be re-used.
- */
-public class DefaultRecordsInspector extends AbstractRecordsInspector {
-
-  private final Object value;
+import org.apache.drill.exec.vector.complex.writer.Float8Writer;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.DoubleObjectInspector;
 
-  public DefaultRecordsInspector(Object value) {
-    this.value = value;
-  }
+public class HiveDoubleWriter extends AbstractSingleValueWriter<DoubleObjectInspector, Float8Writer> {
 
-  @Override
-  public Object getValueHolder() {
-    return value;
+  public HiveDoubleWriter(DoubleObjectInspector inspector, Float8Writer writer) {
+    super(inspector, writer);
   }
 
   @Override
-  public Object getNextValue() {
-    return value;
+  public void write(Object value) {
+    writer.writeFloat8(inspector.get(value));
   }
 
 }
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/readers/inspectors/DefaultRecordsInspector.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/writers/primitive/HiveFloatWriter.java
similarity index 61%
copy from contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/readers/inspectors/DefaultRecordsInspector.java
copy to contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/writers/primitive/HiveFloatWriter.java
index 55a1a3d..bcf2396 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/readers/inspectors/DefaultRecordsInspector.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/writers/primitive/HiveFloatWriter.java
@@ -15,28 +15,20 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.drill.exec.store.hive.readers.inspectors;
+package org.apache.drill.exec.store.hive.writers.primitive;
 
-/**
- * Default records inspector that uses the same value holder for each record.
- * Each value once written is immediately processed thus value holder can be re-used.
- */
-public class DefaultRecordsInspector extends AbstractRecordsInspector {
-
-  private final Object value;
+import org.apache.drill.exec.vector.complex.writer.Float4Writer;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.FloatObjectInspector;
 
-  public DefaultRecordsInspector(Object value) {
-    this.value = value;
-  }
+public class HiveFloatWriter extends AbstractSingleValueWriter<FloatObjectInspector, Float4Writer> {
 
-  @Override
-  public Object getValueHolder() {
-    return value;
+  public HiveFloatWriter(FloatObjectInspector inspector, Float4Writer writer) {
+    super(inspector, writer);
   }
 
   @Override
-  public Object getNextValue() {
-    return value;
+  public void write(Object value) {
+    writer.writeFloat4(inspector.get(value));
   }
 
 }
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/readers/inspectors/DefaultRecordsInspector.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/writers/primitive/HiveIntWriter.java
similarity index 61%
copy from contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/readers/inspectors/DefaultRecordsInspector.java
copy to contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/writers/primitive/HiveIntWriter.java
index 55a1a3d..aa4265a 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/readers/inspectors/DefaultRecordsInspector.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/writers/primitive/HiveIntWriter.java
@@ -15,28 +15,20 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.drill.exec.store.hive.readers.inspectors;
+package org.apache.drill.exec.store.hive.writers.primitive;
 
-/**
- * Default records inspector that uses the same value holder for each record.
- * Each value once written is immediately processed thus value holder can be re-used.
- */
-public class DefaultRecordsInspector extends AbstractRecordsInspector {
-
-  private final Object value;
+import org.apache.drill.exec.vector.complex.writer.IntWriter;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.IntObjectInspector;
 
-  public DefaultRecordsInspector(Object value) {
-    this.value = value;
-  }
+public class HiveIntWriter extends AbstractSingleValueWriter<IntObjectInspector, IntWriter> {
 
-  @Override
-  public Object getValueHolder() {
-    return value;
+  public HiveIntWriter(IntObjectInspector inspector, IntWriter writer) {
+    super(inspector, writer);
   }
 
   @Override
-  public Object getNextValue() {
-    return value;
+  public void write(Object value) {
+    writer.writeInt(inspector.get(value));
   }
 
 }
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/readers/inspectors/DefaultRecordsInspector.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/writers/primitive/HiveLongWriter.java
similarity index 61%
copy from contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/readers/inspectors/DefaultRecordsInspector.java
copy to contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/writers/primitive/HiveLongWriter.java
index 55a1a3d..4618ae7 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/readers/inspectors/DefaultRecordsInspector.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/writers/primitive/HiveLongWriter.java
@@ -15,28 +15,20 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.drill.exec.store.hive.readers.inspectors;
+package org.apache.drill.exec.store.hive.writers.primitive;
 
-/**
- * Default records inspector that uses the same value holder for each record.
- * Each value once written is immediately processed thus value holder can be re-used.
- */
-public class DefaultRecordsInspector extends AbstractRecordsInspector {
-
-  private final Object value;
+import org.apache.drill.exec.vector.complex.writer.BigIntWriter;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.LongObjectInspector;
 
-  public DefaultRecordsInspector(Object value) {
-    this.value = value;
-  }
+public class HiveLongWriter extends AbstractSingleValueWriter<LongObjectInspector, BigIntWriter> {
 
-  @Override
-  public Object getValueHolder() {
-    return value;
+  public HiveLongWriter(LongObjectInspector inspector, BigIntWriter writer) {
+    super(inspector, writer);
   }
 
   @Override
-  public Object getNextValue() {
-    return value;
+  public void write(Object value) {
+    writer.writeBigInt(inspector.get(value));
   }
 
 }
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/readers/inspectors/DefaultRecordsInspector.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/writers/primitive/HiveShortWriter.java
similarity index 61%
copy from contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/readers/inspectors/DefaultRecordsInspector.java
copy to contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/writers/primitive/HiveShortWriter.java
index 55a1a3d..303cb25 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/readers/inspectors/DefaultRecordsInspector.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/writers/primitive/HiveShortWriter.java
@@ -15,28 +15,20 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.drill.exec.store.hive.readers.inspectors;
+package org.apache.drill.exec.store.hive.writers.primitive;
 
-/**
- * Default records inspector that uses the same value holder for each record.
- * Each value once written is immediately processed thus value holder can be re-used.
- */
-public class DefaultRecordsInspector extends AbstractRecordsInspector {
-
-  private final Object value;
+import org.apache.drill.exec.vector.complex.writer.IntWriter;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.ShortObjectInspector;
 
-  public DefaultRecordsInspector(Object value) {
-    this.value = value;
-  }
+public class HiveShortWriter extends AbstractSingleValueWriter<ShortObjectInspector, IntWriter> {
 
-  @Override
-  public Object getValueHolder() {
-    return value;
+  public HiveShortWriter(ShortObjectInspector inspector, IntWriter writer) {
+    super(inspector, writer);
   }
 
   @Override
-  public Object getNextValue() {
-    return value;
+  public void write(Object value) {
+    writer.writeInt(inspector.get(value));
   }
 
 }
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/writers/primitive/HiveStringWriter.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/writers/primitive/HiveStringWriter.java
new file mode 100644
index 0000000..0981611
--- /dev/null
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/writers/primitive/HiveStringWriter.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.hive.writers.primitive;
+
+import io.netty.buffer.DrillBuf;
+import org.apache.drill.exec.vector.complex.writer.VarCharWriter;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.StringObjectInspector;
+import org.apache.hadoop.io.Text;
+
+public class HiveStringWriter extends AbstractSingleValueWriter<StringObjectInspector, VarCharWriter> {
+
+  private DrillBuf drillBuf;
+
+  public HiveStringWriter(StringObjectInspector inspector, VarCharWriter writer, DrillBuf drillBuf) {
+    super(inspector, writer);
+    this.drillBuf = drillBuf;
+  }
+
+  @Override
+  public void write(Object value) {
+    Text textValue = inspector.getPrimitiveWritableObject(value);
+    byte[] bytes = textValue.getBytes();
+    drillBuf = drillBuf.reallocIfNeeded(bytes.length);
+    drillBuf.setBytes(0, bytes);
+    writer.writeVarChar(0, textValue.getLength(), drillBuf);
+  }
+
+}
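
Note that writeVarChar is passed textValue.getLength() rather than bytes.length: Text.getBytes() returns the backing array, which may be longer than the valid data it holds. A small illustration, assuming standard Hadoop Text semantics:

    import org.apache.hadoop.io.Text;

    public class TextLengthDemo {
      public static void main(String[] args) {
        Text t = new Text("a longer initial value"); // allocates a backing array for 22 bytes
        t.set("abc");                                // reuses that array when it is large enough
        System.out.println(t.getBytes().length);     // capacity of the backing array, may be > 3
        System.out.println(t.getLength());           // 3 -- the number of valid bytes to copy
      }
    }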
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/readers/inspectors/DefaultRecordsInspector.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/writers/primitive/HiveTimestampWriter.java
similarity index 51%
rename from contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/readers/inspectors/DefaultRecordsInspector.java
rename to contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/writers/primitive/HiveTimestampWriter.java
index 55a1a3d..9bc2b6a 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/readers/inspectors/DefaultRecordsInspector.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/writers/primitive/HiveTimestampWriter.java
@@ -15,28 +15,24 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.drill.exec.store.hive.readers.inspectors;
+package org.apache.drill.exec.store.hive.writers.primitive;
 
-/**
- * Default records inspector that uses the same value holder for each record.
- * Each value once written is immediately processed thus value holder can be re-used.
- */
-public class DefaultRecordsInspector extends AbstractRecordsInspector {
-
-  private final Object value;
+import org.apache.drill.exec.vector.complex.writer.TimeStampWriter;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.TimestampObjectInspector;
+import org.joda.time.DateTime;
+import org.joda.time.DateTimeZone;
 
-  public DefaultRecordsInspector(Object value) {
-    this.value = value;
-  }
+public class HiveTimestampWriter extends AbstractSingleValueWriter<TimestampObjectInspector, TimeStampWriter> {
 
-  @Override
-  public Object getValueHolder() {
-    return value;
+  public HiveTimestampWriter(TimestampObjectInspector inspector, TimeStampWriter writer) {
+    super(inspector, writer);
   }
 
   @Override
-  public Object getNextValue() {
-    return value;
+  public void write(Object value) {
+    final java.sql.Timestamp timestampValue = inspector.getPrimitiveJavaObject(value);
+    final DateTime ts = new DateTime(timestampValue.getTime()).withZoneRetainFields(DateTimeZone.UTC);
+    writer.writeTimeStamp(ts.getMillis());
   }
 
 }
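
Both HiveDateWriter and HiveTimestampWriter rely on withZoneRetainFields(DateTimeZone.UTC), which keeps the local wall-clock fields of the Hive value and reinterprets them as UTC before storing the millis. A small Joda-Time illustration with an explicit source zone (the writers themselves go through the JVM default zone):

    import org.joda.time.DateTime;
    import org.joda.time.DateTimeZone;

    public class RetainFieldsDemo {
      public static void main(String[] args) {
        DateTimeZone sourceZone = DateTimeZone.forOffsetHours(3); // example zone, stands in for the JVM default
        DateTime local = new DateTime(2019, 5, 14, 20, 16, 46, sourceZone);

        // Same wall-clock fields, reinterpreted as UTC -- this is the instant the vector stores.
        DateTime asUtc = local.withZoneRetainFields(DateTimeZone.UTC);
        System.out.println(asUtc);             // 2019-05-14T20:16:46.000Z
        System.out.println(asUtc.getMillis()); // millis passed to writeTimeStamp/writeDate
      }
    }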
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/writers/primitive/HiveVarCharWriter.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/writers/primitive/HiveVarCharWriter.java
new file mode 100644
index 0000000..ec78bb2
--- /dev/null
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/writers/primitive/HiveVarCharWriter.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.hive.writers.primitive;
+
+import io.netty.buffer.DrillBuf;
+import org.apache.drill.exec.vector.complex.writer.VarCharWriter;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.HiveVarcharObjectInspector;
+import org.apache.hadoop.io.Text;
+
+public class HiveVarCharWriter extends AbstractSingleValueWriter<HiveVarcharObjectInspector, VarCharWriter> {
+
+  private DrillBuf drillBuf;
+
+  public HiveVarCharWriter(HiveVarcharObjectInspector inspector, VarCharWriter writer, DrillBuf drillBuf) {
+    super(inspector, writer);
+    this.drillBuf = drillBuf;
+  }
+
+  @Override
+  public void write(Object value) {
+    Text textValue = inspector.getPrimitiveWritableObject(value).getTextValue();
+    byte[] bytes = textValue.getBytes();
+    drillBuf = drillBuf.reallocIfNeeded(bytes.length);
+    drillBuf.setBytes(0, bytes);
+    writer.writeVarChar(0, textValue.getLength(), drillBuf);
+  }
+
+}
diff --git a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/HiveTestUtilities.java b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/HiveTestUtilities.java
index 7457511..94fa239 100644
--- a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/HiveTestUtilities.java
+++ b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/HiveTestUtilities.java
@@ -25,6 +25,7 @@ import java.nio.file.attribute.PosixFilePermission;
 import java.util.EnumSet;
 import java.util.Set;
 
+import org.apache.drill.test.TestTools;
 import org.apache.hadoop.hive.ql.CommandNeedRetryException;
 import org.apache.hadoop.hive.ql.Driver;
 import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
@@ -83,4 +84,31 @@ public class HiveTestUtilities {
     return dir;
   }
 
+  /**
+   * Loads data from a test resource file into the specified table.
+   *
+   * @param driver hive driver
+   * @param tableName destination
+   * @param relativeTestResourcePath path to test resource
+   */
+  public static void loadData(Driver driver, String tableName, Path relativeTestResourcePath){
+    String dataAbsPath = TestTools.getResourceFile(relativeTestResourcePath).getAbsolutePath();
+    String loadDataSql = String.format("LOAD DATA LOCAL INPATH '%s' OVERWRITE INTO TABLE %s", dataAbsPath, tableName);
+    executeQuery(driver, loadDataSql);
+  }
+
+  /**
+   * Performs insert from select.
+   *
+   * @param driver hive driver
+   * @param srcTable source
+   * @param destTable destination
+   */
+  public static void insertData(Driver driver, String srcTable, String destTable){
+    executeQuery(driver, String.format(
+        "INSERT OVERWRITE TABLE %s SELECT * FROM %s",
+        destTable, srcTable
+    ));
+  }
+
 }
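
A hedged usage sketch of the two new helpers; the table names below are hypothetical examples, not tables defined by this commit:

    import java.nio.file.Paths;
    import org.apache.drill.exec.hive.HiveTestUtilities;
    import org.apache.hadoop.hive.ql.Driver;

    public class HiveTestUtilitiesUsage {
      static void populate(Driver driver) {
        // Load a test resource into a text-backed table, then copy it into another table.
        HiveTestUtilities.loadData(driver, "example_text_tbl", Paths.get("complex_types/array/int_array.json"));
        HiveTestUtilities.insertData(driver, "example_text_tbl", "example_parquet_tbl");
      }
    }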
diff --git a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/complex_types/TestHiveArrays.java b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/complex_types/TestHiveArrays.java
new file mode 100644
index 0000000..d3aa2ef
--- /dev/null
+++ b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/complex_types/TestHiveArrays.java
@@ -0,0 +1,1778 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.hive.complex_types;
+
+import java.math.BigDecimal;
+import java.nio.file.Paths;
+import java.util.Arrays;
+import java.util.Collections;
+
+import org.apache.drill.categories.HiveStorageTest;
+import org.apache.drill.exec.hive.HiveTestFixture;
+import org.apache.drill.exec.hive.HiveTestUtilities;
+import org.apache.drill.exec.util.StoragePluginTestUtils;
+import org.apache.drill.exec.util.Text;
+import org.apache.drill.test.ClusterFixture;
+import org.apache.drill.test.ClusterTest;
+import org.apache.hadoop.hive.ql.Driver;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import static java.util.Arrays.asList;
+import static java.util.Collections.emptyList;
+import static org.apache.drill.exec.expr.fn.impl.DateUtility.parseBest;
+import static org.apache.drill.exec.expr.fn.impl.DateUtility.parseLocalDate;
+
+@Category({HiveStorageTest.class})
+public class TestHiveArrays extends ClusterTest {
+
+  private static HiveTestFixture hiveTestFixture;
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    startCluster(ClusterFixture.builder(dirTestWatcher));
+    hiveTestFixture = HiveTestFixture.builder(dirTestWatcher).build();
+    hiveTestFixture.getDriverManager().runWithinSession(TestHiveArrays::generateData);
+    hiveTestFixture.getPluginManager().addHivePluginTo(cluster.drillbit());
+  }
+
+  @AfterClass
+  public static void tearDown() throws Exception {
+    if (hiveTestFixture != null) {
+      hiveTestFixture.getPluginManager().removeHivePluginFrom(cluster.drillbit());
+    }
+  }
+
+  private static void generateData(Driver d) {
+    // int_array
+    HiveTestUtilities.executeQuery(d,
+        "CREATE TABLE int_array(rid INT, arr_n_0 ARRAY<INT>, arr_n_1 ARRAY<ARRAY<INT>>,arr_n_2 ARRAY<ARRAY<ARRAY<INT>>>) " +
+            "ROW FORMAT SERDE 'org.apache.hive.hcatalog.data.JsonSerDe' STORED AS TEXTFILE");
+    HiveTestUtilities.loadData(d, "int_array", Paths.get("complex_types/array/int_array.json"));
+
+    // string_array
+    HiveTestUtilities.executeQuery(d,
+        "CREATE TABLE string_array(rid INT, arr_n_0 ARRAY<STRING>, arr_n_1 ARRAY<ARRAY<STRING>>,arr_n_2 ARRAY<ARRAY<ARRAY<STRING>>>) " +
+            "ROW FORMAT SERDE 'org.apache.hive.hcatalog.data.JsonSerDe' STORED AS TEXTFILE");
+    HiveTestUtilities.loadData(d, "string_array", Paths.get("complex_types/array/string_array.json"));
+
+    // varchar_array
+    HiveTestUtilities.executeQuery(d,
+        "CREATE TABLE varchar_array(rid INT, arr_n_0 ARRAY<VARCHAR(5)>,arr_n_1 ARRAY<ARRAY<VARCHAR(5)>>,arr_n_2 ARRAY<ARRAY<ARRAY<VARCHAR(5)>>>) " +
+            "ROW FORMAT SERDE 'org.apache.hive.hcatalog.data.JsonSerDe' STORED AS TEXTFILE");
+    HiveTestUtilities.loadData(d, "varchar_array", Paths.get("complex_types/array/varchar_array.json"));
+
+    // char_array
+    HiveTestUtilities.executeQuery(d,
+        "CREATE TABLE char_array(rid INT, arr_n_0 ARRAY<CHAR(2)>,arr_n_1 ARRAY<ARRAY<CHAR(2)>>, arr_n_2 ARRAY<ARRAY<ARRAY<CHAR(2)>>>) " +
+            "ROW FORMAT SERDE 'org.apache.hive.hcatalog.data.JsonSerDe' STORED AS TEXTFILE");
+    HiveTestUtilities.loadData(d, "char_array", Paths.get("complex_types/array/char_array.json"));
+
+    // tinyint_array
+    HiveTestUtilities.executeQuery(d,
+        "CREATE TABLE tinyint_array(rid INT, arr_n_0 ARRAY<TINYINT>, arr_n_1 ARRAY<ARRAY<TINYINT>>, arr_n_2 ARRAY<ARRAY<ARRAY<TINYINT>>> ) " +
+            "ROW FORMAT SERDE 'org.apache.hive.hcatalog.data.JsonSerDe' STORED AS TEXTFILE");
+    HiveTestUtilities.loadData(d, "tinyint_array", Paths.get("complex_types/array/tinyint_array.json"));
+
+    // smallint_array
+    HiveTestUtilities.executeQuery(d,
+        "CREATE TABLE smallint_array(rid INT, arr_n_0 ARRAY<SMALLINT>, arr_n_1 ARRAY<ARRAY<SMALLINT>>, arr_n_2 ARRAY<ARRAY<ARRAY<SMALLINT>>>) " +
+            "ROW FORMAT SERDE 'org.apache.hive.hcatalog.data.JsonSerDe' STORED AS TEXTFILE");
+    HiveTestUtilities.loadData(d, "smallint_array", Paths.get("complex_types/array/smallint_array.json"));
+
+    // decimal_array
+    HiveTestUtilities.executeQuery(d,
+        "CREATE TABLE decimal_array(rid INT, arr_n_0 ARRAY<DECIMAL(9,3)>, arr_n_1 ARRAY<ARRAY<DECIMAL(9,3)>>,arr_n_2 ARRAY<ARRAY<ARRAY<DECIMAL(9,3)>>>) " +
+            "ROW FORMAT SERDE 'org.apache.hive.hcatalog.data.JsonSerDe' STORED AS TEXTFILE");
+    HiveTestUtilities.loadData(d, "decimal_array", Paths.get("complex_types/array/decimal_array.json"));
+
+    // boolean_array
+    HiveTestUtilities.executeQuery(d,
+        "CREATE TABLE boolean_array(rid INT, arr_n_0 ARRAY<BOOLEAN>, arr_n_1 ARRAY<ARRAY<BOOLEAN>>,arr_n_2 ARRAY<ARRAY<ARRAY<BOOLEAN>>>) " +
+            "ROW FORMAT SERDE 'org.apache.hive.hcatalog.data.JsonSerDe' STORED AS TEXTFILE");
+    HiveTestUtilities.loadData(d, "boolean_array", Paths.get("complex_types/array/boolean_array.json"));
+
+    // bigint_array
+    HiveTestUtilities.executeQuery(d,
+        "CREATE TABLE bigint_array(rid INT, arr_n_0 ARRAY<BIGINT>, arr_n_1 ARRAY<ARRAY<BIGINT>>,arr_n_2 ARRAY<ARRAY<ARRAY<BIGINT>>>) " +
+            "ROW FORMAT SERDE 'org.apache.hive.hcatalog.data.JsonSerDe' STORED AS TEXTFILE");
+    HiveTestUtilities.loadData(d, "bigint_array", Paths.get("complex_types/array/bigint_array.json"));
+
+    // float_array
+    HiveTestUtilities.executeQuery(d,
+        "CREATE TABLE float_array(rid INT, arr_n_0 ARRAY<FLOAT>, arr_n_1 ARRAY<ARRAY<FLOAT>>,arr_n_2 ARRAY<ARRAY<ARRAY<FLOAT>>>) " +
+            "ROW FORMAT SERDE 'org.apache.hive.hcatalog.data.JsonSerDe' STORED AS TEXTFILE");
+    HiveTestUtilities.loadData(d, "float_array", Paths.get("complex_types/array/float_array.json"));
+
+    // double_array
+    HiveTestUtilities.executeQuery(d,
+        "CREATE TABLE double_array(rid INT, arr_n_0 ARRAY<DOUBLE>, arr_n_1 ARRAY<ARRAY<DOUBLE>>, arr_n_2 ARRAY<ARRAY<ARRAY<DOUBLE>>>) " +
+            "ROW FORMAT SERDE 'org.apache.hive.hcatalog.data.JsonSerDe' STORED AS TEXTFILE");
+    HiveTestUtilities.loadData(d, "double_array", Paths.get("complex_types/array/double_array.json"));
+
+    // date_array
+    HiveTestUtilities.executeQuery(d,
+        "CREATE TABLE date_array(rid INT, arr_n_0 ARRAY<DATE>, arr_n_1 ARRAY<ARRAY<DATE>>,arr_n_2 ARRAY<ARRAY<ARRAY<DATE>>>) " +
+            "ROW FORMAT SERDE 'org.apache.hive.hcatalog.data.JsonSerDe' STORED AS TEXTFILE");
+    HiveTestUtilities.loadData(d, "date_array", Paths.get("complex_types/array/date_array.json"));
+
+    // timestamp_array
+    HiveTestUtilities.executeQuery(d,
+        "CREATE TABLE timestamp_array(rid INT, arr_n_0 ARRAY<TIMESTAMP>, arr_n_1 ARRAY<ARRAY<TIMESTAMP>>,arr_n_2 ARRAY<ARRAY<ARRAY<TIMESTAMP>>>) " +
+            "ROW FORMAT SERDE 'org.apache.hive.hcatalog.data.JsonSerDe' STORED AS TEXTFILE");
+    HiveTestUtilities.loadData(d, "timestamp_array", Paths.get("complex_types/array/timestamp_array.json"));
+
+    // binary_array
+    HiveTestUtilities.executeQuery(d,
+        "CREATE TABLE binary_array(arr_n_0 ARRAY<BINARY>) STORED AS TEXTFILE");
+    HiveTestUtilities.executeQuery(d, "insert into binary_array select array(binary('First'),binary('Second'),binary('Third'))");
+    HiveTestUtilities.executeQuery(d, "insert into binary_array select array(binary('First'))");
+
+    // arr_view
+    HiveTestUtilities.executeQuery(d, "CREATE VIEW arr_view AS " +
+        "SELECT " +
+        "   int_array.rid as vwrid," +
+        "   int_array.arr_n_0 as int_n0," +
+        "   int_array.arr_n_1 as int_n1," +
+        "   string_array.arr_n_0 as string_n0," +
+        "   string_array.arr_n_1 as string_n1," +
+        "   varchar_array.arr_n_0 as varchar_n0," +
+        "   varchar_array.arr_n_1 as varchar_n1," +
+        "   char_array.arr_n_0 as char_n0," +
+        "   char_array.arr_n_1 as char_n1," +
+        "   tinyint_array.arr_n_0 as tinyint_n0," +
+        "   tinyint_array.arr_n_1 as tinyint_n1," +
+        "   smallint_array.arr_n_0 as smallint_n0," +
+        "   smallint_array.arr_n_1 as smallint_n1," +
+        "   decimal_array.arr_n_0 as decimal_n0," +
+        "   decimal_array.arr_n_1 as decimal_n1," +
+        "   boolean_array.arr_n_0 as boolean_n0," +
+        "   boolean_array.arr_n_1 as boolean_n1," +
+        "   bigint_array.arr_n_0 as bigint_n0," +
+        "   bigint_array.arr_n_1 as bigint_n1," +
+        "   float_array.arr_n_0 as float_n0," +
+        "   float_array.arr_n_1 as float_n1," +
+        "   double_array.arr_n_0 as double_n0," +
+        "   double_array.arr_n_1 as double_n1," +
+        "   date_array.arr_n_0 as date_n0," +
+        "   date_array.arr_n_1 as date_n1," +
+        "   timestamp_array.arr_n_0 as timestamp_n0," +
+        "   timestamp_array.arr_n_1 as timestamp_n1 " +
+        "FROM " +
+        "   int_array," +
+        "   string_array," +
+        "   varchar_array," +
+        "   char_array," +
+        "   tinyint_array," +
+        "   smallint_array," +
+        "   decimal_array," +
+        "   boolean_array," +
+        "   bigint_array," +
+        "   float_array," +
+        "   double_array," +
+        "   date_array," +
+        "   timestamp_array " +
+        "WHERE " +
+        "   int_array.rid=string_array.rid AND" +
+        "   int_array.rid=varchar_array.rid AND" +
+        "   int_array.rid=char_array.rid AND" +
+        "   int_array.rid=tinyint_array.rid AND" +
+        "   int_array.rid=smallint_array.rid AND" +
+        "   int_array.rid=decimal_array.rid AND" +
+        "   int_array.rid=boolean_array.rid AND" +
+        "   int_array.rid=bigint_array.rid AND" +
+        "   int_array.rid=float_array.rid AND" +
+        "   int_array.rid=double_array.rid AND" +
+        "   int_array.rid=date_array.rid AND" +
+        "   int_array.rid=timestamp_array.rid "
+    );
+  }
+
+  @Test
+  public void intArray() throws Exception {
+
+    // Nesting 0: reading ARRAY<INT>
+    testBuilder()
+        .sqlQuery("SELECT arr_n_0 FROM hive.`int_array`")
+        .unOrdered()
+        .baselineColumns("arr_n_0")
+        .baselineValuesForSingleColumn(asList(-1, 0, 1))
+        .baselineValuesForSingleColumn(emptyList())
+        .baselineValuesForSingleColumn(Collections.singletonList(100500))
+        .go();
+
+    // Nesting 1: reading ARRAY<ARRAY<INT>>
+    testBuilder()
+        .sqlQuery("SELECT arr_n_1 FROM hive.`int_array`")
+        .unOrdered()
+        .baselineColumns("arr_n_1")
+        .baselineValuesForSingleColumn(asList(asList(-1, 0, 1), asList(-2, 1)))
+        .baselineValuesForSingleColumn(asList(emptyList(), emptyList()))
+        .baselineValuesForSingleColumn(asList(asList(100500, 500100)))
+        .go();
+
+    // Nesting 2: reading ARRAY<ARRAY<ARRAY<INT>>>
+    testBuilder()
+        .sqlQuery("SELECT arr_n_2 FROM hive.`int_array` order by rid")
+        .ordered()
+        .baselineColumns("arr_n_2")
+        .baselineValuesForSingleColumn(
+            asList( // row
+                asList( // [0]
+                    asList(7, 81),//[0][0]
+                    asList(-92, 54, -83),//[0][1]
+                    asList(-10, -59)//[0][2]
+                ),
+                asList( // [1]
+                    asList(-43, -80)//[1][0]
+                ),
+                asList( // [2]
+                    asList(-70, -62)//[2][0]
+                )
+            )
+        )
+        .baselineValuesForSingleColumn(
+            asList( // row
+                asList( // [0]
+                    asList(34, -18)//[0][0]
+                ),
+                asList( // [1]
+                    asList(-87, 87),//[1][0]
+                    asList(52, 58),//[1][1]
+                    asList(58, 20, -81),//[1][2]
+                    asList(-94, -93)//[1][3]
+                )
+            )
+        )
+        .baselineValuesForSingleColumn(
+            asList( // row
+                asList( // [0]
+                    asList(-56, 9),//[0][0]
+                    asList(39, 5)//[0][1]
+                ),
+                asList( // [1]
+                    asList(28, 88, -28)//[1][0]
+                )
+            )
+        ).go();
+  }
+
+  @Test
+  public void intArrayInJoin() throws Exception {
+    testBuilder()
+        .sqlQuery("SELECT a.rid as gid, a.arr_n_0 as an0, b.arr_n_0 as bn0 " +
+            "FROM hive.int_array a " +
+            "INNER JOIN hive.int_array b " +
+            "ON a.rid=b.rid WHERE a.rid=1")
+        .unOrdered()
+        .baselineColumns("gid", "an0", "bn0")
+        .baselineValues(1, asList(-1, 0, 1), asList(-1, 0, 1))
+        .go();
+    testBuilder()
+        .sqlQuery("SELECT * FROM (SELECT a.rid as gid, a.arr_n_0 as an0, b.arr_n_0 as bn0,c.arr_n_0 as cn0 " +
+            "FROM hive.int_array a,hive.int_array b, hive.int_array c " +
+            "WHERE a.rid=b.rid AND a.rid=c.rid) WHERE gid=1")
+        .unOrdered()
+        .baselineColumns("gid", "an0", "bn0", "cn0")
+        .baselineValues(1, asList(-1, 0, 1), asList(-1, 0, 1), asList(-1, 0, 1))
+        .go();
+  }
+
+  @Test
+  public void intArrayByIndex() throws Exception {
+    // arr_n_0 array<int>, arr_n_1 array<array<int>>
+    testBuilder()
+        .sqlQuery("SELECT arr_n_0[0], arr_n_0[1], arr_n_1[0], arr_n_1[1], arr_n_0[3], arr_n_1[3] FROM hive.`int_array`")
+        .unOrdered()
+        .baselineColumns("EXPR$0", "EXPR$1", "EXPR$2", "EXPR$3", "EXPR$4", "EXPR$5")
+        .baselineValues(-1, 0, asList(-1, 0, 1), asList(-2, 1), null, emptyList())
+        .baselineValues(null, null, emptyList(), emptyList(), null, emptyList())
+        .baselineValues(100500, null, asList(100500, 500100), emptyList(), null, emptyList())
+        .go();
+  }
+
+  @Test
+  public void intArrayFlatten() throws Exception {
+    // arr_n_0 array<int>, arr_n_1 array<array<int>>
+    testBuilder()
+        .sqlQuery("SELECT rid, FLATTEN(arr_n_0) FROM hive.`int_array`")
+        .unOrdered()
+        .baselineColumns("rid", "EXPR$1")
+        .baselineValues(1, -1)
+        .baselineValues(1, 0)
+        .baselineValues(1, 1)
+        .baselineValues(3, 100500)
+        .go();
+
+    testBuilder()
+        .sqlQuery("SELECT rid, FLATTEN(arr_n_1) FROM hive.`int_array`")
+        .unOrdered()
+        .baselineColumns("rid", "EXPR$1")
+        .baselineValues(1, asList(-1, 0, 1))
+        .baselineValues(1, asList(-2, 1))
+        .baselineValues(2, emptyList())
+        .baselineValues(2, emptyList())
+        .baselineValues(3, asList(100500, 500100))
+        .go();
+
+    testBuilder()
+        .sqlQuery("SELECT rid, FLATTEN(FLATTEN(arr_n_1)) FROM hive.`int_array`")
+        .unOrdered()
+        .baselineColumns("rid", "EXPR$1")
+        .baselineValues(1, -1)
+        .baselineValues(1, 0)
+        .baselineValues(1, 1)
+        .baselineValues(1, -2)
+        .baselineValues(1, 1)
+        .baselineValues(3, 100500)
+        .baselineValues(3, 500100)
+        .go();
+  }
+
+  @Test
+  public void intArrayRepeatedCount() throws Exception {
+    testBuilder()
+        .sqlQuery("SELECT rid, REPEATED_COUNT(arr_n_0), REPEATED_COUNT(arr_n_1) FROM hive.`int_array`")
+        .unOrdered()
+        .baselineColumns("rid", "EXPR$1", "EXPR$2")
+        .baselineValues(1, 3, 2)
+        .baselineValues(2, 0, 2)
+        .baselineValues(3, 1, 1)
+        .go();
+  }
+
+  @Test
+  public void intArrayRepeatedContains() throws Exception {
+    testBuilder()
+        .sqlQuery("SELECT rid FROM hive.`int_array` WHERE REPEATED_CONTAINS(arr_n_0, 100500)")
+        .unOrdered()
+        .baselineColumns("rid")
+        .baselineValues(3)
+        .go();
+  }
+
+  @Test
+  public void intArrayDescribe() throws Exception {
+    testBuilder()
+        .sqlQuery("DESCRIBE hive.`int_array` arr_n_0")
+        .unOrdered()
+        .baselineColumns("COLUMN_NAME", "DATA_TYPE", "IS_NULLABLE")
+        .baselineValues("arr_n_0", "ARRAY", "YES") //todo: fix to ARRAY<INTEGER>
+        .go();
+    testBuilder()
+        .sqlQuery("DESCRIBE hive.`int_array` arr_n_1")
+        .unOrdered()
+        .baselineColumns("COLUMN_NAME", "DATA_TYPE", "IS_NULLABLE")
+        .baselineValues("arr_n_1", "ARRAY", "YES") // todo: ARRAY<ARRAY<INTEGER>>
+        .go();
+  }
+
+  @Test
+  public void intArrayTypeOfKindFunctions() throws Exception {
+    testBuilder()
+        .sqlQuery("select " +
+            "sqlTypeOf(arr_n_0), sqlTypeOf(arr_n_1),  " +
+            "typeOf(arr_n_0), typeOf(arr_n_1), " +
+            "modeOf(arr_n_0), modeOf(arr_n_1), " +
+            "drillTypeOf(arr_n_0), drillTypeOf(arr_n_1) " +
+            "from hive.`int_array` limit 1")
+        .unOrdered()
+        .baselineColumns(
+            "EXPR$0", "EXPR$1",
+            "EXPR$2", "EXPR$3",
+            "EXPR$4", "EXPR$5",
+            "EXPR$6", "EXPR$7"
+        )
+        .baselineValues(
+            "ARRAY", "ARRAY", // why not ARRAY<INTEGER> | ARRAY<ARRAY<INTEGER>> ?
+            "INT", "LIST",    // todo: is it ok ?
+            "ARRAY", "ARRAY",
+            "INT", "LIST"    // todo: is it ok ?
+        )
+        .go();
+  }
+
+  @Test
+  public void stringArray() throws Exception {
+    // Nesting 0: reading ARRAY<STRING>
+    testBuilder()
+        .sqlQuery("SELECT arr_n_0 FROM hive.`string_array`")
+        .unOrdered()
+        .baselineColumns("arr_n_0")
+        .baselineValuesForSingleColumn(asList(new Text("First Value Of Array"), new Text("komlnp"), new Text("The Last Value")))
+        .baselineValuesForSingleColumn(emptyList())
+        .baselineValuesForSingleColumn(Collections.singletonList(new Text("ABCaBcA-1-2-3")))
+        .go();
+
+    // Nesting 1: reading ARRAY<ARRAY<STRING>>
+    testBuilder()
+        .sqlQuery("SELECT arr_n_1 FROM hive.`string_array`")
+        .unOrdered()
+        .baselineColumns("arr_n_1")
+        .baselineValuesForSingleColumn(asList(asList(new Text("Array 0, Value 0"), new Text("Array 0, Value 1")), asList(new Text("Array 1"))))
+        .baselineValuesForSingleColumn(asList(emptyList(), emptyList()))
+        .baselineValuesForSingleColumn(asList(asList(new Text("One"))))
+        .go();
+
+    // Nesting 2: reading ARRAY<ARRAY<ARRAY<STRING>>>
+    testBuilder()
+        .sqlQuery("SELECT arr_n_2 FROM hive.`string_array` order by rid")
+        .ordered()
+        .baselineColumns("arr_n_2")
+        .baselineValuesForSingleColumn(
+            asList( // row
+                asList( // [0]
+                    asList(new Text("dhMGOr1QVO"), new Text("NZpzBl"), new Text("LC8mjYyOJ7l8dHUpk"))//[0][0]
+                ),
+                asList( // [1]
+                    asList(new Text("JH")),//[1][0]
+                    asList(new Text("aVxgfxAu")),//[1][1]
+                    asList(new Text("fF amN8z8"))//[1][2]
+                ),
+                asList( // [2]
+                    asList(new Text("denwte5R39dSb2PeG"), new Text("Gbosj97RXTvBK1w"), new Text("S3whFvN")),//[2][0]
+                    asList(new Text("2sNbYGQhkt303Gnu"), new Text("rwG"), new Text("SQH766A8XwHg2pTA6a"))//[2][1]
+                ),
+                asList( // [3]
+                    asList(new Text("L"), new Text("khGFDtDluFNoo5hT")),//[3][0]
+                    asList(new Text("b8")),//[3][1]
+                    asList(new Text("Z"))//[3][2]
+                ),
+                asList( // [4]
+                    asList(new Text("DTEuW"), new Text("b0Wt84hIl"), new Text("A1H")),//[4][0]
+                    asList(new Text("h2zXh3Qc"), new Text("NOcgU8"), new Text("RGfVgv2rvDG")),//[4][1]
+                    asList(new Text("Hfn1ov9hB7fZN"), new Text("0ZgCD3"))//[4][2]
+                )
+            )
+        )
+        .baselineValuesForSingleColumn(
+            asList( // row
+                asList( // [0]
+                    asList(new Text("nk"), new Text("HA"), new Text("CgAZCxTbTrFWJL3yM")),//[0][0]
+                    asList(new Text("T7fGXYwtBb"), new Text("G6vc")),//[0][1]
+                    asList(new Text("GrwB5j3LBy9")),//[0][2]
+                    asList(new Text("g7UreegD1H97"), new Text("dniQ5Ehhps7c1pBuM"), new Text("S wSNMGj7c")),//[0][3]
+                    asList(new Text("iWTEJS0"), new Text("4F"))//[0][4]
+                ),
+                asList( // [1]
+                    asList(new Text("YpRcC01u6i6KO"), new Text("ujpMrvEfUWfKm"), new Text("2d")),//[1][0]
+                    asList(new Text("2"), new Text("HVDH"), new Text("5Qx Q6W112"))//[1][1]
+                )
+            )
+        )
+        .baselineValuesForSingleColumn(
+            asList( // row
+                asList( // [0]
+                    asList(new Text("S8d2vjNu680hSim6iJ")),//[0][0]
+                    asList(new Text("lRLaT9RvvgzhZ3C"), new Text("igSX1CP"), new Text("FFZMwMvAOod8")),//[0][1]
+                    asList(new Text("iBX"), new Text("sG")),//[0][2]
+                    asList(new Text("ChRjuDPz99WeU9"), new Text("2gBBmMUXV9E5E"), new Text(" VkEARI2upO"))//[0][3]
+                ),
+                asList( // [1]
+                    asList(new Text("UgMok3Q5wmd")),//[1][0]
+                    asList(new Text("8Zf9CLfUSWK"), new Text(""), new Text("NZ7v")),//[1][1]
+                    asList(new Text("vQE3I5t26"), new Text("251BeQJue"))//[1][2]
+                ),
+                asList( // [2]
+                    asList(new Text("Rpo8"))//[2][0]
+                ),
+                asList( // [3]
+                    asList(new Text("jj3njyupewOM Ej0pu"), new Text("aePLtGgtyu4aJ5"), new Text("cKHSvNbImH1MkQmw0Cs")),//[3][0]
+                    asList(new Text("VSO5JgI2x7TnK31L5"), new Text("hIub"), new Text("eoBSa0zUFlwroSucU")),//[3][1]
+                    asList(new Text("V8Gny91lT"), new Text("5hBncDZ"))//[3][2]
+                ),
+                asList( // [4]
+                    asList(new Text("Y3"), new Text("StcgywfU"), new Text("BFTDChc")),//[4][0]
+                    asList(new Text("5JNwXc2UHLld7"), new Text("v")),//[4][1]
+                    asList(new Text("9UwBhJMSDftPKuGC")),//[4][2]
+                    asList(new Text("E hQ9NJkc0GcMlB"), new Text("IVND1Xp1Nnw26DrL9"))//[4][3]
+                )
+            )
+        ).go();
+  }
+
+  @Test
+  public void stringArrayByIndex() throws Exception {
+    // arr_n_0 array<string>, arr_n_1 array<array<string>>
+    testBuilder()
+        .sqlQuery("SELECT arr_n_0[0], arr_n_0[1], arr_n_1[0], arr_n_1[1], arr_n_0[3], arr_n_1[3] FROM hive.`string_array`")
+        .unOrdered()
+        .baselineColumns(
+            "EXPR$0",
+            "EXPR$1",
+            "EXPR$2",
+            "EXPR$3",
+            "EXPR$4",
+            "EXPR$5")
+        .baselineValues(
+            "First Value Of Array",
+            "komlnp",
+            asList(new Text("Array 0, Value 0"), new Text("Array 0, Value 1")),
+            asList(new Text("Array 1")),
+            null,
+            emptyList()
+        )
+        .baselineValues(
+            null,
+            null,
+            emptyList(),
+            emptyList(),
+            null,
+            emptyList())
+        .baselineValues(
+            "ABCaBcA-1-2-3",
+            null,
+            asList(new Text("One")),
+            emptyList(),
+            null,
+            emptyList())
+        .go();
+  }
+
+  @Test
+  public void varcharArray() throws Exception {
+    // Nesting 0: reading ARRAY<VARCHAR(5)>
+    testBuilder()
+        .sqlQuery("SELECT arr_n_0 FROM hive.`varchar_array`")
+        .unOrdered()
+        .baselineColumns("arr_n_0")
+        .baselineValuesForSingleColumn(asList(new Text("Five"), new Text("One"), new Text("T")))
+        .baselineValuesForSingleColumn(emptyList())
+        .baselineValuesForSingleColumn(asList(new Text("ZZ0"), new Text("-c54g"), new Text("ooo"), new Text("k22k")))
+        .go();
+
+    // Nesting 1: reading ARRAY<ARRAY<VARCHAR(5)>>
+    testBuilder()
+        .sqlQuery("SELECT arr_n_1 FROM hive.`varchar_array`")
+        .unOrdered()
+        .baselineColumns("arr_n_1")
+        .baselineValuesForSingleColumn(asList(
+            asList(new Text("Five"), new Text("One"), new Text("$42")),
+            asList(new Text("T"), new Text("K"), new Text("O"))
+        ))
+        .baselineValuesForSingleColumn(asList(emptyList(), emptyList()))
+        .baselineValuesForSingleColumn(asList(asList(new Text("-c54g"))))
+        .go();
+
+    // Nesting 2: reading ARRAY<ARRAY<ARRAY<VARCHAR(5)>>>
+    testBuilder()
+        .sqlQuery("SELECT arr_n_2 FROM hive.`varchar_array` order by rid")
+        .ordered()
+        .baselineColumns("arr_n_2")
+        .baselineValuesForSingleColumn(
+            asList( // row
+                asList( // [0]
+                    asList(new Text("")),//[0][0]
+                    asList(new Text("Gt"), new Text(""), new Text("")),//[0][1]
+                    asList(new Text("9R3y")),//[0][2]
+                    asList(new Text("X3a4"))//[0][3]
+                ),
+                asList( // [1]
+                    asList(new Text("o"), new Text("6T"), new Text("QKAZ")),//[1][0]
+                    asList(new Text(""), new Text("xf8r"), new Text("As")),//[1][1]
+                    asList(new Text("5kS3"))//[1][2]
+                ),
+                asList( // [2]
+                    asList(new Text(""), new Text("S7Gx")),//[2][0]
+                    asList(new Text("ml"), new Text("27pL"), new Text("VPxr")),//[2][1]
+                    asList(new Text("")),//[2][2]
+                    asList(new Text("e"), new Text("Dj"))//[2][3]
+                ),
+                asList( // [3]
+                    asList(new Text(""), new Text("XYO"), new Text("fEWz")),//[3][0]
+                    asList(new Text(""), new Text("oU")),//[3][1]
+                    asList(new Text("o 8"), new Text(""), new Text("")),//[3][2]
+                    asList(new Text("giML"), new Text("H7g")),//[3][3]
+                    asList(new Text("SWX9"), new Text("H"), new Text("emwt"))//[3][4]
+                ),
+                asList( // [4]
+                    asList(new Text("Sp"))//[4][0]
+                )
+            )
+        )
+        .baselineValuesForSingleColumn(
+            asList( // row
+                asList( // [0]
+                    asList(new Text("GCx")),//[0][0]
+                    asList(new Text(""), new Text("V")),//[0][1]
+                    asList(new Text("pF"), new Text("R7"), new Text("")),//[0][2]
+                    asList(new Text(""), new Text("AKal"))//[0][3]
+                )
+            )
+        )
+        .baselineValuesForSingleColumn(
+            asList( // row
+                asList( // [0]
+                    asList(new Text("m"), new Text("MBAv"), new Text("7R9F")),//[0][0]
+                    asList(new Text("ovv")),//[0][1]
+                    asList(new Text("p 7l"))//[0][2]
+                )
+            )
+        )
+        .go();
+
+  }
+
+  @Test
+  public void varcharArrayByIndex() throws Exception {
+    // arr_n_0 array<varchar>, arr_n_1 array<array<varchar>>
+    testBuilder()
+        .sqlQuery("SELECT arr_n_0[0], arr_n_0[1], arr_n_1[0], arr_n_1[1], arr_n_0[3], arr_n_1[3] FROM hive.`varchar_array`")
+        .unOrdered()
+        .baselineColumns("EXPR$0", "EXPR$1", "EXPR$2", "EXPR$3", "EXPR$4", "EXPR$5")
+        .baselineValues(
+            "Five",
+            "One",
+            asList(new Text("Five"), new Text("One"), new Text("$42")),
+            asList(new Text("T"), new Text("K"), new Text("O")),
+            null,
+            emptyList())
+        .baselineValues(
+            null,
+            null,
+            emptyList(),
+            emptyList(),
+            null,
+            emptyList())
+        .baselineValues(
+            "ZZ0",
+            "-c54g",
+            asList(new Text("-c54g")),
+            emptyList(),
+            "k22k",
+            emptyList())
+        .go();
+  }
+
+  @Test
+  public void charArray() throws Exception {
+    // Nesting 0: reading ARRAY<CHAR(2)>
+    testBuilder()
+        .sqlQuery("SELECT arr_n_0 FROM hive.`char_array`")
+        .unOrdered()
+        .baselineColumns("arr_n_0")
+        .baselineValuesForSingleColumn(asList(new Text("aa"), new Text("cc"), new Text("ot")))
+        .baselineValuesForSingleColumn(emptyList())
+        .baselineValuesForSingleColumn(asList(new Text("+a"), new Text("-c"), new Text("*t")))
+        .go();
+
+    // Nesting 1: reading ARRAY<ARRAY<CHAR(2)>>
+    testBuilder()
+        .sqlQuery("SELECT arr_n_1 FROM hive.`char_array`")
+        .unOrdered()
+        .baselineColumns("arr_n_1")
+        .baselineValuesForSingleColumn(asList(
+            asList(new Text("aa")),
+            asList(new Text("cc"), new Text("ot"))))
+        .baselineValuesForSingleColumn(asList(emptyList(), emptyList()))
+        .baselineValuesForSingleColumn(asList(asList(new Text("*t"))))
+        .go();
+
+    // Nesting 2: reading ARRAY<ARRAY<ARRAY<CHAR(2)>>>
+    testBuilder()
+        .sqlQuery("SELECT arr_n_2 FROM hive.`char_array` order by rid")
+        .ordered()
+        .baselineColumns("arr_n_2")
+        .baselineValuesForSingleColumn(
+            asList( // row
+                asList( // [0]
+                    asList(new Text("eT"))//[0][0]
+                ),
+                asList( // [1]
+                    asList(new Text("w9"), new Text("fC"), new Text("ww")),//[1][0]
+                    asList(new Text("3o"), new Text("f7"), new Text("Za")),//[1][1]
+                    asList(new Text("lX"), new Text("iv"), new Text("jI"))//[1][2]
+                ),
+                asList( // [2]
+                    asList(new Text("S3"), new Text("Qa"), new Text("aG")),//[2][0]
+                    asList(new Text("bj"), new Text("gc"), new Text("NO"))//[2][1]
+                )
+            )
+        )
+        .baselineValuesForSingleColumn(
+            asList( // row
+                asList( // [0]
+                    asList(new Text("PV"), new Text("tH"), new Text("B7")),//[0][0]
+                    asList(new Text("uL")),//[0][1]
+                    asList(new Text("7b"), new Text("uf")),//[0][2]
+                    asList(new Text("zj")),//[0][3]
+                    asList(new Text("sA"), new Text("hf"), new Text("hR"))//[0][4]
+                )
+            )
+        )
+        .baselineValuesForSingleColumn(
+            asList( // row
+                asList( // [0]
+                    asList(new Text("W1"), new Text("FS")),//[0][0]
+                    asList(new Text("le"), new Text("c0")),//[0][1]
+                    asList(new Text(""), new Text("0v"))//[0][2]
+                ),
+                asList( // [1]
+                    asList(new Text("gj"))//[1][0]
+                )
+            )
+        )
+        .go();
+  }
+
+  @Test
+  public void charArrayByIndex() throws Exception {
+    // arr_n_0 array<char>, arr_n_1 array<array<char>>
+    testBuilder()
+        .sqlQuery("SELECT arr_n_0[0], arr_n_0[1], arr_n_1[0], arr_n_1[1], arr_n_0[3], arr_n_1[3] FROM hive.`char_array`")
+        .unOrdered()
+        .baselineColumns("EXPR$0", "EXPR$1", "EXPR$2", "EXPR$3", "EXPR$4", "EXPR$5")
+        .baselineValues(
+            "aa",
+            "cc",
+            asList(new Text("aa")),
+            asList(new Text("cc"), new Text("ot")),
+            null,
+            emptyList())
+        .baselineValues(
+            null,
+            null,
+            emptyList(),
+            emptyList(),
+            null,
+            emptyList())
+        .baselineValues(
+            "+a",
+            "-c",
+            asList(new Text("*t")),
+            emptyList(),
+            null,
+            emptyList())
+        .go();
+  }
+
+  @Test
+  public void tinyintArray() throws Exception {
+    // Nesting 0: reading ARRAY<TINYINT>
+    testBuilder()
+        .sqlQuery("SELECT arr_n_0 FROM hive.`tinyint_array`")
+        .unOrdered()
+        .baselineColumns("arr_n_0")
+        .baselineValuesForSingleColumn(asList(-128, 0, 127))
+        .baselineValuesForSingleColumn(emptyList())
+        .baselineValuesForSingleColumn(asList(-101))
+        .go();
+
+    // Nesting 1: reading ARRAY<ARRAY<TINYINT>>
+    testBuilder()
+        .sqlQuery("SELECT arr_n_1 FROM hive.`tinyint_array`")
+        .unOrdered()
+        .baselineColumns("arr_n_1")
+        .baselineValuesForSingleColumn(asList(asList(-128, -127), asList(0, 1), asList(127, 126)))
+        .baselineValuesForSingleColumn(asList(emptyList(), emptyList()))
+        .baselineValuesForSingleColumn(asList(asList(-102)))
+        .go();
+
+    // Nesting 2: reading ARRAY<ARRAY<ARRAY<TINYINT>>>
+    testBuilder()
+        .sqlQuery("SELECT arr_n_2 FROM hive.`tinyint_array` order by rid")
+        .ordered()
+        .baselineColumns("arr_n_2")
+        .baselineValuesForSingleColumn(
+            asList( // row
+                asList( // [0]
+                    asList(31, 65, 54),//[0][0]
+                    asList(66),//[0][1]
+                    asList(22),//[0][2]
+                    asList(-33, -125, 116)//[0][3]
+                ),
+                asList( // [1]
+                    asList(-5, -10)//[1][0]
+                ),
+                asList( // [2]
+                    asList(78),//[2][0]
+                    asList(86),//[2][1]
+                    asList(90, 34),//[2][2]
+                    asList(32)//[2][3]
+                ),
+                asList( // [3]
+                    asList(103, -49, -33),//[3][0]
+                    asList(-30),//[3][1]
+                    asList(107, 24, 74),//[3][2]
+                    asList(16, -58)//[3][3]
+                ),
+                asList( // [4]
+                    asList(-119, -8),//[4][0]
+                    asList(50, -99, 26),//[4][1]
+                    asList(-119)//[4][2]
+                )
+            )
+        )
+        .baselineValuesForSingleColumn(
+            asList( // row
+                asList( // [0]
+                    asList(-90, -113),//[0][0]
+                    asList(71, -65)//[0][1]
+                ),
+                asList( // [1]
+                    asList(88, -83)//[1][0]
+                ),
+                asList( // [2]
+                    asList(11),//[2][0]
+                    asList(121, -57)//[2][1]
+                ),
+                asList( // [3]
+                    asList(-79),//[3][0]
+                    asList(16, -111, -111),//[3][1]
+                    asList(90, 106),//[3][2]
+                    asList(33, 29, 42),//[3][3]
+                    asList(74)//[3][4]
+                )
+            )
+        )
+        .baselineValuesForSingleColumn(
+            asList( // row
+                asList( // [0]
+                    asList(74, -115),//[0][0]
+                    asList(19, 85, 3)//[0][1]
+                )
+            )
+        )
+        .go();
+  }
+
+  @Test
+  public void tinyintArrayByIndex() throws Exception {
+    // arr_n_0 array<tinyint>, arr_n_1 array<array<tinyint>>
+    testBuilder()
+        .sqlQuery("SELECT arr_n_0[0], arr_n_0[1], arr_n_1[0], arr_n_1[1], arr_n_0[3], arr_n_1[3] FROM hive.`tinyint_array`")
+        .unOrdered()
+        .baselineColumns("EXPR$0", "EXPR$1", "EXPR$2", "EXPR$3", "EXPR$4", "EXPR$5")
+        .baselineValues(-128, 0, asList(-128, -127), asList(0, 1), null, emptyList())
+        .baselineValues(null, null, emptyList(), emptyList(), null, emptyList())
+        .baselineValues(-101, null, asList(-102), emptyList(), null, emptyList())
+        .go();
+  }
+
+  @Test
+  public void smallintArray() throws Exception {
+    // Nesting 0: reading ARRAY<SMALLINT>
+    testBuilder()
+        .sqlQuery("SELECT arr_n_0 FROM hive.`smallint_array`")
+        .unOrdered()
+        .baselineColumns("arr_n_0")
+        .baselineValuesForSingleColumn(asList(-32768, 0, 32767))
+        .baselineValuesForSingleColumn(emptyList())
+        .baselineValuesForSingleColumn(asList(10500))
+        .go();
+
+    // Nesting 1: reading ARRAY<ARRAY<SMALLINT>>
+    testBuilder()
+        .sqlQuery("SELECT arr_n_1 FROM hive.`smallint_array`")
+        .unOrdered()
+        .baselineColumns("arr_n_1")
+        .baselineValuesForSingleColumn(asList(asList(-32768, -32768), asList(0, 0), asList(32767, 32767)))
+        .baselineValuesForSingleColumn(asList(emptyList(), emptyList()))
+        .baselineValuesForSingleColumn(asList(asList(10500, 5010)))
+        .go();
+
+    // Nesting 2: reading ARRAY<ARRAY<ARRAY<SMALLINT>>>
+    testBuilder()
+        .sqlQuery("SELECT arr_n_2 FROM hive.`smallint_array` order by rid")
+        .ordered()
+        .baselineColumns("arr_n_2")
+        .baselineValuesForSingleColumn(
+            asList( // row
+                asList( // [0]
+                    asList(-28752)//[0][0]
+                ),
+                asList( // [1]
+                    asList(17243, 15652),//[1][0]
+                    asList(-9684),//[1][1]
+                    asList(10176, 18123),//[1][2]
+                    asList(-15404, 15420),//[1][3]
+                    asList(11136, -19435)//[1][4]
+                ),
+                asList( // [2]
+                    asList(-29634, -12695),//[2][0]
+                    asList(4350, -24289, -10889)//[2][1]
+                ),
+                asList( // [3]
+                    asList(13731),//[3][0]
+                    asList(27661, -15794, 21784),//[3][1]
+                    asList(14341, -4635),//[3][2]
+                    asList(1601, -29973),//[3][3]
+                    asList(2750, 30373, -11630)//[3][4]
+                ),
+                asList( // [4]
+                    asList(-11383)//[4][0]
+                )
+            )
+        )
+        .baselineValuesForSingleColumn(
+            asList( // row
+                asList( // [0]
+                    asList(23860),//[0][0]
+                    asList(-27345, 19068),//[0][1]
+                    asList(-7174, 286, 14673)//[0][2]
+                ),
+                asList( // [1]
+                    asList(14844, -9087),//[1][0]
+                    asList(-25185, 219),//[1][1]
+                    asList(26875),//[1][2]
+                    asList(-4699),//[1][3]
+                    asList(-3853, -15729, 11472)//[1][4]
+                ),
+                asList( // [2]
+                    asList(-29142),//[2][0]
+                    asList(-13859),//[2][1]
+                    asList(-23073, 31368, -26542)//[2][2]
+                ),
+                asList( // [3]
+                    asList(14914, 14656),//[3][0]
+                    asList(4636, 6289)//[3][1]
+                )
+            )
+        )
+        .baselineValuesForSingleColumn(
+            asList( // row
+                asList( // [0]
+                    asList(10426, 31865),//[0][0]
+                    asList(-19088),//[0][1]
+                    asList(-4774),//[0][2]
+                    asList(17988)//[0][3]
+                ),
+                asList( // [1]
+                    asList(-6214, -26836, 30715)//[1][0]
+                ),
+                asList( // [2]
+                    asList(-4231),//[2][0]
+                    asList(31742, -661),//[2][1]
+                    asList(-22842, 4203),//[2][2]
+                    asList(18278)//[2][3]
+                )
+            )
+        )
+        .go();
+  }
+
+  @Test
+  public void decimalArray() throws Exception {
+    // Nesting 0: reading ARRAY<DECIMAL(9,3)>
+    testBuilder()
+        .sqlQuery("SELECT arr_n_0 FROM hive.`decimal_array`")
+        .unOrdered()
+        .baselineColumns("arr_n_0")
+        .baselineValuesForSingleColumn(asList(new BigDecimal("-100000.000"), new BigDecimal("102030.001"), new BigDecimal("0.001")))
+        .baselineValuesForSingleColumn(emptyList())
+        .baselineValuesForSingleColumn(Collections.singletonList(new BigDecimal("-10.500")))
+        .go();
+
+    // Nesting 1: reading ARRAY<ARRAY<DECIMAL(9,3)>>
+    testBuilder()
+        .sqlQuery("SELECT arr_n_1 FROM hive.`decimal_array`")
+        .unOrdered()
+        .baselineColumns("arr_n_1")
+        .baselineValuesForSingleColumn(asList(
+            asList(new BigDecimal("-100000.000"), new BigDecimal("102030.001")),
+            asList(new BigDecimal("0.101"), new BigDecimal("0.102")),
+            asList(new BigDecimal("0.001"), new BigDecimal("327670.001"))))
+        .baselineValuesForSingleColumn(asList(emptyList(), emptyList()))
+        .baselineValuesForSingleColumn(asList(asList(new BigDecimal("10.500"), new BigDecimal("5.010"))))
+        .go();
+
+    // Nesting 2: reading ARRAY<ARRAY<ARRAY<DECIMAL(9,3)>>>
+    testBuilder()
+        .sqlQuery("SELECT arr_n_2 FROM hive.`decimal_array` order by rid")
+        .ordered()
+        .baselineColumns("arr_n_2")
+        .baselineValuesForSingleColumn(
+            asList( // row
+                asList( // [0]
+                    asList(new BigDecimal("9.453")),//[0][0]
+                    asList(new BigDecimal("8.233"), new BigDecimal("-146577.465")),//[0][1]
+                    asList(new BigDecimal("-911144.423"), new BigDecimal("-862766.866"), new BigDecimal("-129948.784"))//[0][2]
+                ),
+                asList( // [1]
+                    asList(new BigDecimal("931346.867"))//[1][0]
+                ),
+                asList( // [2]
+                    asList(new BigDecimal("81.750")),//[2][0]
+                    asList(new BigDecimal("587225.077"), new BigDecimal("-3.930")),//[2][1]
+                    asList(new BigDecimal("0.042")),//[2][2]
+                    asList(new BigDecimal("-342346.511"))//[2][3]
+                )
+            )
+        )
+        .baselineValuesForSingleColumn(
+            asList( // row
+                asList( // [0]
+                    asList(new BigDecimal("375098.406"), new BigDecimal("84.509")),//[0][0]
+                    asList(new BigDecimal("-446325.287"), new BigDecimal("3.671")),//[0][1]
+                    asList(new BigDecimal("286958.380"), new BigDecimal("314821.890"), new BigDecimal("18513.303")),//[0][2]
+                    asList(new BigDecimal("-444023.971"), new BigDecimal("827746.528"), new BigDecimal("-54.986")),//[0][3]
+                    asList(new BigDecimal("-44520.406"))//[0][4]
+                )
+            )
+        )
+        .baselineValuesForSingleColumn(
+            asList( // row
+                asList( // [0]
+                    asList(new BigDecimal("906668.849"), new BigDecimal("1.406")),//[0][0]
+                    asList(new BigDecimal("-494177.333"), new BigDecimal("952997.058"))//[0][1]
+                ),
+                asList( // [1]
+                    asList(new BigDecimal("642385.159"), new BigDecimal("369753.830"), new BigDecimal("634889.981")),//[1][0]
+                    asList(new BigDecimal("83970.515"), new BigDecimal("-847315.758"), new BigDecimal("-0.600")),//[1][1]
+                    asList(new BigDecimal("73013.870")),//[1][2]
+                    asList(new BigDecimal("337872.675"), new BigDecimal("375940.114"), new BigDecimal("-2.670")),//[1][3]
+                    asList(new BigDecimal("-7.899"), new BigDecimal("755611.538"))//[1][4]
+                )
+            )
+        )
+        .go();
+  }
+
+  @Test
+  public void booleanArray() throws Exception {
+    // Nesting 0: reading ARRAY<BOOLEAN>
+    testBuilder()
+        .sqlQuery("SELECT arr_n_0 FROM hive.`boolean_array`")
+        .unOrdered()
+        .baselineColumns("arr_n_0")
+        .baselineValuesForSingleColumn(asList(false, true, false, true, false))
+        .baselineValuesForSingleColumn(emptyList())
+        .baselineValuesForSingleColumn(Collections.singletonList(true))
+        .go();
+
+    // Nesting 1: reading ARRAY<ARRAY<BOOLEAN>>
+    testBuilder()
+        .sqlQuery("SELECT arr_n_1 FROM hive.`boolean_array`")
+        .unOrdered()
+        .baselineColumns("arr_n_1")
+        .baselineValuesForSingleColumn(asList(asList(true, false, true), asList(false, false)))
+        .baselineValuesForSingleColumn(asList(emptyList(), emptyList()))
+        .baselineValuesForSingleColumn(asList(asList(false, true)))
+        .go();
+
+    // Nesting 2: reading ARRAY<ARRAY<ARRAY<BOOLEAN>>>
+    testBuilder()
+        .sqlQuery("SELECT arr_n_2 FROM hive.`boolean_array` order by rid")
+        .ordered()
+        .baselineColumns("arr_n_2")
+        .baselineValuesForSingleColumn(
+            asList( // row
+                asList( // [0]
+                    asList(false, true)//[0][0]
+                ),
+                asList( // [1]
+                    asList(true),//[1][0]
+                    asList(false, true),//[1][1]
+                    asList(true),//[1][2]
+                    asList(true)//[1][3]
+                ),
+                asList( // [2]
+                    asList(false),//[2][0]
+                    asList(true, false, false),//[2][1]
+                    asList(true, true),//[2][2]
+                    asList(false, true, false)//[2][3]
+                ),
+                asList( // [3]
+                    asList(false, true),//[3][0]
+                    asList(true, false),//[3][1]
+                    asList(true, false, true)//[3][2]
+                ),
+                asList( // [4]
+                    asList(false),//[4][0]
+                    asList(false),//[4][1]
+                    asList(false)//[4][2]
+                )
+            )
+        )
+        .baselineValuesForSingleColumn(
+            asList( // row
+                asList( // [0]
+                    asList(false, true),//[0][0]
+                    asList(false),//[0][1]
+                    asList(false, false),//[0][2]
+                    asList(true, true, true),//[0][3]
+                    asList(false)//[0][4]
+                ),
+                asList( // [1]
+                    asList(false, false, true)//[1][0]
+                ),
+                asList( // [2]
+                    asList(false, true),//[2][0]
+                    asList(true, false)//[2][1]
+                )
+            )
+        )
+        .baselineValuesForSingleColumn(
+            asList( // row
+                asList( // [0]
+                    asList(true, true),//[0][0]
+                    asList(false, true, false),//[0][1]
+                    asList(true),//[0][2]
+                    asList(true, true, false)//[0][3]
+                ),
+                asList( // [1]
+                    asList(false),//[1][0]
+                    asList(false, true),//[1][1]
+                    asList(false),//[1][2]
+                    asList(false)//[1][3]
+                ),
+                asList( // [2]
+                    asList(true, true, true),//[2][0]
+                    asList(true, true, true),//[2][1]
+                    asList(false),//[2][2]
+                    asList(false)//[2][3]
+                ),
+                asList( // [3]
+                    asList(false, false)//[3][0]
+                )
+            )
+        )
+        .go();
+  }
+
+  @Test
+  public void bigintArray() throws Exception {
+    // Nesting 0: reading ARRAY<BIGINT>
+    testBuilder()
+        .sqlQuery("SELECT arr_n_0 FROM hive.`bigint_array`")
+        .unOrdered()
+        .baselineColumns("arr_n_0")
+        .baselineValuesForSingleColumn(asList(-9223372036854775808L, 0L, 10000000010L, 9223372036854775807L))
+        .baselineValuesForSingleColumn(emptyList())
+        .baselineValuesForSingleColumn(Collections.singletonList(10005000L))
+        .go();
+
+    // Nesting 1: reading ARRAY<ARRAY<BIGINT>>
+    testBuilder()
+        .sqlQuery("SELECT arr_n_1 FROM hive.`bigint_array`")
+        .unOrdered()
+        .baselineColumns("arr_n_1")
+        .baselineValuesForSingleColumn(asList(
+            asList(-9223372036854775808L, 0L, 10000000010L),
+            asList(9223372036854775807L, 9223372036854775807L)))
+        .baselineValuesForSingleColumn(asList(emptyList(), emptyList()))
+        .baselineValuesForSingleColumn(asList(asList(10005000L, 100050010L)))
+        .go();
+
+    // Nesting 2: reading ARRAY<ARRAY<ARRAY<BIGINT>>>
+    testBuilder()
+        .sqlQuery("SELECT arr_n_2 FROM hive.`bigint_array` order by rid")
+        .ordered()
+        .baselineColumns("arr_n_2")
+        .baselineValuesForSingleColumn(
+            asList( // row
+                asList( // [0]
+                    asList(7345032157033769004L),//[0][0]
+                    asList(-2306607274383855051L, 3656249581579032003L)//[0][1]
+                ),
+                asList( // [1]
+                    asList(6044100897358387146L, 4737705104728607904L)//[1][0]
+                )
+            )
+        )
+        .baselineValuesForSingleColumn(
+            asList( // row
+                asList( // [0]
+                    asList(4833583793282587107L, -8917877693351417844L, -3226305034926780974L)//[0][0]
+                )
+            )
+        )
+        .baselineValuesForSingleColumn(
+            asList( // row
+                asList( // [0]
+                    asList(8679405200896733338L, 8581721713860760451L, 1150622751848016114L),//[0][0]
+                    asList(-6672104994192826124L, 4807952216371616134L),//[0][1]
+                    asList(-7874492057876324257L)//[0][2]
+                ),
+                asList( // [1]
+                    asList(8197656735200560038L),//[1][0]
+                    asList(7643173300425098029L, -3186442699228156213L, -8370345321491335247L),//[1][1]
+                    asList(8781633305391982544L, -7187468334864189662L)//[1][2]
+                ),
+                asList( // [2]
+                    asList(6685428436181310098L),//[2][0]
+                    asList(1358587806266610826L),//[2][1]
+                    asList(-2077124879355227614L, -6787493227661516341L),//[2][2]
+                    asList(3713296190482954025L, -3890396613053404789L),//[2][3]
+                    asList(4636761050236625699L, 5268453104977816600L)//[2][4]
+                )
+            )
+        )
+        .go();
+  }
+
+  @Test
+  public void floatArray() throws Exception {
+    // Nesting 0: reading ARRAY<FLOAT>
+    testBuilder()
+        .sqlQuery("SELECT arr_n_0 FROM hive.`float_array`")
+        .unOrdered()
+        .baselineColumns("arr_n_0")
+        .baselineValuesForSingleColumn(asList(-32.058f, 94.47389f, 16.107912f))
+        .baselineValuesForSingleColumn(emptyList())
+        .baselineValuesForSingleColumn(Collections.singletonList(25.96484f))
+        .go();
+
+    // Nesting 1: reading ARRAY<ARRAY<FLOAT>>
+    testBuilder()
+        .sqlQuery("SELECT arr_n_1 FROM hive.`float_array`")
+        .unOrdered()
+        .baselineColumns("arr_n_1")
+        .baselineValuesForSingleColumn(asList(asList(-82.399826f, 12.633938f, 86.19402f), asList(-13.03544f, 64.65487f)))
+        .baselineValuesForSingleColumn(asList(emptyList(), emptyList()))
+        .baselineValuesForSingleColumn(asList(asList(15.259451f, -15.259451f)))
+        .go();
+
+    // Nesting 2: reading ARRAY<ARRAY<ARRAY<FLOAT>>>
+    testBuilder()
+        .sqlQuery("SELECT arr_n_2 FROM hive.`float_array` order by rid")
+        .ordered()
+        .baselineColumns("arr_n_2")
+        .baselineValuesForSingleColumn(
+            asList( // row
+                asList( // [0]
+                    asList(-5.6506114f),//[0][0]
+                    asList(26.546333f, 3724.8389f),//[0][1]
+                    asList(-53.65775f, 686.8335f, -0.99032f)//[0][2]
+                )
+            )
+        )
+        .baselineValuesForSingleColumn(
+            asList( // row
+                asList( // [0]
+                    asList(29.042528f),//[0][0]
+                    asList(3524.3398f, -8856.58f, 6.8508215f)//[0][1]
+                ),
+                asList( // [1]
+                    asList(-0.73994386f, -2.0008986f),//[1][0]
+                    asList(-9.903006f, -271.26172f),//[1][1]
+                    asList(-131.80347f),//[1][2]
+                    asList(39.721367f, -4870.5444f),//[1][3]
+                    asList(-1.4830998f, -766.3066f, -0.1659732f)//[1][4]
+                ),
+                asList( // [2]
+                    asList(3467.0298f, -240.64255f),//[2][0]
+                    asList(2.4072556f, -85.89145f)//[2][1]
+                )
+            )
+        )
+        .baselineValuesForSingleColumn(
+            asList( // row
+                asList( // [0]
+                    asList(-888.68243f, -38.09065f),//[0][0]
+                    asList(-6948.154f, -185.64319f, 0.7401936f),//[0][1]
+                    asList(-705.2718f, -932.4041f)//[0][2]
+                ),
+                asList( // [1]
+                    asList(-2.581712f, 0.28686252f, -0.98652786f),//[1][0]
+                    asList(-57.448563f, -0.0057083773f, -0.21712556f),//[1][1]
+                    asList(-8.076653f, -8149.519f, -7.5968184f),//[1][2]
+                    asList(8.823492f),//[1][3]
+                    asList(-9134.323f, 467.53275f, -59.763447f)//[1][4]
+                ),
+                asList( // [2]
+                    asList(0.33596575f, 6805.2256f, -3087.9531f),//[2][0]
+                    asList(9816.865f, -164.90712f, -1.9071647f)//[2][1]
+                ),
+                asList( // [3]
+                    asList(-0.23883149f),//[3][0]
+                    asList(-5.3763375f, -4.7661624f)//[3][1]
+                ),
+                asList( // [4]
+                    asList(-52.42167f, 247.91452f),//[4][0]
+                    asList(9499.771f),//[4][1]
+                    asList(-0.6549191f, 4340.83f)//[4][2]
+                )
+            )
+        )
+        .go();
+  }
+
+  @Test
+  public void doubleArray() throws Exception {
+    // Nesting 0: reading ARRAY<DOUBLE>
+    testBuilder()
+        .sqlQuery("SELECT arr_n_0 FROM hive.`double_array`")
+        .unOrdered()
+        .baselineColumns("arr_n_0")
+        .baselineValuesForSingleColumn(asList(-13.241563769628, 0.3436367772981237, 9.73366))
+        .baselineValuesForSingleColumn(emptyList())
+        .baselineValuesForSingleColumn(asList(15.581409176959358))
+        .go();
+
+    // Nesting 1: reading ARRAY<ARRAY<DOUBLE>>
+    testBuilder()
+        .sqlQuery("SELECT arr_n_1 FROM hive.`double_array`")
+        .unOrdered()
+        .baselineColumns("arr_n_1")
+        .baselineValuesForSingleColumn(asList(asList(-24.049666910012498, 14.975034200, 1.19975056092457), asList(-2.293376758961259, 80.783)))
+        .baselineValuesForSingleColumn(asList(emptyList(), emptyList()))
+        .baselineValuesForSingleColumn(asList(asList(0.47745359256854, -0.47745359256854)))
+        .go();
+
+    // Nesting 2: reading ARRAY<ARRAY<ARRAY<DOUBLE>>>
+    testBuilder()
+        .sqlQuery("SELECT arr_n_2 FROM hive.`double_array` order by rid")
+        .ordered()
+        .baselineColumns("arr_n_2")
+        .baselineValuesForSingleColumn(
+            asList( // row
+                asList( // [0]
+                    asList(-9.269519394436928),//[0][0]
+                    asList(0.7319990286742192, 55.53357952933713, -4.450389221972496)//[0][1]
+                ),
+                asList( // [1]
+                    asList(0.8453724066773386)//[1][0]
+                )
+            )
+        )
+        .baselineValuesForSingleColumn(
+            asList( // row
+                asList( // [0]
+                    asList(-7966.1700155142025, 2519.664646202656),//[0][0]
+                    asList(-0.4584683555041169),//[0][1]
+                    asList(-860.4673046946417, 6.371900064750405, 0.4722917366204724)//[0][2]
+                ),
+                asList( // [1]
+                    asList(-62.76596817199298),//[1][0]
+                    asList(712.7880069076203, -5.14172156610055),//[1][1]
+                    asList(3891.128276893486, -0.5008908018575201)//[1][2]
+                ),
+                asList( // [2]
+                    asList(246.42074787345825, -0.7252828610111548),//[2][0]
+                    asList(-845.6633966327038, -436.5267842528363)//[2][1]
+                ),
+                asList( // [3]
+                    asList(5.177407969462521),//[3][0]
+                    asList(0.10545048230228471, 0.7364424942282094),//[3][1]
+                    asList(-373.3798205258425, -79.65616885610245)//[3][2]
+                ),
+                asList( // [4]
+                    asList(-744.3464669962211, 3.8376055596419754),//[4][0]
+                    asList(5784.252615154324, -4792.10612059247, -2535.4093308546435)//[4][1]
+                )
+            )
+        )
+        .baselineValuesForSingleColumn(
+            asList( // row
+                asList( // [0]
+                    asList(0.054727088545119096, 0.3289046600776335, -183.0613955159468)//[0][0]
+                ),
+                asList( // [1]
+                    asList(-1653.1119499932845, 5132.117249049659),//[1][0]
+                    asList(735.8474815185632, -5.4205625353286795),//[1][1]
+                    asList(2.9513430741605107, -7513.09536433704),//[1][2]
+                    asList(1660.4238619967039),//[1][3]
+                    asList(472.7475322920831)//[1][4]
+                )
+            )
+        )
+        .go();
+  }
+
+  @Test
+  public void dateArray() throws Exception {
+
+    // Nesting 0: reading ARRAY<DATE>
+    testBuilder()
+        .sqlQuery("SELECT arr_n_0 FROM hive.`date_array`")
+        .unOrdered()
+        .baselineColumns("arr_n_0")
+        .baselineValuesForSingleColumn(asList(
+            parseLocalDate("2018-10-21"),
+            parseLocalDate("2017-07-11"),
+            parseLocalDate("2018-09-23")))
+        .baselineValuesForSingleColumn(emptyList())
+        .baselineValuesForSingleColumn(asList(parseLocalDate("2018-07-14")))
+        .go();
+
+    // Nesting 1: reading ARRAY<ARRAY<DATE>>
+    testBuilder()
+        .sqlQuery("SELECT arr_n_1 FROM hive.`date_array`")
+        .unOrdered()
+        .baselineColumns("arr_n_1")
+        .baselineValuesForSingleColumn(asList(
+            asList(parseLocalDate("2017-03-21"), parseLocalDate("2017-09-10"), parseLocalDate("2018-01-17")),
+            asList(parseLocalDate("2017-03-24"), parseLocalDate("2018-09-22"))))
+        .baselineValuesForSingleColumn(asList(emptyList(), emptyList()))
+        .baselineValuesForSingleColumn(asList(asList(parseLocalDate("2017-08-09"), parseLocalDate("2017-08-28"))))
+        .go();
+
+    // Nesting 2: reading ARRAY<ARRAY<ARRAY<DATE>>>
+    testBuilder()
+        .sqlQuery("SELECT arr_n_2 FROM hive.`date_array` order by rid")
+        .ordered()
+        .baselineColumns("arr_n_2")
+        .baselineValuesForSingleColumn(
+            asList( // row
+                asList( // [0]
+                    asList(parseLocalDate("1952-08-24")),//[0][0]
+                    asList(parseLocalDate("1968-10-05"), parseLocalDate("1951-07-27")),//[0][1]
+                    asList(parseLocalDate("1943-11-18"), parseLocalDate("1991-04-27"))//[0][2]
+                ),
+                asList( // [1]
+                    asList(parseLocalDate("1981-12-27"), parseLocalDate("1984-02-03")),//[1][0]
+                    asList(parseLocalDate("1953-04-15"), parseLocalDate("2002-08-15"), parseLocalDate("1926-12-10")),//[1][1]
+                    asList(parseLocalDate("2009-08-09"), parseLocalDate("1919-08-30"), parseLocalDate("1906-04-10")),//[1][2]
+                    asList(parseLocalDate("1995-10-28"), parseLocalDate("1989-09-07")),//[1][3]
+                    asList(parseLocalDate("2002-01-03"), parseLocalDate("1929-03-17"), parseLocalDate("1939-10-23"))//[1][4]
+                )
+            )
+        )
+        .baselineValuesForSingleColumn(
+            asList( // row
+                asList( // [0]
+                    asList(parseLocalDate("1936-05-05"), parseLocalDate("1941-04-12"), parseLocalDate("1914-04-15"))//[0][0]
+                ),
+                asList( // [1]
+                    asList(parseLocalDate("1944-05-09"), parseLocalDate("2002-02-11"))//[1][0]
+                )
+            )
+        )
+        .baselineValuesForSingleColumn(
+            asList( // row
+                asList( // [0]
+                    asList(parseLocalDate("1965-04-18"), parseLocalDate("2012-11-07"), parseLocalDate("1961-03-15")),//[0][0]
+                    asList(parseLocalDate("1922-05-22"), parseLocalDate("1978-03-25")),//[0][1]
+                    asList(parseLocalDate("1935-05-29"))//[0][2]
+                ),
+                asList( // [1]
+                    asList(parseLocalDate("1904-07-08"), parseLocalDate("1968-05-23"), parseLocalDate("1946-03-31")),//[1][0]
+                    asList(parseLocalDate("2014-01-28")),//[1][1]
+                    asList(parseLocalDate("1938-09-20"), parseLocalDate("1920-07-09"), parseLocalDate("1990-12-31")),//[1][2]
+                    asList(parseLocalDate("1984-07-20"), parseLocalDate("1988-11-25")),//[1][3]
+                    asList(parseLocalDate("1941-12-21"), parseLocalDate("1939-01-16"), parseLocalDate("2012-09-19"))//[1][4]
+                ),
+                asList( // [2]
+                    asList(parseLocalDate("2020-12-28")),//[2][0]
+                    asList(parseLocalDate("1930-11-13")),//[2][1]
+                    asList(parseLocalDate("2014-05-02"), parseLocalDate("1935-02-16"), parseLocalDate("1919-01-17")),//[2][2]
+                    asList(parseLocalDate("1972-04-20"), parseLocalDate("1951-05-30"), parseLocalDate("1963-01-11"))//[2][3]
+                ),
+                asList( // [3]
+                    asList(parseLocalDate("1993-03-20"), parseLocalDate("1978-12-31")),//[3][0]
+                    asList(parseLocalDate("1965-12-15"), parseLocalDate("1970-09-02"), parseLocalDate("2010-05-25"))//[3][1]
+                )
+            )
+        )
+        .go();
+  }
+
+  @Test
+  public void timestampArray() throws Exception {
+    // Nesting 0: reading ARRAY<TIMESTAMP>
+    testBuilder()
+        .sqlQuery("SELECT arr_n_0 FROM hive.`timestamp_array`")
+        .unOrdered()
+        .baselineColumns("arr_n_0")
+        .baselineValuesForSingleColumn(asList(
+            parseBest("2018-10-21 04:51:36"),
+            parseBest("2017-07-11 09:26:48"),
+            parseBest("2018-09-23 03:02:33")))
+        .baselineValuesForSingleColumn(emptyList())
+        .baselineValuesForSingleColumn(asList(parseBest("2018-07-14 05:20:34")))
+        .go();
+
+    // Nesting 1: reading ARRAY<ARRAY<TIMESTAMP>>
+    testBuilder()
+        .sqlQuery("SELECT arr_n_1 FROM hive.`timestamp_array`")
+        .unOrdered()
+        .baselineColumns("arr_n_1")
+        .baselineValuesForSingleColumn(asList(
+            asList(parseBest("2017-03-21 12:52:33"), parseBest("2017-09-10 01:29:24"), parseBest("2018-01-17 04:45:23")),
+            asList(parseBest("2017-03-24 01:03:23"), parseBest("2018-09-22 05:00:26"))))
+        .baselineValuesForSingleColumn(asList(emptyList(), emptyList()))
+        .baselineValuesForSingleColumn(asList(asList(parseBest("2017-08-09 08:26:08"), parseBest("2017-08-28 09:47:23"))))
+        .go();
+
+    // Nesting 2: reading ARRAY<ARRAY<ARRAY<TIMESTAMP>>>
+    testBuilder()
+        .sqlQuery("SELECT arr_n_2 FROM hive.`timestamp_array` order by rid")
+        .ordered()
+        .baselineColumns("arr_n_2")
+        .baselineValuesForSingleColumn(
+            asList( // row
+                asList( // [0]
+                    asList(parseBest("1929-01-08 19:31:47")),//[0][0]
+                    asList(parseBest("1968-07-02 15:13:55"), parseBest("1990-01-25 21:05:51"), parseBest("1950-10-26 19:16:10")),//[0][1]
+                    asList(parseBest("1946-09-03 03:03:50"), parseBest("1987-03-29 11:27:05")),//[0][2]
+                    asList(parseBest("1979-11-29 09:01:14"))//[0][3]
+                ),
+                asList( // [1]
+                    asList(parseBest("2010-08-26 12:08:51"), parseBest("2012-02-05 02:34:22")),//[1][0]
+                    asList(parseBest("1955-02-24 19:45:33")),//[1][1]
+                    asList(parseBest("1994-06-19 09:33:56"), parseBest("1971-11-05 06:27:55"), parseBest("1925-04-11 13:55:48")),//[1][2]
+                    asList(parseBest("1916-10-02 05:09:18"), parseBest("1995-04-11 18:05:51"), parseBest("1973-11-17 06:06:53"))//[1][3]
+                ),
+                asList( // [2]
+                    asList(parseBest("1929-12-19 16:49:08"), parseBest("1942-10-28 04:55:13"), parseBest("1936-12-01 13:01:37")),//[2][0]
+                    asList(parseBest("1926-12-09 07:34:14"), parseBest("1971-07-23 15:01:00"), parseBest("2014-01-07 06:29:03")),//[2][1]
+                    asList(parseBest("2012-08-25 23:26:10")),//[2][2]
+                    asList(parseBest("2010-03-04 08:31:54"), parseBest("1950-07-20 19:26:08"), parseBest("1953-03-16 16:13:24"))//[2][3]
+                )
+            )
+        )
+        .baselineValuesForSingleColumn(
+            asList( // row
+                asList( // [0]
+                    asList(parseBest("1904-12-10 00:39:14")),//[0][0]
+                    asList(parseBest("1994-04-12 23:06:07")),//[0][1]
+                    asList(parseBest("1954-07-05 23:48:09"), parseBest("1913-03-03 18:47:14"), parseBest("1960-04-30 22:35:28")),//[0][2]
+                    asList(parseBest("1962-09-26 17:11:12"), parseBest("1906-06-18 04:05:21"), parseBest("2003-06-19 05:15:24"))//[0][3]
+                ),
+                asList( // [1]
+                    asList(parseBest("1929-03-20 06:33:40"), parseBest("1939-02-12 07:03:07"), parseBest("1945-02-16 21:18:16"))//[1][0]
+                ),
+                asList( // [2]
+                    asList(parseBest("1969-08-11 22:25:31"), parseBest("1944-08-11 02:57:58")),//[2][0]
+                    asList(parseBest("1989-03-18 13:33:56"), parseBest("1961-06-06 04:44:50"))//[2][1]
+                )
+            )
+        )
+        .baselineValuesForSingleColumn(
+            asList( // row
+                asList( // [0]
+                    asList(parseBest("1999-12-07 01:16:45")),//[0][0]
+                    asList(parseBest("1903-12-11 04:28:20"), parseBest("2007-01-03 19:27:28")),//[0][1]
+                    asList(parseBest("2018-03-16 15:43:19"), parseBest("2002-09-16 08:58:40"), parseBest("1956-05-16 17:47:44")),//[0][2]
+                    asList(parseBest("2006-09-19 18:38:19"), parseBest("2016-01-21 12:39:30"))//[0][3]
+                )
+            )
+        )
+        .go();
+  }
+
+  @Test
+  public void binaryArray() throws Exception {
+    // Nesting 0: reading ARRAY<BINARY>
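+    // Binary array elements come back as byte[]; the StringBytes helper defined at the
+    // bottom of this class supplies an equals() over byte[] so the expected values can be compared.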
+    testBuilder()
+        .sqlQuery("SELECT arr_n_0 FROM hive.`binary_array`")
+        .unOrdered()
+        .baselineColumns("arr_n_0")
+        .baselineValuesForSingleColumn(asList(new StringBytes("First"), new StringBytes("Second"), new StringBytes("Third")))
+        .baselineValuesForSingleColumn(asList(new StringBytes("First")))
+        .go();
+  }
+
+  @Test
+  public void arrayViewDefinedInHive() throws Exception {
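+    // Reads array columns through a view defined in Hive (arr_view) that exposes one
+    // column per array type and nesting level; expected values match the per-type tests above.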
+    testBuilder()
+        .sqlQuery("SELECT * FROM hive.`arr_view` WHERE vwrid=1")
+        .unOrdered()
+        .baselineColumns("vwrid", "int_n0", "int_n1", "string_n0", "string_n1",
+            "varchar_n0", "varchar_n1", "char_n0", "char_n1", "tinyint_n0",
+            "tinyint_n1", "smallint_n0", "smallint_n1", "decimal_n0", "decimal_n1",
+            "boolean_n0", "boolean_n1", "bigint_n0", "bigint_n1", "float_n0", "float_n1",
+            "double_n0", "double_n1", "date_n0", "date_n1", "timestamp_n0", "timestamp_n1")
+        .baselineValues(
+            1,
+
+            asList(-1, 0, 1),
+            asList(asList(-1, 0, 1), asList(-2, 1)),
+
+            asList(new Text("First Value Of Array"), new Text("komlnp"), new Text("The Last Value")),
+            asList(asList(new Text("Array 0, Value 0"), new Text("Array 0, Value 1")), asList(new Text("Array 1"))),
+
+            asList(new Text("Five"), new Text("One"), new Text("T")),
+            asList(asList(new Text("Five"), new Text("One"), new Text("$42")), asList(new Text("T"), new Text("K"), new Text("O"))),
+
+            asList(new Text("aa"), new Text("cc"), new Text("ot")),
+            asList(asList(new Text("aa")), asList(new Text("cc"), new Text("ot"))),
+
+            asList(-128, 0, 127),
+            asList(asList(-128, -127), asList(0, 1), asList(127, 126)),
+
+            asList(-32768, 0, 32767),
+            asList(asList(-32768, -32768), asList(0, 0), asList(32767, 32767)),
+
+            asList(new BigDecimal("-100000.000"), new BigDecimal("102030.001"), new BigDecimal("0.001")),
+            asList(asList(new BigDecimal("-100000.000"), new BigDecimal("102030.001")), asList(new BigDecimal("0.101"), new BigDecimal("0.102")),
+                asList(new BigDecimal("0.001"), new BigDecimal("327670.001"))),
+
+            asList(false, true, false, true, false),
+            asList(asList(true, false, true), asList(false, false)),
+
+            asList(-9223372036854775808L, 0L, 10000000010L, 9223372036854775807L),
+            asList(asList(-9223372036854775808L, 0L, 10000000010L), asList(9223372036854775807L, 9223372036854775807L)),
+
+            asList(-32.058f, 94.47389f, 16.107912f),
+            asList(asList(-82.399826f, 12.633938f, 86.19402f), asList(-13.03544f, 64.65487f)),
+
+            asList(-13.241563769628, 0.3436367772981237, 9.73366),
+            asList(asList(-24.049666910012498, 14.975034200, 1.19975056092457), asList(-2.293376758961259, 80.783)),
+
+            asList(parseLocalDate("2018-10-21"), parseLocalDate("2017-07-11"), parseLocalDate("2018-09-23")),
+            asList(asList(parseLocalDate("2017-03-21"), parseLocalDate("2017-09-10"), parseLocalDate("2018-01-17")),
+                asList(parseLocalDate("2017-03-24"), parseLocalDate("2018-09-22"))),
+
+            asList(parseBest("2018-10-21 04:51:36"), parseBest("2017-07-11 09:26:48"), parseBest("2018-09-23 03:02:33")),
+            asList(asList(parseBest("2017-03-21 12:52:33"), parseBest("2017-09-10 01:29:24"), parseBest("2018-01-17 04:45:23")),
+                asList(parseBest("2017-03-24 01:03:23"), parseBest("2018-09-22 05:00:26")))
+        )
+        .go();
+  }
+
+  @Test
+  public void arrayViewDefinedInDrill() throws Exception {
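+    // Same check as arrayViewDefinedInHive, but the view is created on the Drill side
+    // over the Hive array tables joined on rid.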
+    queryBuilder().sql(
+        "CREATE VIEW " + StoragePluginTestUtils.DFS_TMP_SCHEMA + ".`dfs_arr_vw` AS " +
+            "SELECT " +
+            "   t1.rid as vwrid," +
+            "   t1.arr_n_0 as int_n0," +
+            "   t1.arr_n_1 as int_n1," +
+            "   t2.arr_n_0 as string_n0," +
+            "   t2.arr_n_1 as string_n1," +
+            "   t3.arr_n_0 as varchar_n0," +
+            "   t3.arr_n_1 as varchar_n1," +
+            "   t4.arr_n_0 as char_n0," +
+            "   t4.arr_n_1 as char_n1," +
+            "   t5.arr_n_0 as tinyint_n0," +
+            "   t5.arr_n_1 as tinyint_n1," +
+            "   t6.arr_n_0 as smallint_n0," +
+            "   t6.arr_n_1 as smallint_n1," +
+            "   t7.arr_n_0 as decimal_n0," +
+            "   t7.arr_n_1 as decimal_n1," +
+            "   t8.arr_n_0 as boolean_n0," +
+            "   t8.arr_n_1 as boolean_n1," +
+            "   t9.arr_n_0 as bigint_n0," +
+            "   t9.arr_n_1 as bigint_n1," +
+            "   t10.arr_n_0 as float_n0," +
+            "   t10.arr_n_1 as float_n1," +
+            "   t11.arr_n_0 as double_n0," +
+            "   t11.arr_n_1 as double_n1," +
+            "   t12.arr_n_0 as date_n0," +
+            "   t12.arr_n_1 as date_n1," +
+            "   t13.arr_n_0 as timestamp_n0," +
+            "   t13.arr_n_1 as timestamp_n1 " +
+            "FROM " +
+            "   hive.int_array t1," +
+            "   hive.string_array t2," +
+            "   hive.varchar_array t3," +
+            "   hive.char_array t4," +
+            "   hive.tinyint_array t5," +
+            "   hive.smallint_array t6," +
+            "   hive.decimal_array t7," +
+            "   hive.boolean_array t8," +
+            "   hive.bigint_array t9," +
+            "   hive.float_array t10," +
+            "   hive.double_array t11," +
+            "   hive.date_array t12," +
+            "   hive.timestamp_array t13 " +
+            "WHERE " +
+            "   t1.rid=t2.rid AND" +
+            "   t1.rid=t3.rid AND" +
+            "   t1.rid=t4.rid AND" +
+            "   t1.rid=t5.rid AND" +
+            "   t1.rid=t6.rid AND" +
+            "   t1.rid=t7.rid AND" +
+            "   t1.rid=t8.rid AND" +
+            "   t1.rid=t9.rid AND" +
+            "   t1.rid=t10.rid AND" +
+            "   t1.rid=t11.rid AND" +
+            "   t1.rid=t12.rid AND" +
+            "   t1.rid=t13.rid "
+    ).run();
+
+    testBuilder()
+        .sqlQuery("SELECT * FROM " + StoragePluginTestUtils.DFS_TMP_SCHEMA + ".`dfs_arr_vw` WHERE vwrid=1")
+        .unOrdered()
+        .baselineColumns("vwrid", "int_n0", "int_n1", "string_n0", "string_n1",
+            "varchar_n0", "varchar_n1", "char_n0", "char_n1", "tinyint_n0",
+            "tinyint_n1", "smallint_n0", "smallint_n1", "decimal_n0", "decimal_n1",
+            "boolean_n0", "boolean_n1", "bigint_n0", "bigint_n1", "float_n0", "float_n1",
+            "double_n0", "double_n1", "date_n0", "date_n1", "timestamp_n0", "timestamp_n1")
+        .baselineValues(
+            1,
+
+            asList(-1, 0, 1),
+            asList(asList(-1, 0, 1), asList(-2, 1)),
+
+            asList(new Text("First Value Of Array"), new Text("komlnp"), new Text("The Last Value")),
+            asList(asList(new Text("Array 0, Value 0"), new Text("Array 0, Value 1")), asList(new Text("Array 1"))),
+
+            asList(new Text("Five"), new Text("One"), new Text("T")),
+            asList(asList(new Text("Five"), new Text("One"), new Text("$42")), asList(new Text("T"), new Text("K"), new Text("O"))),
+
+            asList(new Text("aa"), new Text("cc"), new Text("ot")),
+            asList(asList(new Text("aa")), asList(new Text("cc"), new Text("ot"))),
+
+            asList(-128, 0, 127),
+            asList(asList(-128, -127), asList(0, 1), asList(127, 126)),
+
+            asList(-32768, 0, 32767),
+            asList(asList(-32768, -32768), asList(0, 0), asList(32767, 32767)),
+
+            asList(new BigDecimal("-100000.000"), new BigDecimal("102030.001"), new BigDecimal("0.001")),
+            asList(asList(new BigDecimal("-100000.000"), new BigDecimal("102030.001")), asList(new BigDecimal("0.101"), new BigDecimal("0.102")),
+                asList(new BigDecimal("0.001"), new BigDecimal("327670.001"))),
+
+            asList(false, true, false, true, false),
+            asList(asList(true, false, true), asList(false, false)),
+
+            asList(-9223372036854775808L, 0L, 10000000010L, 9223372036854775807L),
+            asList(asList(-9223372036854775808L, 0L, 10000000010L), asList(9223372036854775807L, 9223372036854775807L)),
+
+            asList(-32.058f, 94.47389f, 16.107912f),
+            asList(asList(-82.399826f, 12.633938f, 86.19402f), asList(-13.03544f, 64.65487f)),
+
+            asList(-13.241563769628, 0.3436367772981237, 9.73366),
+            asList(asList(-24.049666910012498, 14.975034200, 1.19975056092457), asList(-2.293376758961259, 80.783)),
+
+            asList(parseLocalDate("2018-10-21"), parseLocalDate("2017-07-11"), parseLocalDate("2018-09-23")),
+            asList(asList(parseLocalDate("2017-03-21"), parseLocalDate("2017-09-10"), parseLocalDate("2018-01-17")),
+                asList(parseLocalDate("2017-03-24"), parseLocalDate("2018-09-22"))),
+
+            asList(parseBest("2018-10-21 04:51:36"), parseBest("2017-07-11 09:26:48"), parseBest("2018-09-23 03:02:33")),
+            asList(asList(parseBest("2017-03-21 12:52:33"), parseBest("2017-09-10 01:29:24"), parseBest("2018-01-17 04:45:23")),
+                asList(parseBest("2017-03-24 01:03:23"), parseBest("2018-09-22 05:00:26")))
+        )
+        .go();
+  }
+
+  /**
+   * Wrapper with a workaround {@link StringBytes#equals(Object)} implementation
+   * that allows comparing expected binary array elements to actual byte[] values.
+   * See {@link TestHiveArrays#binaryArray()} for sample usage.
+   */
+  private static final class StringBytes {
+
+    private final byte[] bytes;
+
+    private StringBytes(String s) {
+      bytes = s.getBytes();
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+      if (obj instanceof byte[]) {
+        return Arrays.equals(bytes, (byte[]) obj);
+      }
+      return (obj == this) || (obj instanceof StringBytes
+          && Arrays.equals(bytes, ((StringBytes) obj).bytes));
+    }
+
+    @Override
+    public String toString() {
+      return new String(bytes);
+    }
+
+  }
+
+}
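
For reference, a minimal sketch of how such a wrapper could appear in a baseline; the table name and element values below are assumptions for illustration, not taken from this change:

    // Hypothetical baseline: expected binary elements are wrapped in StringBytes so that
    // equals(Object) can match the byte[] values returned by the query.
    testBuilder()
        .sqlQuery("SELECT rid, arr_n_0 FROM hive.`binary_array`") // assumed table name
        .unOrdered()
        .baselineColumns("rid", "arr_n_0")
        .baselineValues(1, asList(new StringBytes("First"), new StringBytes("Second")))
        .go();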
diff --git a/contrib/storage-hive/core/src/test/resources/complex_types/array/bigint_array.json b/contrib/storage-hive/core/src/test/resources/complex_types/array/bigint_array.json
new file mode 100644
index 0000000..c3b0a2d
--- /dev/null
+++ b/contrib/storage-hive/core/src/test/resources/complex_types/array/bigint_array.json
@@ -0,0 +1,3 @@
+{"rid":1,"arr_n_0":[-9223372036854775808, 0, 10000000010, 9223372036854775807],"arr_n_1":[[-9223372036854775808, 0, 10000000010],[9223372036854775807,9223372036854775807]],"arr_n_2":[[[7345032157033769004], [-2306607274383855051, 3656249581579032003]], [[6044100897358387146, 4737705104728607904]]]}
+{"rid":2,"arr_n_0":[],"arr_n_1":[[],[]],"arr_n_2":[[[4833583793282587107, -8917877693351417844, -3226305034926780974]]]}
+{"rid":3,"arr_n_0":[10005000],"arr_n_1":[[10005000,100050010]],"arr_n_2":[[[8679405200896733338, 8581721713860760451, 1150622751848016114], [-6672104994192826124, 4807952216371616134], [-7874492057876324257]], [[8197656735200560038], [7643173300425098029, -3186442699228156213, -8370345321491335247], [8781633305391982544, -7187468334864189662]], [[6685428436181310098], [1358587806266610826], [-2077124879355227614, -6787493227661516341], [3713296190482954025, -3890396613053404789], [463676 [...]
diff --git a/contrib/storage-hive/core/src/test/resources/complex_types/array/boolean_array.json b/contrib/storage-hive/core/src/test/resources/complex_types/array/boolean_array.json
new file mode 100644
index 0000000..4b3193a
--- /dev/null
+++ b/contrib/storage-hive/core/src/test/resources/complex_types/array/boolean_array.json
@@ -0,0 +1,3 @@
+{"rid":1,"arr_n_0":[false,true,false,true,false],"arr_n_1":[[true,false,true],[false,false]],"arr_n_2":[[[false, true]], [[true], [false, true], [true], [true]], [[false], [true, false, false], [true, true], [false, true, false]], [[false, true], [true, false], [true, false, true]], [[false], [false], [false]]]}
+{"rid":2,"arr_n_0":[],"arr_n_1":[[],[]],"arr_n_2":[[[false, true], [false], [false, false], [true, true, true], [false]], [[false, false, true]], [[false, true], [true, false]]]}
+{"rid":3,"arr_n_0":[true],"arr_n_1":[[false,true]],"arr_n_2":[[[true, true], [false, true, false], [true], [true, true, false]], [[false], [false, true], [false], [false]], [[true, true, true], [true, true, true], [false], [false]], [[false, false]]]}
diff --git a/contrib/storage-hive/core/src/test/resources/complex_types/array/char_array.json b/contrib/storage-hive/core/src/test/resources/complex_types/array/char_array.json
new file mode 100644
index 0000000..d060685
--- /dev/null
+++ b/contrib/storage-hive/core/src/test/resources/complex_types/array/char_array.json
@@ -0,0 +1,3 @@
+{"rid":1,"arr_n_0":["aa","cc","ot"], "arr_n_1":[["aa"],["cc","ot"]],"arr_n_2":[[["eT"]], [["w9", "fC", "ww"], ["3o", "f7", "Za"], ["lX", "iv", "jI"]], [["S3", "Qa", "aG"], ["bj", "gc", "NO"]]]}
+{"rid":2,"arr_n_0":[],"arr_n_1":[[],[]],"arr_n_2":[[["PV", "tH", "B7"], ["uL"], ["7b", "uf"], ["zj"], ["sA", "hf", "hR"]]]}
+{"rid":3,"arr_n_0":["+a","-c","*t"], "arr_n_1":[["*t"]],"arr_n_2":[[["W1", "FS"], ["le", "c0"], ["  ", "0v"]], [["gj"]]]}
diff --git a/contrib/storage-hive/core/src/test/resources/complex_types/array/date_array.json b/contrib/storage-hive/core/src/test/resources/complex_types/array/date_array.json
new file mode 100644
index 0000000..e1d1b38
--- /dev/null
+++ b/contrib/storage-hive/core/src/test/resources/complex_types/array/date_array.json
@@ -0,0 +1,3 @@
+{"rid":1,"arr_n_0":["2018-10-21","2017-07-11","2018-09-23"],"arr_n_1":[["2017-03-21","2017-09-10","2018-01-17"],["2017-03-24","2018-09-22"]],"arr_n_2":[[["1952-08-24"], ["1968-10-05", "1951-07-27"], ["1943-11-18", "1991-04-27"]], [["1981-12-27", "1984-02-03"], ["1953-04-15", "2002-08-15", "1926-12-10"], ["2009-08-09", "1919-08-30", "1906-04-10"], ["1995-10-28", "1989-09-07"], ["2002-01-03", "1929-03-17", "1939-10-23"]]]}
+{"rid":2,"arr_n_0":[],"arr_n_1":[[],[]],"arr_n_2":[[["1936-05-05", "1941-04-12", "1914-04-15"]], [["1944-05-09", "2002-02-11"]]]}
+{"rid":3,"arr_n_0":["2018-07-14"],"arr_n_1":[["2017-08-09","2017-08-28"]],"arr_n_2":[[["1965-04-18", "2012-11-07", "1961-03-15"], ["1922-05-22", "1978-03-25"], ["1935-05-29"]], [["1904-07-08", "1968-05-23", "1946-03-31"], ["2014-01-28"], ["1938-09-20", "1920-07-09", "1990-12-31"], ["1984-07-20", "1988-11-25"], ["1941-12-21", "1939-01-16", "2012-09-19"]], [["2020-12-28"], ["1930-11-13"], ["2014-05-02", "1935-02-16", "1919-01-17"], ["1972-04-20", "1951-05-30", "1963-01-11"]], [["1993-03-20 [...]
diff --git a/contrib/storage-hive/core/src/test/resources/complex_types/array/decimal_array.json b/contrib/storage-hive/core/src/test/resources/complex_types/array/decimal_array.json
new file mode 100644
index 0000000..4b23bc5
--- /dev/null
+++ b/contrib/storage-hive/core/src/test/resources/complex_types/array/decimal_array.json
@@ -0,0 +1,3 @@
+{"rid":1,"arr_n_0":[-100000.000,102030.001,0.001],"arr_n_1":[[-100000.000,102030.001],[0.101,0.102],[0.001,327670.001]],"arr_n_2":[[[9.453], [8.233, -146577.465], [-911144.423, -862766.866, -129948.784]], [[931346.867]], [[81.750], [587225.077, -3.930], [0.042], [-342346.511]]]}
+{"rid":2,"arr_n_0":[],"arr_n_1":[[],[]],"arr_n_2":[[[375098.406, 84.509], [-446325.287, 3.671], [286958.380, 314821.890, 18513.303], [-444023.971, 827746.528, -54.986], [-44520.406]]]}
+{"rid":3,"arr_n_0":[-10.500],"arr_n_1":[[10.500,5.010]],"arr_n_2":[[[906668.849, 1.406], [-494177.333, 952997.058]], [[642385.159, 369753.830, 634889.981], [83970.515, -847315.758, -0.600], [73013.870], [337872.675, 375940.114, -2.670], [-7.899, 755611.538]]]}
diff --git a/contrib/storage-hive/core/src/test/resources/complex_types/array/double_array.json b/contrib/storage-hive/core/src/test/resources/complex_types/array/double_array.json
new file mode 100644
index 0000000..32c4321
--- /dev/null
+++ b/contrib/storage-hive/core/src/test/resources/complex_types/array/double_array.json
@@ -0,0 +1,3 @@
+{"rid":1,"arr_n_0":[-13.241563769628,0.3436367772981237,9.73366],"arr_n_1":[[-24.049666910012498,14.975034200,1.19975056092457],[-2.293376758961259,80.783]],"arr_n_2":[[[-9.269519394436928], [0.7319990286742192, 55.53357952933713, -4.450389221972496]], [[0.8453724066773386]]]}
+{"rid":2,"arr_n_0":[],"arr_n_1":[[],[]],"arr_n_2":[[[-7966.1700155142025, 2519.664646202656], [-0.4584683555041169], [-860.4673046946417, 6.371900064750405, 0.4722917366204724]], [[-62.76596817199298], [712.7880069076203, -5.14172156610055], [3891.128276893486, -0.5008908018575201]], [[246.42074787345825, -0.7252828610111548], [-845.6633966327038, -436.5267842528363]], [[5.177407969462521], [0.10545048230228471, 0.7364424942282094], [-373.3798205258425, -79.65616885610245]], [[-744.34646 [...]
+{"rid":3,"arr_n_0":[15.581409176959358],"arr_n_1":[[0.47745359256854,-0.47745359256854]],"arr_n_2":[[[0.054727088545119096, 0.3289046600776335, -183.0613955159468]], [[-1653.1119499932845, 5132.117249049659], [735.8474815185632, -5.4205625353286795], [2.9513430741605107, -7513.09536433704], [1660.4238619967039], [472.7475322920831]]]}
\ No newline at end of file
diff --git a/contrib/storage-hive/core/src/test/resources/complex_types/array/float_array.json b/contrib/storage-hive/core/src/test/resources/complex_types/array/float_array.json
new file mode 100644
index 0000000..ddfe09a
--- /dev/null
+++ b/contrib/storage-hive/core/src/test/resources/complex_types/array/float_array.json
@@ -0,0 +1,3 @@
+{"rid":1,"arr_n_0":[-32.058,94.47389,16.107912],"arr_n_1":[[-82.399826,12.633938,86.19402],[-13.03544,64.65487]],"arr_n_2":[[[-5.6506114], [26.546333, 3724.8389], [-53.65775, 686.8335, -0.99032]]]}
+{"rid":2,"arr_n_0":[],"arr_n_1":[[],[]],"arr_n_2":[[[29.042528], [3524.3398, -8856.58, 6.8508215]], [[-0.73994386, -2.0008986], [-9.903006, -271.26172], [-131.80347], [39.721367, -4870.5444], [-1.4830998, -766.3066, -0.1659732]], [[3467.0298, -240.64255], [2.4072556, -85.89145]]]}
+{"rid":3,"arr_n_0":[25.96484],"arr_n_1":[[15.259451,-15.259451]],"arr_n_2":[[[-888.68243, -38.09065], [-6948.154, -185.64319, 0.7401936], [-705.2718, -932.4041]], [[-2.581712, 0.28686252, -0.98652786], [-57.448563, -0.0057083773, -0.21712556], [-8.076653, -8149.519, -7.5968184], [8.823492], [-9134.323, 467.53275, -59.763447]], [[0.33596575, 6805.2256, -3087.9531], [9816.865, -164.90712, -1.9071647]], [[-0.23883149], [-5.3763375, -4.7661624]], [[-52.42167, 247.91452], [9499.771], [-0.6549 [...]
diff --git a/contrib/storage-hive/core/src/test/resources/complex_types/array/int_array.json b/contrib/storage-hive/core/src/test/resources/complex_types/array/int_array.json
new file mode 100644
index 0000000..36bc481
--- /dev/null
+++ b/contrib/storage-hive/core/src/test/resources/complex_types/array/int_array.json
@@ -0,0 +1,3 @@
+{"rid":1,"arr_n_0":[-1,0,1],"arr_n_1":[[-1,0,1],[-2,1]],"arr_n_2":[[[7, 81], [-92, 54, -83], [-10, -59]], [[-43, -80]], [[-70, -62]]]}
+{"rid":2,"arr_n_0":[],"arr_n_1":[[],[]],"arr_n_2":[[[34, -18]], [[-87, 87], [52, 58], [58, 20, -81], [-94, -93]]]}
+{"rid":3,"arr_n_0":[100500],"arr_n_1":[[100500,500100]],"arr_n_2":[[[-56, 9], [39, 5]], [[28, 88, -28]]]}
diff --git a/contrib/storage-hive/core/src/test/resources/complex_types/array/smallint_array.json b/contrib/storage-hive/core/src/test/resources/complex_types/array/smallint_array.json
new file mode 100644
index 0000000..f829fee
--- /dev/null
+++ b/contrib/storage-hive/core/src/test/resources/complex_types/array/smallint_array.json
@@ -0,0 +1,3 @@
+{"rid":1,"arr_n_0":[-32768,0,32767],"arr_n_1":[[-32768,-32768],[0,0],[32767,32767]],"arr_n_2":[[[-28752]], [[17243, 15652], [-9684], [10176, 18123], [-15404, 15420], [11136, -19435]], [[-29634, -12695], [4350, -24289, -10889]], [[13731], [27661, -15794, 21784], [14341, -4635], [1601, -29973], [2750, 30373, -11630]], [[-11383]]]}
+{"rid":2,"arr_n_0":[],"arr_n_1":[[],[]],"arr_n_2":[[[23860], [-27345, 19068], [-7174, 286, 14673]], [[14844, -9087], [-25185, 219], [26875], [-4699], [-3853, -15729, 11472]], [[-29142], [-13859], [-23073, 31368, -26542]], [[14914, 14656], [4636, 6289]]]}
+{"rid":3,"arr_n_0":[10500],"arr_n_1":[[10500,5010]],"arr_n_2":[[[10426, 31865], [-19088], [-4774], [17988]], [[-6214, -26836, 30715]], [[-4231], [31742, -661], [-22842, 4203], [18278]]]}
diff --git a/contrib/storage-hive/core/src/test/resources/complex_types/array/string_array.json b/contrib/storage-hive/core/src/test/resources/complex_types/array/string_array.json
new file mode 100644
index 0000000..23a5b19
--- /dev/null
+++ b/contrib/storage-hive/core/src/test/resources/complex_types/array/string_array.json
@@ -0,0 +1,3 @@
+{"rid":1,"arr_n_0":["First Value Of Array", "komlnp", "The Last Value"], "arr_n_1":[["Array 0, Value 0","Array 0, Value 1"], ["Array 1"]],"arr_n_2":[[["dhMGOr1QVO", "NZpzBl", "LC8mjYyOJ7l8dHUpk"]], [["JH"], ["aVxgfxAu"], ["fF amN8z8"]], [["denwte5R39dSb2PeG", "Gbosj97RXTvBK1w", "S3whFvN"], ["2sNbYGQhkt303Gnu", "rwG", "SQH766A8XwHg2pTA6a"]], [["L", "khGFDtDluFNoo5hT"], ["b8"], ["Z"]], [["DTEuW", "b0Wt84hIl", "A1H"], ["h2zXh3Qc", "NOcgU8", "RGfVgv2rvDG"], ["Hfn1ov9hB7fZN", "0ZgCD3"]]]}
+{"rid":2,"arr_n_0":[], "arr_n_1":[[],[]],"arr_n_2":[[["nk", "HA", "CgAZCxTbTrFWJL3yM"], ["T7fGXYwtBb", "G6vc"], ["GrwB5j3LBy9"], ["g7UreegD1H97", "dniQ5Ehhps7c1pBuM", "S wSNMGj7c"], ["iWTEJS0", "4F"]], [["YpRcC01u6i6KO", "ujpMrvEfUWfKm", "2d"], ["2", "HVDH", "5Qx Q6W112"]]]}
+{"rid":3,"arr_n_0":["ABCaBcA-1-2-3"], "arr_n_1":[["One"]],"arr_n_2":[[["S8d2vjNu680hSim6iJ"], ["lRLaT9RvvgzhZ3C", "igSX1CP", "FFZMwMvAOod8"], ["iBX", "sG"], ["ChRjuDPz99WeU9", "2gBBmMUXV9E5E", " VkEARI2upO"]], [["UgMok3Q5wmd"], ["8Zf9CLfUSWK", "", "NZ7v"], ["vQE3I5t26", "251BeQJue"]], [["Rpo8"]], [["jj3njyupewOM Ej0pu", "aePLtGgtyu4aJ5", "cKHSvNbImH1MkQmw0Cs"], ["VSO5JgI2x7TnK31L5", "hIub", "eoBSa0zUFlwroSucU"], ["V8Gny91lT", "5hBncDZ"]], [["Y3", "StcgywfU", "BFTDChc"], ["5JNwXc2UHLld7", [...]
diff --git a/contrib/storage-hive/core/src/test/resources/complex_types/array/timestamp_array.json b/contrib/storage-hive/core/src/test/resources/complex_types/array/timestamp_array.json
new file mode 100644
index 0000000..e0c5442
--- /dev/null
+++ b/contrib/storage-hive/core/src/test/resources/complex_types/array/timestamp_array.json
@@ -0,0 +1,3 @@
+{"rid":1,"arr_n_0":["2018-10-21 04:51:36","2017-07-11 09:26:48","2018-09-23 03:02:33"], "arr_n_1":[["2017-03-21 12:52:33","2017-09-10 01:29:24","2018-01-17 04:45:23"], ["2017-03-24 01:03:23", "2018-09-22 05:00:26"]],"arr_n_2":[[["1929-01-08 19:31:47"], ["1968-07-02 15:13:55", "1990-01-25 21:05:51", "1950-10-26 19:16:10"], ["1946-09-03 03:03:50", "1987-03-29 11:27:05"], ["1979-11-29 09:01:14"]], [["2010-08-26 12:08:51", "2012-02-05 02:34:22"], ["1955-02-24 19:45:33"], ["1994-06-19 09:33:5 [...]
+{"rid":2,"arr_n_0":[],"arr_n_1":[[],[]],"arr_n_2":[[["1904-12-10 00:39:14"], ["1994-04-12 23:06:07"], ["1954-07-05 23:48:09", "1913-03-03 18:47:14", "1960-04-30 22:35:28"], ["1962-09-26 17:11:12", "1906-06-18 04:05:21", "2003-06-19 05:15:24"]], [["1929-03-20 06:33:40", "1939-02-12 07:03:07", "1945-02-16 21:18:16"]], [["1969-08-11 22:25:31", "1944-08-11 02:57:58"], ["1989-03-18 13:33:56", "1961-06-06 04:44:50"]]]}
+{"rid":3,"arr_n_0":["2018-07-14 05:20:34"],"arr_n_1":[["2017-08-09 08:26:08","2017-08-28 09:47:23"]],"arr_n_2":[[["1999-12-07 01:16:45"], ["1903-12-11 04:28:20", "2007-01-03 19:27:28"], ["2018-03-16 15:43:19", "2002-09-16 08:58:40", "1956-05-16 17:47:44"], ["2006-09-19 18:38:19", "2016-01-21 12:39:30"]]]}
diff --git a/contrib/storage-hive/core/src/test/resources/complex_types/array/tinyint_array.json b/contrib/storage-hive/core/src/test/resources/complex_types/array/tinyint_array.json
new file mode 100644
index 0000000..a3dd978
--- /dev/null
+++ b/contrib/storage-hive/core/src/test/resources/complex_types/array/tinyint_array.json
@@ -0,0 +1,3 @@
+{"rid":1,"arr_n_0":[-128,0,127], "arr_n_1":[[-128,-127],[0,1],[127,126]],"arr_n_2":[[[31, 65, 54], [66], [22], [-33, -125, 116]], [[-5, -10]], [[78], [86], [90, 34], [32]], [[103, -49, -33], [-30], [107, 24, 74], [16, -58]], [[-119, -8], [50, -99, 26], [-119]]]}
+{"rid":2,"arr_n_0":[], "arr_n_1":[[],[]],"arr_n_2":[[[-90, -113], [71, -65]], [[88, -83]], [[11], [121, -57]], [[-79], [16, -111, -111], [90, 106], [33, 29, 42], [74]]]}
+{"rid":3,"arr_n_0":[-101], "arr_n_1":[[-102]],"arr_n_2":[[[74, -115], [19, 85, 3]]]}
diff --git a/contrib/storage-hive/core/src/test/resources/complex_types/array/varchar_array.json b/contrib/storage-hive/core/src/test/resources/complex_types/array/varchar_array.json
new file mode 100644
index 0000000..f7e7217
--- /dev/null
+++ b/contrib/storage-hive/core/src/test/resources/complex_types/array/varchar_array.json
@@ -0,0 +1,3 @@
+{"rid":1,"arr_n_0":["Five","One", "T"], "arr_n_1":[["Five","One","$42"], ["T","K","O"]],"arr_n_2":[[[""], ["Gt", "", ""], ["9R3y"], ["X3a4"]], [["o", "6T", "QKAZ"], ["", "xf8r", "As"], ["5kS3"]], [["", "S7Gx"], ["ml", "27pL", "VPxr"], [""], ["e", "Dj"]], [["", "XYO", "fEWz"], ["", "oU"], ["o 8", "", ""], ["giML", "H7g"], ["SWX9", "H", "emwt"]], [["Sp"]]]}
+{"rid":2,"arr_n_0":[], "arr_n_1":[[], []],"arr_n_2":[[["GCx"], ["", "V"], ["pF", "R7", ""], ["", "AKal"]]]}
+{"rid":3,"arr_n_0":["ZZ0","-c54g","ooo", "k22k"], "arr_n_1":[["-c54g"]],"arr_n_2":[[["m", "MBAv", "7R9F"], ["ovv"], ["p 7l"]]]}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/dotdrill/View.java b/exec/java-exec/src/main/java/org/apache/drill/exec/dotdrill/View.java
index 91900de..fc125c1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/dotdrill/View.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/dotdrill/View.java
@@ -347,6 +347,12 @@ public class View {
          */
         type = factory.createSqlType(SqlTypeName.ANY);
       }
+    } else if (typeName == SqlTypeName.ARRAY) {
+      /*
+       * Treat array type as ANY to avoid generation of CAST(fieldName, 'ARRAY'),
+       * which is unsupported in Drill.
+       */
+      type = factory.createSqlType(SqlTypeName.ANY);
     } else {
       type = factory.createSqlType(field.getType());
     }
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/DrillTestWrapper.java b/exec/java-exec/src/test/java/org/apache/drill/test/DrillTestWrapper.java
index 88bb03b..db1e023 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/DrillTestWrapper.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/DrillTestWrapper.java
@@ -862,6 +862,7 @@ public class DrillTestWrapper {
     for (Map<String, Object> expectedRecord : expectedRecords) {
       i = 0;
       found = false;
+      StringBuilder mismatchHistory = new StringBuilder();
       findMatch:
       for (Map<String, Object> actualRecord : actualRecords) {
         for (String s : actualRecord.keySet()) {
@@ -870,6 +871,9 @@ public class DrillTestWrapper {
           }
           if (! compareValues(expectedRecord.get(s), actualRecord.get(s), counter, s, approximateEquality, tolerance)) {
             i++;
+            mismatchHistory.append("column: ").append(s)
+                .append(" exp: |").append(expectedRecord.get(s))
+                .append("| act: |").append(actualRecord.get(s)).append("|\n");
             continue findMatch;
           }
         }
@@ -895,6 +899,7 @@ public class DrillTestWrapper {
         }
         String actualRecordExamples = sb.toString();
         throw new Exception(String.format("After matching %d records, did not find expected record in result set:\n %s\n\n" +
+                "Mismatch column: \n" + mismatchHistory + "\n" +
             "Some examples of expected records:\n%s\n\n Some examples of records returned by the test query:\n%s",
             counter, printRecord(expectedRecord), expectedRecordExamples, actualRecordExamples));
       } else {
@@ -916,11 +921,10 @@ public class DrillTestWrapper {
   }
 
   private String printRecord(Map<String, ?> record) {
-    String ret = "";
-    for (String s : record.keySet()) {
-      ret += s + " : "  + record.get(s) + ", ";
-    }
-    return ret + "\n";
+    StringBuilder sb = new StringBuilder();
+    record.keySet().stream().sorted()
+        .forEach(key -> sb.append(key).append(" : ").append(record.get(key)).append(", "));
+    return sb.append(System.lineSeparator()).toString();
   }
 
   private void test(String query) throws Exception {
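
The appended history makes it easier to see which column blocked a match. A standalone sketch of the resulting line format, using the same StringBuilder calls as above with an illustrative column name and values:

    // Reproduces the diagnostic format appended on each mismatch (values are made up).
    StringBuilder mismatchHistory = new StringBuilder();
    mismatchHistory.append("column: ").append("vwrid")
        .append(" exp: |").append(1)
        .append("| act: |").append(2).append("|\n");
    System.out.print(mismatchHistory); // prints: column: vwrid exp: |1| act: |2|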
diff --git a/exec/vector/src/main/codegen/templates/ComplexWriters.java b/exec/vector/src/main/codegen/templates/ComplexWriters.java
index f46b795..434f618 100644
--- a/exec/vector/src/main/codegen/templates/ComplexWriters.java
+++ b/exec/vector/src/main/codegen/templates/ComplexWriters.java
@@ -103,6 +103,13 @@ public class ${eName}WriterImpl extends AbstractFieldWriter {
   }
   </#if>
 
+  <#if minor.class?contains("VarDecimal")>
+  public void writeVarDecimal(BigDecimal value) {
+    mutator.addSafe(idx(), value.unscaledValue().toByteArray());
+    vector.getMutator().setValueCount(idx() + 1);
+  }
+  </#if>
+
   public void setPosition(int idx) {
     super.setPosition(idx);
     mutator.startNewValue(idx);
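
The new method stores a decimal as the bytes of its unscaled integer value; a small standalone sketch of that conversion, assuming the scale is carried by the vector's type metadata rather than by the bytes themselves (the class name is only for illustration):

    import java.math.BigDecimal;

    // Shows the conversion used by the generated writeVarDecimal(BigDecimal).
    public class VarDecimalBytesDemo {
      public static void main(String[] args) {
        BigDecimal value = new BigDecimal("102030.001");
        byte[] unscaled = value.unscaledValue().toByteArray(); // bytes of 102030001
        System.out.println("unscaled = " + value.unscaledValue()
            + ", scale = " + value.scale() + ", bytes = " + unscaled.length);
      }
    }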