Posted to commits@drill.apache.org by pa...@apache.org on 2017/11/22 22:52:10 UTC

[08/12] drill git commit: DRILL-5941: Skip header / footer improvements for Hive storage plugin

DRILL-5941: Skip header / footer improvements for Hive storage plugin

Overview:
1. When a table has a header / footer, process all input splits of the same file in one reader (bug fix for DRILL-5941).
2. Apply the skip-header logic only once, during reader initialization, to avoid per-record checks while reading the data (DRILL-5106).
3. Apply the skip-footer logic only when the footer count is greater than 0; otherwise records are processed directly, without buffering them in a queue (DRILL-5106). See the sketch right after this list.
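
A condensed view of the new initialization logic, taken from the generated reader template changes further down in this commit (header rows are skipped once in internalInit(), and a footer-buffering inspector is created only when the footer count is greater than 0):

  int skipHeaderCount = HiveUtilities.retrieveIntProperty(tableProperties, serdeConstants.HEADER_COUNT, -1);

  // skip the first N records once, during initialization
  Object value = reader.createValue();
  for (int i = 0; i < skipHeaderCount; i++) {
    if (!hasNextValue(value)) {
      empty = true; // the table was drained while skipping the header
      break;
    }
  }

  // choose the inspector only if the table was not drained while skipping the header
  if (!empty) {
    int skipFooterCount = HiveUtilities.retrieveIntProperty(tableProperties, serdeConstants.FOOTER_COUNT, -1);
    recordsInspector = skipFooterCount > 0
        ? new SkipFooterRecordsInspector(reader, skipFooterCount) // buffers the last N records
        : new DefaultRecordsInspector(reader.createValue());      // no buffering at all
  }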

Code changes:
1. AbstractReadersInitializer was introduced to factor out the common logic of readers initialization (sketched after this item).
It has two implementations:
a. Default (each input split group gets its own reader);
b. Empty (for empty tables);
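
A hedged sketch of that split, using illustrative names and generic types only; the committed AbstractReadersInitializer, DefaultReadersInitializer and EmptyReadersInitializer (listed in the diffstat below) work with Drill's RecordReader and Hive sub-scan types, and their sources are not part of the hunks shown here:

  import java.util.ArrayList;
  import java.util.Collections;
  import java.util.List;
  import java.util.function.Function;

  // Illustrative only: the scan batch creator no longer decides how readers are built,
  // it just asks an initializer.
  interface ReadersInitializerSketch<R> {
    List<R> init();
  }

  // "Default": one reader per input split group.
  class DefaultReadersInitializerSketch<G, R> implements ReadersInitializerSketch<R> {
    private final List<G> splitGroups;
    private final Function<G, R> readerFactory;

    DefaultReadersInitializerSketch(List<G> splitGroups, Function<G, R> readerFactory) {
      this.splitGroups = splitGroups;
      this.readerFactory = readerFactory;
    }

    @Override
    public List<R> init() {
      List<R> readers = new ArrayList<>();
      for (G group : splitGroups) {
        readers.add(readerFactory.apply(group));
      }
      return readers;
    }
  }

  // "Empty": nothing to read, a single reader built without splits is enough for an empty table.
  class EmptyReadersInitializerSketch<R> implements ReadersInitializerSketch<R> {
    private final R emptyReader;

    EmptyReadersInitializerSketch(R emptyReader) {
      this.emptyReader = emptyReader;
    }

    @Override
    public List<R> init() {
      return Collections.singletonList(emptyReader);
    }
  }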

2. AbstractRecordsInspector was introduced to improve performance when the table's footer count is less than or equal to 0 (the footer buffering is illustrated after this item).
It has two implementations:
a. Default (records are processed one by one, without buffering);
b. SkipFooter (a queue is used to buffer the N records that should be skipped at the end of file processing).
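
The footer-skipping idea in isolation (a hedged, self-contained illustration; the committed SkipFooterRecordsInspector, listed in the diffstat below, works on the Hadoop RecordReader's reusable value objects and also carries its buffer across batches, which is not shown here):

  import java.util.ArrayDeque;
  import java.util.Queue;

  // Illustrative only: a record becomes visible N positions after it was read,
  // so the last N records of the file are never emitted.
  class FooterSkippingBuffer<T> {
    private final int footerCount;
    private final Queue<T> buffer = new ArrayDeque<>();

    FooterSkippingBuffer(int footerCount) {
      this.footerCount = footerCount;
    }

    /** Returns a record that is safe to emit, or null while the buffer is still filling up. */
    T add(T record) {
      buffer.add(record);
      return buffer.size() > footerCount ? buffer.poll() : null;
    }
  }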

3. When a text table has a header / footer, each table file should be read as one unit. When a file is read as several input splits, those splits must be grouped together.
For this purpose the LogicalInputSplit class was introduced, replacing the InputSplitWrapper class. The new class stores a list of grouped input splits and exposes split information at the group level (the grouping itself is shown condensed below).
Note that during planning input splits are grouped only when data is read from a text table that has a header / footer; otherwise each input split is treated separately.
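
The grouping itself, condensed (using a method reference) from the new transformFileSplits() in HiveMetadataProvider further down in this diff: splits are keyed by file path and ordered by their start offset, so the parts of one file stay together and in order.

  Multimap<Path, FileSplit> groups = TreeMultimap.create(
      Ordering.<Path>natural(),
      Comparator.comparingLong(FileSplit::getStart)); // parts of one file kept in offset order

  for (InputSplit split : inputSplits) {
    FileSplit fileSplit = (FileSplit) split;
    groups.put(fileSplit.getPath(), fileSplit);       // key: file path -> all splits of that file
  }
  // each value collection of groups.asMap() becomes one LogicalInputSplit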

4. Allow HiveAbstractReader to have multiple input splits instead of one.
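
A hedged sketch of what reading across several splits in one reader could look like (the generated readers below call a hasNextValue() helper on the new HiveAbstractReader under store/hive/readers; its actual implementation is not part of the hunks shown here, so the helper initNextReader() below is purely hypothetical):

  // Illustrative only: advance through the splits of one logical group (e.g. one file)
  // before reporting end of data.
  protected boolean hasNextValue(Object value) {
    while (true) {
      try {
        if (reader.next(key, value)) {
          return true;               // the current split still has records
        }
      } catch (IOException e) {
        throw new DrillRuntimeException(e);
      }
      if (!initNextReader()) {       // hypothetical: opens a RecordReader for the next input split, if any
        return false;                // all splits of this group are exhausted
      }
    }
  }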

This closes #1030


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/f8bbb759
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/f8bbb759
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/f8bbb759

Branch: refs/heads/master
Commit: f8bbb7591235627a58a1f10584d7902d8251d221
Parents: 36abdd7
Author: Arina Ielchiieva <ar...@gmail.com>
Authored: Thu Nov 9 08:58:46 2017 +0000
Committer: Parth Chandra <pa...@apache.org>
Committed: Wed Nov 22 10:47:25 2017 -0800

----------------------------------------------------------------------
 .../core/src/main/codegen/includes/license.ftl  |   5 +-
 .../codegen/templates/HiveRecordReaders.java    | 229 +++-------
 .../exec/store/hive/HiveAbstractReader.java     | 358 ----------------
 .../hive/HiveDrillNativeParquetSubScan.java     |   4 +-
 .../hive/HiveDrillNativeScanBatchCreator.java   |  99 ++---
 .../exec/store/hive/HiveMetadataProvider.java   | 190 +++++++--
 .../apache/drill/exec/store/hive/HiveScan.java  |  52 +--
 .../exec/store/hive/HiveScanBatchCreator.java   |  69 +--
 .../drill/exec/store/hive/HiveSubScan.java      |  30 +-
 .../drill/exec/store/hive/HiveUtilities.java    |  45 +-
 .../store/hive/readers/HiveAbstractReader.java  | 417 +++++++++++++++++++
 .../AbstractReadersInitializer.java             |  78 ++++
 .../DefaultReadersInitializer.java              |  54 +++
 .../initilializers/EmptyReadersInitializer.java |  48 +++
 .../initilializers/ReadersInitializer.java      |  87 ++++
 .../inspectors/AbstractRecordsInspector.java    |  71 ++++
 .../inspectors/DefaultRecordsInspector.java     |  41 ++
 .../inspectors/SkipFooterRecordsInspector.java  |  87 ++++
 .../apache/drill/exec/hive/TestHiveStorage.java |  34 +-
 .../exec/hive/TestInfoSchemaOnHiveStorage.java  |  14 +-
 .../exec/store/hive/HiveTestDataGenerator.java  |  74 ++--
 .../SkipFooterRecordsInspectorTest.java         |  84 ++++
 22 files changed, 1394 insertions(+), 776 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/f8bbb759/contrib/storage-hive/core/src/main/codegen/includes/license.ftl
----------------------------------------------------------------------
diff --git a/contrib/storage-hive/core/src/main/codegen/includes/license.ftl b/contrib/storage-hive/core/src/main/codegen/includes/license.ftl
index 0455fd8..6117e6a 100644
--- a/contrib/storage-hive/core/src/main/codegen/includes/license.ftl
+++ b/contrib/storage-hive/core/src/main/codegen/includes/license.ftl
@@ -1,5 +1,4 @@
-/*******************************************************************************
-
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -15,4 +14,4 @@
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
- ******************************************************************************/
\ No newline at end of file
+*/
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/drill/blob/f8bbb759/contrib/storage-hive/core/src/main/codegen/templates/HiveRecordReaders.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hive/core/src/main/codegen/templates/HiveRecordReaders.java b/contrib/storage-hive/core/src/main/codegen/templates/HiveRecordReaders.java
index 7d6733e..b4b2039 100644
--- a/contrib/storage-hive/core/src/main/codegen/templates/HiveRecordReaders.java
+++ b/contrib/storage-hive/core/src/main/codegen/templates/HiveRecordReaders.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-/**
+/*
  * This template is used to generate different Hive record reader classes for different data formats
  * to avoid JIT profile pullusion. These readers are derived from HiveAbstractReader which implements
  * codes for init and setup stage, but the repeated - and performance critical part - next() method is
@@ -32,15 +32,17 @@
 <@pp.changeOutputFile name="/org/apache/drill/exec/store/hive/Hive${entry.hiveReader}Reader.java" />
 <#include "/@includes/license.ftl" />
 
-package org.apache.drill.exec.store.hive;
+package org.apache.drill.exec.store.hive.readers;
 
-import java.io.IOException;
+import java.util.Collection;
 import java.util.List;
 import java.util.Properties;
 import org.apache.drill.common.exceptions.DrillRuntimeException;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.store.hive.HivePartition;
+import org.apache.drill.exec.store.hive.HiveTableWithColumnCache;
 import org.apache.drill.exec.vector.AllocationHelper;
 import org.apache.drill.exec.vector.ValueVector;
 import org.apache.hadoop.io.Writable;
@@ -51,31 +53,26 @@ import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.serde2.SerDeException;
 
 import org.apache.hadoop.mapred.RecordReader;
+
 <#if entry.hasHeaderFooter == true>
-import org.apache.hadoop.hive.serde2.SerDe;
-import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
-import com.google.common.collect.Lists;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashSet;
-import java.util.Queue;
-import java.util.Set;
-import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
+import org.apache.drill.exec.store.hive.readers.inspectors.AbstractRecordsInspector;
+import org.apache.drill.exec.store.hive.readers.inspectors.DefaultRecordsInspector;
+import org.apache.drill.exec.store.hive.readers.inspectors.SkipFooterRecordsInspector;
+import org.apache.drill.exec.store.hive.HiveUtilities;
 import org.apache.hadoop.hive.serde.serdeConstants;
 </#if>
 
 public class Hive${entry.hiveReader}Reader extends HiveAbstractReader {
 
-  Object key;
 <#if entry.hasHeaderFooter == true>
-  SkipRecordsInspector skipRecordsInspector;
+  AbstractRecordsInspector recordsInspector;
 <#else>
   Object value;
 </#if>
 
-  public Hive${entry.hiveReader}Reader(HiveTableWithColumnCache table, HivePartition partition, InputSplit inputSplit, List<SchemaPath> projectedColumns,
-                       FragmentContext context, final HiveConf hiveConf,
-                       UserGroupInformation proxyUgi) throws ExecutionSetupException {
+  public Hive${entry.hiveReader}Reader(HiveTableWithColumnCache table, HivePartition partition, Collection<InputSplit> inputSplit, List<SchemaPath> projectedColumns,
+                      FragmentContext context, final HiveConf hiveConf,
+                      UserGroupInformation proxyUgi) throws ExecutionSetupException {
     super(table, partition, inputSplit, projectedColumns, context, hiveConf, proxyUgi);
   }
 
@@ -83,187 +80,71 @@ public class Hive${entry.hiveReader}Reader extends HiveAbstractReader {
 
     key = reader.createKey();
 <#if entry.hasHeaderFooter == true>
-    skipRecordsInspector = new SkipRecordsInspector(tableProperties, reader);
+    int skipHeaderCount = HiveUtilities.retrieveIntProperty(tableProperties, serdeConstants.HEADER_COUNT, -1);
+
+    // skip first N records to apply skip header policy
+    Object value = reader.createValue();
+    for (int i = 0; i < skipHeaderCount; i++) {
+      if (!hasNextValue(value)) {
+        // no more records to skip, we drained the table
+        empty = true;
+        break;
+      }
+    }
+
+    // if table was drained while skipping first N records, there is no need to check for skip footer logic
+    if (!empty) {
+      int skipFooterCount = HiveUtilities.retrieveIntProperty(tableProperties, serdeConstants.FOOTER_COUNT, -1);
+
+      // if we need to skip N last records, use records inspector which will buffer records while reading
+      if (skipFooterCount > 0) {
+        recordsInspector = new SkipFooterRecordsInspector(reader, skipFooterCount);
+      } else {
+        recordsInspector = new DefaultRecordsInspector(reader.createValue());
+      }
+    }
 <#else>
     value = reader.createValue();
 </#if>
 
   }
-  private void readHiveRecordAndInsertIntoRecordBatch(Object deSerializedValue, int outputRecordIndex) {
-    for (int i = 0; i < selectedStructFieldRefs.size(); i++) {
-      Object hiveValue = finalOI.getStructFieldData(deSerializedValue, selectedStructFieldRefs.get(i));
-      if (hiveValue != null) {
-        selectedColumnFieldConverters.get(i).setSafeValue(selectedColumnObjInspectors.get(i), hiveValue,
-          vectors.get(i), outputRecordIndex);
-      }
-    }
-  }
 
 <#if entry.hasHeaderFooter == true>
+
   @Override
   public int next() {
     for (ValueVector vv : vectors) {
       AllocationHelper.allocateNew(vv, TARGET_RECORD_COUNT);
     }
+
     if (empty) {
       setValueCountAndPopulatePartitionVectors(0);
       return 0;
     }
 
     try {
-      skipRecordsInspector.reset();
-      Object value;
-
-      int recordCount = 0;
-
-      while (recordCount < TARGET_RECORD_COUNT && reader.next(key, value = skipRecordsInspector.getNextValue())) {
-        if (skipRecordsInspector.doSkipHeader(recordCount++)) {
-          continue;
-        }
-        Object bufferedValue = skipRecordsInspector.bufferAdd(value);
-        if (bufferedValue != null) {
-          Object deSerializedValue = partitionSerDe.deserialize((Writable) bufferedValue);
+      // starting new batch, reset processed records count
+      recordsInspector.reset();
+
+      // process records till batch is full or all records were processed
+      while (!recordsInspector.isBatchFull() && hasNextValue(recordsInspector.getValueHolder())) {
+        Object value = recordsInspector.getNextValue();
+        if (value != null) {
+          Object deSerializedValue = partitionSerDe.deserialize((Writable) value);
           if (partTblObjectInspectorConverter != null) {
             deSerializedValue = partTblObjectInspectorConverter.convert(deSerializedValue);
           }
-          readHiveRecordAndInsertIntoRecordBatch(deSerializedValue, skipRecordsInspector.getActualCount());
-          skipRecordsInspector.incrementActualCount();
+          readHiveRecordAndInsertIntoRecordBatch(deSerializedValue, recordsInspector.getProcessedRecordCount());
+          recordsInspector.incrementProcessedRecordCount();
         }
-        skipRecordsInspector.incrementTempCount();
       }
-
-      setValueCountAndPopulatePartitionVectors(skipRecordsInspector.getActualCount());
-      skipRecordsInspector.updateContinuance();
-      return skipRecordsInspector.getActualCount();
-    } catch (IOException | SerDeException e) {
+      setValueCountAndPopulatePartitionVectors(recordsInspector.getProcessedRecordCount());
+      return recordsInspector.getProcessedRecordCount();
+    } catch (SerDeException e) {
       throw new DrillRuntimeException(e);
     }
   }
 
-/**
- * SkipRecordsInspector encapsulates logic to skip header and footer from file.
- * Logic is applicable only for predefined in constructor file formats.
- */
-protected class SkipRecordsInspector {
-
-  private final Set<Object> fileFormats;
-  private int headerCount;
-  private int footerCount;
-  private Queue<Object> footerBuffer;
-  // indicates if we continue reading the same file
-  private boolean continuance;
-  private int holderIndex;
-  private List<Object> valueHolder;
-  private int actualCount;
-  // actualCount without headerCount, used to determine holderIndex
-  private int tempCount;
-
-  protected SkipRecordsInspector(Properties tableProperties, RecordReader reader) {
-    this.fileFormats = new HashSet<Object>(Arrays.asList(org.apache.hadoop.mapred.TextInputFormat.class.getName()));
-    this.headerCount = retrievePositiveIntProperty(tableProperties, serdeConstants.HEADER_COUNT, 0);
-    this.footerCount = retrievePositiveIntProperty(tableProperties, serdeConstants.FOOTER_COUNT, 0);
-    logger.debug("skipRecordInspector: fileFormat {}, headerCount {}, footerCount {}",
-        this.fileFormats, this.headerCount, this.footerCount);
-    this.footerBuffer = Lists.newLinkedList();
-    this.continuance = false;
-    this.holderIndex = -1;
-    this.valueHolder = initializeValueHolder(reader, footerCount);
-    this.actualCount = 0;
-    this.tempCount = 0;
-  }
-
-  protected boolean doSkipHeader(int recordCount) {
-    return !continuance && recordCount < headerCount;
-  }
-
-  protected void reset() {
-    tempCount = holderIndex + 1;
-    actualCount = 0;
-    if (!continuance) {
-      footerBuffer.clear();
-    }
-  }
-
-  protected Object bufferAdd(Object value) throws SerDeException {
-    footerBuffer.add(value);
-    if (footerBuffer.size() <= footerCount) {
-      return null;
-    }
-    return footerBuffer.poll();
-  }
-
-  protected Object getNextValue() {
-    holderIndex = tempCount % getHolderSize();
-    return valueHolder.get(holderIndex);
-  }
-
-  private int getHolderSize() {
-    return valueHolder.size();
-  }
-
-  protected void updateContinuance() {
-    this.continuance = actualCount != 0;
-  }
-
-  protected int incrementTempCount() {
-    return ++tempCount;
-  }
-
-  protected int getActualCount() {
-    return actualCount;
-  }
-
-  protected int incrementActualCount() {
-    return ++actualCount;
-  }
-
-  /**
-   * Retrieves positive numeric property from Properties object by name.
-   * Return default value if
-   * 1. file format is absent in predefined file formats list
-   * 2. property doesn't exist in table properties
-   * 3. property value is negative
-   * otherwise casts value to int.
-   *
-   * @param tableProperties property holder
-   * @param propertyName    name of the property
-   * @param defaultValue    default value
-   * @return property numeric value
-   * @throws NumberFormatException if property value is non-numeric
-   */
-  protected int retrievePositiveIntProperty(Properties tableProperties, String propertyName, int defaultValue) {
-    int propertyIntValue = defaultValue;
-    if (!fileFormats.contains(tableProperties.get(hive_metastoreConstants.FILE_INPUT_FORMAT))) {
-      return propertyIntValue;
-    }
-    Object propertyObject = tableProperties.get(propertyName);
-    if (propertyObject != null) {
-      try {
-        propertyIntValue = Integer.valueOf((String) propertyObject);
-      } catch (NumberFormatException e) {
-        throw new NumberFormatException(String.format("Hive table property %s value '%s' is non-numeric", propertyName, propertyObject.toString()));
-      }
-    }
-    return propertyIntValue < 0 ? defaultValue : propertyIntValue;
-  }
-
-  /**
-   * Creates buffer of objects to be used as values, so these values can be re-used.
-   * Objects number depends on number of lines to skip in the end of the file plus one object.
-   *
-   * @param reader          RecordReader to return value object
-   * @param skipFooterLines number of lines to skip at the end of the file
-   * @return list of objects to be used as values
-   */
-  private List<Object> initializeValueHolder(RecordReader reader, int skipFooterLines) {
-    List<Object> valueHolder = new ArrayList<>(skipFooterLines + 1);
-    for (int i = 0; i <= skipFooterLines; i++) {
-      valueHolder.add(reader.createValue());
-    }
-    return valueHolder;
-  }
- }
-
 <#else>
   @Override
   public int next() {
@@ -277,7 +158,7 @@ protected class SkipRecordsInspector {
 
     try {
       int recordCount = 0;
-      while (recordCount < TARGET_RECORD_COUNT && reader.next(key, value)) {
+      while (recordCount < TARGET_RECORD_COUNT && hasNextValue(value)) {
         Object deSerializedValue = partitionSerDe.deserialize((Writable) value);
         if (partTblObjectInspectorConverter != null) {
           deSerializedValue = partTblObjectInspectorConverter.convert(deSerializedValue);
@@ -288,7 +169,7 @@ protected class SkipRecordsInspector {
 
       setValueCountAndPopulatePartitionVectors(recordCount);
       return recordCount;
-    } catch (IOException | SerDeException e) {
+    } catch (SerDeException e) {
       throw new DrillRuntimeException(e);
     }
   }

http://git-wip-us.apache.org/repos/asf/drill/blob/f8bbb759/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveAbstractReader.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveAbstractReader.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveAbstractReader.java
deleted file mode 100644
index 8c6df84..0000000
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveAbstractReader.java
+++ /dev/null
@@ -1,358 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.store.hive;
-
-import java.util.List;
-import java.util.Properties;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
-
-import com.google.common.util.concurrent.ListenableFuture;
-import io.netty.buffer.DrillBuf;
-import org.apache.drill.common.exceptions.ExecutionSetupException;
-import org.apache.drill.common.expression.SchemaPath;
-import org.apache.drill.common.types.TypeProtos.MajorType;
-import org.apache.drill.exec.exception.SchemaChangeException;
-import org.apache.drill.exec.expr.TypeHelper;
-import org.apache.drill.exec.ops.FragmentContext;
-import org.apache.drill.exec.ops.OperatorContext;
-import org.apache.drill.exec.physical.impl.OutputMutator;
-import org.apache.drill.exec.record.MaterializedField;
-import org.apache.drill.exec.server.options.OptionManager;
-import org.apache.drill.exec.store.AbstractRecordReader;
-import org.apache.drill.exec.vector.AllocationHelper;
-import org.apache.drill.exec.vector.ValueVector;
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
-import org.apache.hadoop.hive.metastore.api.FieldSchema;
-import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
-import org.apache.hadoop.hive.serde2.SerDe;
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorConverters;
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorConverters.Converter;
-import org.apache.hadoop.hive.serde2.objectinspector.StructField;
-import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
-import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
-import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
-import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
-import org.apache.hadoop.mapred.InputSplit;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.RecordReader;
-import org.apache.hadoop.mapred.Reporter;
-
-import com.google.common.collect.Lists;
-import org.apache.hadoop.security.UserGroupInformation;
-
-
-public abstract class HiveAbstractReader extends AbstractRecordReader {
-  protected static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HiveAbstractReader.class);
-
-  protected final DrillBuf managedBuffer;
-
-  protected HiveTableWithColumnCache table;
-  protected HivePartition partition;
-  protected InputSplit inputSplit;
-  protected List<String> selectedColumnNames;
-  protected List<StructField> selectedStructFieldRefs = Lists.newArrayList();
-  protected List<TypeInfo> selectedColumnTypes = Lists.newArrayList();
-  protected List<ObjectInspector> selectedColumnObjInspectors = Lists.newArrayList();
-  protected List<HiveFieldConverter> selectedColumnFieldConverters = Lists.newArrayList();
-  protected List<String> selectedPartitionNames = Lists.newArrayList();
-  protected List<TypeInfo> selectedPartitionTypes = Lists.newArrayList();
-  protected List<Object> selectedPartitionValues = Lists.newArrayList();
-
-  // SerDe of the reading partition (or table if the table is non-partitioned)
-  protected SerDe partitionSerDe;
-
-  // ObjectInspector to read data from partitionSerDe (for a non-partitioned table this is same as the table
-  // ObjectInspector).
-  protected StructObjectInspector partitionOI;
-
-  // Final ObjectInspector. We may not use the partitionOI directly if there are schema changes between the table and
-  // partition. If there are no schema changes then this is same as the partitionOI.
-  protected StructObjectInspector finalOI;
-
-  // Converter which converts data from partition schema to table schema.
-  protected Converter partTblObjectInspectorConverter;
-
-  protected Object key;
-  protected RecordReader<Object, Object> reader;
-  protected List<ValueVector> vectors = Lists.newArrayList();
-  protected List<ValueVector> pVectors = Lists.newArrayList();
-  protected boolean empty;
-  protected HiveConf hiveConf;
-  protected FragmentContext fragmentContext;
-  protected String defaultPartitionValue;
-  protected final UserGroupInformation proxyUgi;
-
-
-  protected static final int TARGET_RECORD_COUNT = 4000;
-
-  public HiveAbstractReader(HiveTableWithColumnCache table, HivePartition partition, InputSplit inputSplit, List<SchemaPath> projectedColumns,
-                            FragmentContext context, final HiveConf hiveConf,
-                            UserGroupInformation proxyUgi) throws ExecutionSetupException {
-    this.table = table;
-    this.partition = partition;
-    this.inputSplit = inputSplit;
-    this.empty = (inputSplit == null && partition == null);
-    this.hiveConf = hiveConf;
-    this.fragmentContext = context;
-    this.proxyUgi = proxyUgi;
-    this.managedBuffer = fragmentContext.getManagedBuffer().reallocIfNeeded(256);
-    setColumns(projectedColumns);
-  }
-
-  public abstract void internalInit(Properties tableProperties, RecordReader<Object, Object> reader);
-
-  private void init() throws ExecutionSetupException {
-    final JobConf job = new JobConf(hiveConf);
-
-    // Get the configured default val
-    defaultPartitionValue = hiveConf.get(ConfVars.DEFAULTPARTITIONNAME.varname);
-
-    Properties tableProperties;
-    try {
-      tableProperties = HiveUtilities.getTableMetadata(table);
-      final Properties partitionProperties =
-          (partition == null) ?  tableProperties :
-              HiveUtilities.getPartitionMetadata(partition, table);
-      HiveUtilities.addConfToJob(job, partitionProperties);
-
-      final SerDe tableSerDe = createSerDe(job, table.getSd().getSerdeInfo().getSerializationLib(), tableProperties);
-      final StructObjectInspector tableOI = getStructOI(tableSerDe);
-
-      if (partition != null) {
-        partitionSerDe = createSerDe(job, partition.getSd().getSerdeInfo().getSerializationLib(), partitionProperties);
-        partitionOI = getStructOI(partitionSerDe);
-
-        finalOI = (StructObjectInspector)ObjectInspectorConverters.getConvertedOI(partitionOI, tableOI);
-        partTblObjectInspectorConverter = ObjectInspectorConverters.getConverter(partitionOI, finalOI);
-        job.setInputFormat(HiveUtilities.getInputFormatClass(job, partition.getSd(), table));
-      } else {
-        // For non-partitioned tables, there is no need to create converter as there are no schema changes expected.
-        partitionSerDe = tableSerDe;
-        partitionOI = tableOI;
-        partTblObjectInspectorConverter = null;
-        finalOI = tableOI;
-        job.setInputFormat(HiveUtilities.getInputFormatClass(job, table.getSd(), table));
-      }
-
-      if (logger.isTraceEnabled()) {
-        for (StructField field: finalOI.getAllStructFieldRefs()) {
-          logger.trace("field in finalOI: {}", field.getClass().getName());
-        }
-        logger.trace("partitionSerDe class is {} {}", partitionSerDe.getClass().getName());
-      }
-      // Get list of partition column names
-      final List<String> partitionNames = Lists.newArrayList();
-      for (FieldSchema field : table.getPartitionKeys()) {
-        partitionNames.add(field.getName());
-      }
-
-      // We should always get the columns names from ObjectInspector. For some of the tables (ex. avro) metastore
-      // may not contain the schema, instead it is derived from other sources such as table properties or external file.
-      // SerDe object knows how to get the schema with all the config and table properties passed in initialization.
-      // ObjectInspector created from the SerDe object has the schema.
-      final StructTypeInfo sTypeInfo = (StructTypeInfo) TypeInfoUtils.getTypeInfoFromObjectInspector(finalOI);
-      final List<String> tableColumnNames = sTypeInfo.getAllStructFieldNames();
-
-      // Select list of columns for project pushdown into Hive SerDe readers.
-      final List<Integer> columnIds = Lists.newArrayList();
-      if (isStarQuery()) {
-        selectedColumnNames = tableColumnNames;
-        for(int i=0; i<selectedColumnNames.size(); i++) {
-          columnIds.add(i);
-        }
-        selectedPartitionNames = partitionNames;
-      } else {
-        selectedColumnNames = Lists.newArrayList();
-        for (SchemaPath field : getColumns()) {
-          String columnName = field.getRootSegment().getPath();
-          if (partitionNames.contains(columnName)) {
-            selectedPartitionNames.add(columnName);
-          } else {
-            columnIds.add(tableColumnNames.indexOf(columnName));
-            selectedColumnNames.add(columnName);
-          }
-        }
-      }
-      ColumnProjectionUtils.appendReadColumns(job, columnIds, selectedColumnNames);
-
-      for (String columnName : selectedColumnNames) {
-        StructField fieldRef = finalOI.getStructFieldRef(columnName);
-        selectedStructFieldRefs.add(fieldRef);
-        ObjectInspector fieldOI = fieldRef.getFieldObjectInspector();
-
-        TypeInfo typeInfo = TypeInfoUtils.getTypeInfoFromTypeString(fieldOI.getTypeName());
-
-        selectedColumnObjInspectors.add(fieldOI);
-        selectedColumnTypes.add(typeInfo);
-        selectedColumnFieldConverters.add(HiveFieldConverter.create(typeInfo, fragmentContext));
-      }
-
-      for(int i=0; i<selectedColumnNames.size(); ++i){
-        logger.trace("inspector:typeName={}, className={}, TypeInfo: {}, converter:{}",
-            selectedColumnObjInspectors.get(i).getTypeName(),
-            selectedColumnObjInspectors.get(i).getClass().getName(),
-            selectedColumnTypes.get(i).toString(),
-            selectedColumnFieldConverters.get(i).getClass().getName());
-      }
-
-      for (int i = 0; i < table.getPartitionKeys().size(); i++) {
-        FieldSchema field = table.getPartitionKeys().get(i);
-        if (selectedPartitionNames.contains(field.getName())) {
-          TypeInfo pType = TypeInfoUtils.getTypeInfoFromTypeString(field.getType());
-          selectedPartitionTypes.add(pType);
-
-          if (partition != null) {
-            selectedPartitionValues.add(
-                HiveUtilities.convertPartitionType(pType, partition.getValues().get(i), defaultPartitionValue));
-          }
-        }
-      }
-    } catch (Exception e) {
-      throw new ExecutionSetupException("Failure while initializing Hive Reader " + this.getClass().getName(), e);
-    }
-
-    if (!empty) {
-      try {
-        reader = (org.apache.hadoop.mapred.RecordReader<Object, Object>) job.getInputFormat().getRecordReader(inputSplit, job, Reporter.NULL);
-        logger.trace("hive reader created: {} for inputSplit {}", reader.getClass().getName(), inputSplit.toString());
-      } catch (Exception e) {
-        throw new ExecutionSetupException("Failed to get o.a.hadoop.mapred.RecordReader from Hive InputFormat", e);
-      }
-
-      internalInit(tableProperties, reader);
-    }
-  }
-
-  /**
-   * Utility method which creates a SerDe object for given SerDe class name and properties.
-   */
-  private static SerDe createSerDe(final JobConf job, final String sLib, final Properties properties) throws Exception {
-    final Class<? extends SerDe> c = Class.forName(sLib).asSubclass(SerDe.class);
-    final SerDe serde = c.getConstructor().newInstance();
-    serde.initialize(job, properties);
-
-    return serde;
-  }
-
-  private static StructObjectInspector getStructOI(final SerDe serDe) throws Exception {
-    ObjectInspector oi = serDe.getObjectInspector();
-    if (oi.getCategory() != ObjectInspector.Category.STRUCT) {
-      throw new UnsupportedOperationException(String.format("%s category not supported", oi.getCategory()));
-    }
-    return (StructObjectInspector) oi;
-  }
-
-  @Override
-  public void setup(OperatorContext context, OutputMutator output)
-      throws ExecutionSetupException {
-    // initializes "reader"
-    final Callable<Void> readerInitializer = new Callable<Void>() {
-      @Override
-      public Void call() throws Exception {
-        init();
-        return null;
-      }
-    };
-
-    final ListenableFuture<Void> result = context.runCallableAs(proxyUgi, readerInitializer);
-    try {
-      result.get();
-    } catch (InterruptedException e) {
-      result.cancel(true);
-      // Preserve evidence that the interruption occurred so that code higher up on the call stack can learn of the
-      // interruption and respond to it if it wants to.
-      Thread.currentThread().interrupt();
-    } catch (ExecutionException e) {
-      throw ExecutionSetupException.fromThrowable(e.getMessage(), e);
-    }
-    try {
-      final OptionManager options = fragmentContext.getOptions();
-      for (int i = 0; i < selectedColumnNames.size(); i++) {
-        MajorType type = HiveUtilities.getMajorTypeFromHiveTypeInfo(selectedColumnTypes.get(i), options);
-        MaterializedField field = MaterializedField.create(selectedColumnNames.get(i), type);
-        Class<? extends ValueVector> vvClass = TypeHelper.getValueVectorClass(type.getMinorType(), type.getMode());
-        vectors.add(output.addField(field, vvClass));
-      }
-
-      for (int i = 0; i < selectedPartitionNames.size(); i++) {
-        MajorType type = HiveUtilities.getMajorTypeFromHiveTypeInfo(selectedPartitionTypes.get(i), options);
-        MaterializedField field = MaterializedField.create(selectedPartitionNames.get(i), type);
-        Class<? extends ValueVector> vvClass = TypeHelper.getValueVectorClass(field.getType().getMinorType(), field.getDataMode());
-        pVectors.add(output.addField(field, vvClass));
-      }
-    } catch(SchemaChangeException e) {
-      throw new ExecutionSetupException(e);
-    }
-  }
-
-  /**
-   * To take into account Hive "skip.header.lines.count" property first N values from file are skipped.
-   * Since file can be read in batches (depends on TARGET_RECORD_COUNT), additional checks are made
-   * to determine if it's new file or continuance.
-   *
-   * To take into account Hive "skip.footer.lines.count" property values are buffered in queue
-   * until queue size exceeds number of footer lines to skip, then first value in queue is retrieved.
-   * Buffer of value objects is used to re-use value objects in order to reduce number of created value objects.
-   * For each new file queue is cleared to drop footer lines from previous file.
-   */
-  @Override
-  public abstract int next();
-
-
-
-  protected void setValueCountAndPopulatePartitionVectors(int recordCount) {
-    for (ValueVector v : vectors) {
-      v.getMutator().setValueCount(recordCount);
-    }
-
-    if (partition != null) {
-      populatePartitionVectors(recordCount);
-    }
-  }
-
-  @Override
-  public void close() {
-    try {
-      if (reader != null) {
-        reader.close();
-        reader = null;
-      }
-    } catch (Exception e) {
-      logger.warn("Failure while closing Hive Record reader.", e);
-    }
-  }
-
-  protected void populatePartitionVectors(int recordCount) {
-    for (int i = 0; i < pVectors.size(); i++) {
-      final ValueVector vector = pVectors.get(i);
-      final Object val = selectedPartitionValues.get(i);
-
-      AllocationHelper.allocateNew(vector, recordCount);
-
-      if (val != null) {
-        HiveUtilities.populateVector(vector, managedBuffer, val, 0, recordCount);
-      }
-
-      vector.getMutator().setValueCount(recordCount);
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/drill/blob/f8bbb759/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeParquetSubScan.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeParquetSubScan.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeParquetSubScan.java
index 1ae7b10..43cf98e 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeParquetSubScan.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeParquetSubScan.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -38,7 +38,7 @@ public class HiveDrillNativeParquetSubScan extends HiveSubScan {
   @JsonCreator
   public HiveDrillNativeParquetSubScan(@JacksonInject StoragePluginRegistry registry,
                                        @JsonProperty("userName") String userName,
-                                       @JsonProperty("splits") List<String> splits,
+                                       @JsonProperty("splits") List<List<String>> splits,
                                        @JsonProperty("hiveReadEntry") HiveReadEntry hiveReadEntry,
                                        @JsonProperty("splitClasses") List<String> splitClasses,
                                        @JsonProperty("columns") List<SchemaPath> columns,

http://git-wip-us.apache.org/repos/asf/drill/blob/f8bbb759/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeScanBatchCreator.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeScanBatchCreator.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeScanBatchCreator.java
index 0e5314b..60a01a5 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeScanBatchCreator.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeScanBatchCreator.java
@@ -34,6 +34,7 @@ import org.apache.drill.exec.physical.impl.BatchCreator;
 import org.apache.drill.exec.physical.impl.ScanBatch;
 import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.exec.store.RecordReader;
+import org.apache.drill.exec.store.hive.readers.HiveDefaultReader;
 import org.apache.drill.exec.store.parquet.ParquetDirectByteBufferAllocator;
 import org.apache.drill.exec.store.parquet.ParquetReaderUtility;
 import org.apache.drill.exec.store.parquet.columnreaders.ParquetRecordReader;
@@ -61,7 +62,7 @@ public class HiveDrillNativeScanBatchCreator implements BatchCreator<HiveDrillNa
   public ScanBatch getBatch(FragmentContext context, HiveDrillNativeParquetSubScan config, List<RecordBatch> children)
       throws ExecutionSetupException {
     final HiveTableWithColumnCache table = config.getTable();
-    final List<InputSplit> splits = config.getInputSplits();
+    final List<List<InputSplit>> splits = config.getInputSplits();
     final List<HivePartition> partitions = config.getPartitions();
     final List<SchemaPath> columns = config.getColumns();
     final String partitionDesignator = context.getOptions()
@@ -102,58 +103,60 @@ public class HiveDrillNativeScanBatchCreator implements BatchCreator<HiveDrillNa
 
     Map<String, String> mapWithMaxColumns = Maps.newLinkedHashMap();
     try {
-      for (InputSplit split : splits) {
-        final FileSplit fileSplit = (FileSplit) split;
-        final Path finalPath = fileSplit.getPath();
-        final JobConf cloneJob =
-            new ProjectionPusher().pushProjectionsAndFilters(new JobConf(conf), finalPath.getParent());
-        final FileSystem fs = finalPath.getFileSystem(cloneJob);
-
-        ParquetMetadata parquetMetadata = footerCache.get(finalPath.toString());
-        if (parquetMetadata == null){
-          parquetMetadata = ParquetFileReader.readFooter(cloneJob, finalPath);
-          footerCache.put(finalPath.toString(), parquetMetadata);
-        }
-        final List<Integer> rowGroupNums = getRowGroupNumbersFromFileSplit(fileSplit, parquetMetadata);
-
-        for(int rowGroupNum : rowGroupNums) {
-          //DRILL-5009 : Skip the row group if the row count is zero
-          if (parquetMetadata.getBlocks().get(rowGroupNum).getRowCount() == 0) {
-            continue;
+      for (List<InputSplit> splitGroups : splits) {
+        for (InputSplit split : splitGroups) {
+          final FileSplit fileSplit = (FileSplit) split;
+          final Path finalPath = fileSplit.getPath();
+          final JobConf cloneJob =
+              new ProjectionPusher().pushProjectionsAndFilters(new JobConf(conf), finalPath.getParent());
+          final FileSystem fs = finalPath.getFileSystem(cloneJob);
+
+          ParquetMetadata parquetMetadata = footerCache.get(finalPath.toString());
+          if (parquetMetadata == null) {
+            parquetMetadata = ParquetFileReader.readFooter(cloneJob, finalPath);
+            footerCache.put(finalPath.toString(), parquetMetadata);
           }
-          // Drill has only ever written a single row group per file, only detect corruption
-          // in the first row group
-          ParquetReaderUtility.DateCorruptionStatus containsCorruptDates =
-              ParquetReaderUtility.detectCorruptDates(parquetMetadata, config.getColumns(), true);
-          if (logger.isDebugEnabled()) {
-            logger.debug(containsCorruptDates.toString());
-          }
-          readers.add(new ParquetRecordReader(
-                  context,
-                  Path.getPathWithoutSchemeAndAuthority(finalPath).toString(),
-                  rowGroupNum, fs,
-                  CodecFactory.createDirectCodecFactory(fs.getConf(),
-                      new ParquetDirectByteBufferAllocator(oContext.getAllocator()), 0),
-                  parquetMetadata,
-                  newColumns,
-                  containsCorruptDates)
-          );
-          Map<String, String> implicitValues = Maps.newLinkedHashMap();
-
-          if (hasPartitions) {
-            List<String> values = partitions.get(currentPartitionIndex).getValues();
-            for (int i = 0; i < values.size(); i++) {
-              if (selectAllQuery || selectedPartitionColumns.contains(i)) {
-                implicitValues.put(partitionDesignator + i, values.get(i));
+          final List<Integer> rowGroupNums = getRowGroupNumbersFromFileSplit(fileSplit, parquetMetadata);
+
+          for (int rowGroupNum : rowGroupNums) {
+            //DRILL-5009 : Skip the row group if the row count is zero
+            if (parquetMetadata.getBlocks().get(rowGroupNum).getRowCount() == 0) {
+              continue;
+            }
+            // Drill has only ever written a single row group per file, only detect corruption
+            // in the first row group
+            ParquetReaderUtility.DateCorruptionStatus containsCorruptDates =
+                ParquetReaderUtility.detectCorruptDates(parquetMetadata, config.getColumns(), true);
+            if (logger.isDebugEnabled()) {
+              logger.debug(containsCorruptDates.toString());
+            }
+            readers.add(new ParquetRecordReader(
+                context,
+                Path.getPathWithoutSchemeAndAuthority(finalPath).toString(),
+                rowGroupNum, fs,
+                CodecFactory.createDirectCodecFactory(fs.getConf(),
+                    new ParquetDirectByteBufferAllocator(oContext.getAllocator()), 0),
+                parquetMetadata,
+                newColumns,
+                containsCorruptDates)
+            );
+            Map<String, String> implicitValues = Maps.newLinkedHashMap();
+
+            if (hasPartitions) {
+              List<String> values = partitions.get(currentPartitionIndex).getValues();
+              for (int i = 0; i < values.size(); i++) {
+                if (selectAllQuery || selectedPartitionColumns.contains(i)) {
+                  implicitValues.put(partitionDesignator + i, values.get(i));
+                }
               }
             }
+            implicitColumns.add(implicitValues);
+            if (implicitValues.size() > mapWithMaxColumns.size()) {
+              mapWithMaxColumns = implicitValues;
+            }
           }
-          implicitColumns.add(implicitValues);
-          if (implicitValues.size() > mapWithMaxColumns.size()) {
-            mapWithMaxColumns = implicitValues;
-          }
+          currentPartitionIndex++;
         }
-        currentPartitionIndex++;
       }
     } catch (final IOException|RuntimeException e) {
       AutoCloseables.close(e, readers);

http://git-wip-us.apache.org/repos/asf/drill/blob/f8bbb759/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveMetadataProvider.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveMetadataProvider.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveMetadataProvider.java
index 8abc79a..d0259ca 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveMetadataProvider.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveMetadataProvider.java
@@ -21,6 +21,12 @@ import com.google.common.base.Preconditions;
 import com.google.common.base.Stopwatch;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
+import com.google.common.collect.Multimap;
+import com.google.common.collect.Ordering;
+import com.google.common.collect.TreeMultimap;
+import com.google.common.io.ByteArrayDataOutput;
+import com.google.common.io.ByteStreams;
+import org.apache.commons.codec.binary.Base64;
 import org.apache.drill.common.exceptions.DrillRuntimeException;
 import org.apache.drill.exec.util.ImpersonationUtil;
 import org.apache.hadoop.fs.FileSystem;
@@ -31,17 +37,24 @@ import org.apache.hadoop.hive.metastore.MetaStoreUtils;
 import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
 import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileSplit;
 import org.apache.hadoop.mapred.InputFormat;
 import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.TextInputFormat;
 import org.apache.hadoop.security.UserGroupInformation;
 
 import java.io.IOException;
 import java.security.PrivilegedExceptionAction;
+import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
+import java.util.Set;
 import java.util.concurrent.TimeUnit;
 
 /**
@@ -57,9 +70,9 @@ public class HiveMetadataProvider {
   private final HiveReadEntry hiveReadEntry;
   private final UserGroupInformation ugi;
   private final boolean isPartitionedTable;
-  private final Map<Partition, List<InputSplitWrapper>> partitionInputSplitMap;
+  private final Map<Partition, List<LogicalInputSplit>> partitionInputSplitMap;
   private final HiveConf hiveConf;
-  private List<InputSplitWrapper> tableInputSplits;
+  private List<LogicalInputSplit> tableInputSplits;
 
   private final Stopwatch watch = Stopwatch.createUnstarted();
 
@@ -117,7 +130,7 @@ public class HiveMetadataProvider {
   }
 
   /** Helper method which return InputSplits for non-partitioned table */
-  private List<InputSplitWrapper> getTableInputSplits() throws Exception {
+  private List<LogicalInputSplit> getTableInputSplits() throws Exception {
     Preconditions.checkState(!isPartitionedTable, "Works only for non-partitioned tables");
     if (tableInputSplits != null) {
       return tableInputSplits;
@@ -132,34 +145,33 @@ public class HiveMetadataProvider {
   /** Helper method which returns the InputSplits for given partition. InputSplits are cached to speed up subsequent
    * metadata cache requests for the same partition(s).
    */
-  private List<InputSplitWrapper> getPartitionInputSplits(final HivePartition partition) throws Exception {
+  private List<LogicalInputSplit> getPartitionInputSplits(final HivePartition partition) throws Exception {
     if (partitionInputSplitMap.containsKey(partition)) {
       return partitionInputSplitMap.get(partition);
     }
 
     final Properties properties = HiveUtilities.getPartitionMetadata(partition, hiveReadEntry.getTable());
-    final List<InputSplitWrapper> splits = splitInputWithUGI(properties, partition.getSd(), partition);
+    final List<LogicalInputSplit> splits = splitInputWithUGI(properties, partition.getSd(), partition);
     partitionInputSplitMap.put(partition, splits);
 
     return splits;
   }
 
   /**
-   * Return {@link InputSplitWrapper}s for given {@link HiveReadEntry}. First splits are looked up in cache, if not
+   * Return {@link LogicalInputSplit}s for given {@link HiveReadEntry}. First splits are looked up in cache, if not
    * found go through {@link InputFormat#getSplits(JobConf, int)} to find the splits.
    *
    * @param hiveReadEntry Subset of the {@link HiveReadEntry} used when creating this object.
-   *
-   * @return
+   * @return list of logically grouped input splits
    */
-  public List<InputSplitWrapper> getInputSplits(final HiveReadEntry hiveReadEntry) {
+  public List<LogicalInputSplit> getInputSplits(final HiveReadEntry hiveReadEntry) {
     final Stopwatch timeGetSplits = Stopwatch.createStarted();
     try {
       if (!isPartitionedTable) {
         return getTableInputSplits();
       }
 
-      final List<InputSplitWrapper> splits = Lists.newArrayList();
+      final List<LogicalInputSplit> splits = Lists.newArrayList();
       for (HivePartition p : hiveReadEntry.getPartitions()) {
         splits.addAll(getPartitionInputSplits(p));
       }
@@ -220,41 +232,60 @@ public class HiveMetadataProvider {
   }
 
   /**
-   * Estimate the stats from the given list of InputSplits.
-   * @param inputSplits
-   * @return
-   * @throws IOException
+   * Estimate the stats from the given list of logically grouped input splits.
+   *
+   * @param inputSplits list of logically grouped input splits
+   * @return hive stats usually numRows and totalSizeInBytes which used
    */
-  private HiveStats getStatsEstimateFromInputSplits(final List<InputSplitWrapper> inputSplits) throws IOException {
+  private HiveStats getStatsEstimateFromInputSplits(final List<LogicalInputSplit> inputSplits) throws IOException {
     long data = 0;
-    for (final InputSplitWrapper split : inputSplits) {
-      data += split.getSplit().getLength();
+    for (final LogicalInputSplit split : inputSplits) {
+      data += split.getLength();
     }
 
     return new HiveStats(data/RECORD_SIZE, data);
   }
 
-  private List<InputSplitWrapper> splitInputWithUGI(final Properties properties, final StorageDescriptor sd,
-      final Partition partition) throws Exception {
+  /**
+   * Gets list of input splits based on table location.
+   * These input splits are grouped logically by file name
+   * if skip header / footer logic should be applied later on.
+   *
+   * @param properties table or partition properties
+   * @param sd storage descriptor
+   * @param partition hive partition
+   * @return list of logically grouped input splits
+   */
+  private List<LogicalInputSplit> splitInputWithUGI(final Properties properties, final StorageDescriptor sd, final Partition partition) {
     watch.start();
     try {
-      return ugi.doAs(new PrivilegedExceptionAction<List<InputSplitWrapper>>() {
-        public List<InputSplitWrapper> run() throws Exception {
-          final List<InputSplitWrapper> splits = Lists.newArrayList();
+      return ugi.doAs(new PrivilegedExceptionAction<List<LogicalInputSplit>>() {
+        public List<LogicalInputSplit> run() throws Exception {
+          final List<LogicalInputSplit> splits = Lists.newArrayList();
           final JobConf job = new JobConf(hiveConf);
           HiveUtilities.addConfToJob(job, properties);
           job.setInputFormat(HiveUtilities.getInputFormatClass(job, sd, hiveReadEntry.getTable()));
           final Path path = new Path(sd.getLocation());
           final FileSystem fs = path.getFileSystem(job);
-
           if (fs.exists(path)) {
             FileInputFormat.addInputPath(job, path);
             final InputFormat<?, ?> format = job.getInputFormat();
-            for (final InputSplit split : format.getSplits(job, 1)) {
-              splits.add(new InputSplitWrapper(split, partition));
+            InputSplit[] inputSplits = format.getSplits(job, 1);
+
+            // if current table with text input format and has header / footer,
+            // we need to make sure that splits of the same file are grouped together
+            if (TextInputFormat.class.getCanonicalName().equals(sd.getInputFormat()) &&
+                HiveUtilities.hasHeaderOrFooter(hiveReadEntry.getTable())) {
+              Multimap<Path, FileSplit> inputSplitMultimap = transformFileSplits(inputSplits);
+              for (Collection<FileSplit> logicalInputSplit : inputSplitMultimap.asMap().values()) {
+                splits.add(new LogicalInputSplit(logicalInputSplit, partition));
+              }
+            } else {
+              for (final InputSplit split : inputSplits) {
+                splits.add(new LogicalInputSplit(split, partition));
+              }
             }
           }
-
           return splits;
         }
       });
@@ -268,23 +299,114 @@ public class HiveMetadataProvider {
     }
   }
 
-  /** Contains InputSplit along with the Partition. If non-partitioned tables, the partition field is null. */
-  public static class InputSplitWrapper {
-    private InputSplit split;
-    private Partition partition;
+  /**
+   * <p>
+   * Groups input splits by file path. Each inout split group is ordered by starting bytes
+   * to ensure file parts in correct order.
+   * </p>
+   * <p>
+   * Example:
+   * <pre>
+   * hdfs:///tmp/table/file_1.txt  -> hdfs:///tmp/table/file_1.txt:0+10000
+   *                                  hdfs:///tmp/table/file_1.txt:10001+20000
+   * hdfs:///tmp/table/file_2.txt  -> hdfs:///tmp/table/file_2.txt:0+10000
+   * </pre>
+   * </p>
+   * @param inputSplits input splits
+   * @return multimap where key is file path and value is group of ordered file splits
+   */
+  private Multimap<Path, FileSplit> transformFileSplits(InputSplit[] inputSplits) {
+    Multimap<Path, FileSplit> inputSplitGroups = TreeMultimap.create(Ordering.<Path>natural(),
+        new Comparator<FileSplit>() {
+      @Override
+      public int compare(FileSplit f1, FileSplit f2) {
+        return Long.compare(f1.getStart(), f2.getStart());
+      }
+    });
 
-    public InputSplitWrapper(final InputSplit split, final Partition partition) {
-      this.split = split;
+    for (InputSplit inputSplit : inputSplits) {
+      FileSplit fileSplit = (FileSplit) inputSplit;
+      inputSplitGroups.put(fileSplit.getPath(), fileSplit);
+    }
+    return inputSplitGroups;
+  }
+
+  /**
+   * Contains group of input splits along with the partition. For non-partitioned tables, the partition field is null.
+   * Input splits can be logically grouped together, for example, in case of when table has header / footer.
+   * In this case all splits from the same file should be processed together.
+   */
+  public static class LogicalInputSplit {
+
+    private final Collection<InputSplit> inputSplits = new ArrayList<>();
+    private final Partition partition;
+
+    public LogicalInputSplit(InputSplit inputSplit, Partition partition) {
+      inputSplits.add(inputSplit);
+      this.partition = partition;
+    }
+
+    public LogicalInputSplit(Collection<? extends InputSplit> inputSplits, Partition partition) {
+      this.inputSplits.addAll(inputSplits);
       this.partition = partition;
     }
 
-    public InputSplit getSplit() {
-      return split;
+    public Collection<InputSplit> getInputSplits() {
+      return inputSplits;
     }
 
     public Partition getPartition() {
       return partition;
     }
+
+    /**
+     * @return returns total length of all stored input splits
+     */
+    public long getLength() throws IOException {
+      long length = 0L;
+      for (InputSplit inputSplit: inputSplits) {
+        length += inputSplit.getLength();
+      }
+      return length;
+    }
+
+    /**
+     * @return collection of unique locations where inout splits are stored
+     */
+    public Collection<String> getLocations() throws IOException {
+      Set<String> locations = new HashSet<>();
+      for (InputSplit inputSplit: inputSplits) {
+        Collections.addAll(locations, inputSplit.getLocations());
+      }
+      return locations;
+    }
+
+    /**
+     * Serializes each input split to string using Base64 encoding.
+     *
+     * @return list of serialized input splits
+     */
+    public List<String> serialize() throws IOException {
+      List<String> serializedInputSplits = new ArrayList<>();
+      for (InputSplit inputSplit : inputSplits) {
+        final ByteArrayDataOutput byteArrayOutputStream = ByteStreams.newDataOutput();
+        inputSplit.write(byteArrayOutputStream);
+        final String encoded = Base64.encodeBase64String(byteArrayOutputStream.toByteArray());
+        logger.debug("Encoded split string for split {} : {}", inputSplit, encoded);
+        serializedInputSplits.add(encoded);
+      }
+      return serializedInputSplits;
+    }
+
+    /**
+     * @return input split class name if at least one input split is present, null otherwise.
+     */
+    public String getType() {
+      if (inputSplits.isEmpty()) {
+        return null;
+      }
+      return inputSplits.iterator().next().getClass().getName();
+    }
   }
 
   /** Contains stats. Currently only numRows and totalSizeInBytes are used. */
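
For illustration, a minimal sketch of how the groups produced by transformFileSplits() could be wrapped into LogicalInputSplit instances. This is not part of the commit; it assumes it runs inside HiveMetadataProvider, with inputSplits (InputSplit[]) and partition (Partition, null for non-partitioned tables) already in scope:

    // Sketch only: group the raw splits per file, then wrap each group so that
    // all parts of one file are handed to a single reader in start-offset order.
    Multimap<Path, FileSplit> groups = transformFileSplits(inputSplits);
    List<LogicalInputSplit> logicalInputSplits = new ArrayList<>();
    for (Collection<FileSplit> group : groups.asMap().values()) {
      logicalInputSplits.add(new LogicalInputSplit(group, partition));
    }

When no grouping is needed, the single-split constructor of LogicalInputSplit covers the one-split-per-reader case.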

http://git-wip-us.apache.org/repos/asf/drill/blob/f8bbb759/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveScan.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveScan.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveScan.java
index 42fb3e2..cf8a671 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveScan.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveScan.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -23,7 +23,6 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
-import org.apache.commons.codec.binary.Base64;
 import org.apache.drill.common.exceptions.DrillRuntimeException;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.common.expression.SchemaPath;
@@ -38,14 +37,13 @@ import org.apache.drill.exec.proto.CoordinationProtos;
 import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
 import org.apache.drill.exec.store.StoragePluginRegistry;
 import org.apache.drill.exec.store.hive.HiveMetadataProvider.HiveStats;
-import org.apache.drill.exec.store.hive.HiveMetadataProvider.InputSplitWrapper;
+import org.apache.drill.exec.store.hive.HiveMetadataProvider.LogicalInputSplit;
 import org.apache.drill.exec.store.hive.HiveTableWrapper.HivePartitionWrapper;
 import org.apache.drill.exec.util.Utilities;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.hadoop.hive.metastore.api.Table;
-import org.apache.hadoop.mapred.InputSplit;
 
 import com.fasterxml.jackson.annotation.JacksonInject;
 import com.fasterxml.jackson.annotation.JsonCreator;
@@ -54,14 +52,12 @@ import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.annotation.JsonTypeName;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
-import com.google.common.io.ByteArrayDataOutput;
-import com.google.common.io.ByteStreams;
 
 import static org.apache.drill.exec.store.hive.DrillHiveMetaStoreClient.createPartitionWithSpecColumns;
 
 @JsonTypeName("hive-scan")
 public class HiveScan extends AbstractGroupScan {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HiveScan.class);
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HiveScan.class);
 
   private static int HIVE_SERDE_SCAN_OVERHEAD_FACTOR_PER_COLUMN = 20;
 
@@ -78,10 +74,10 @@ public class HiveScan extends AbstractGroupScan {
   protected final HiveMetadataProvider metadataProvider;
 
   @JsonIgnore
-  private List<List<InputSplitWrapper>> mappings;
+  private List<List<LogicalInputSplit>> mappings;
 
   @JsonIgnore
-  protected List<InputSplitWrapper> inputSplits;
+  protected List<LogicalInputSplit> inputSplits;
 
   @JsonCreator
   public HiveScan(@JsonProperty("userName") final String userName,
@@ -121,7 +117,7 @@ public class HiveScan extends AbstractGroupScan {
     return columns;
   }
 
-  protected List<InputSplitWrapper> getInputSplits() {
+  protected List<LogicalInputSplit> getInputSplits() {
     if (inputSplits == null) {
       inputSplits = metadataProvider.getInputSplits(hiveReadEntry);
     }
@@ -131,41 +127,33 @@ public class HiveScan extends AbstractGroupScan {
 
   @Override
   public void applyAssignments(final List<CoordinationProtos.DrillbitEndpoint> endpoints) {
-    mappings = Lists.newArrayList();
+    mappings = new ArrayList<>();
     for (int i = 0; i < endpoints.size(); i++) {
-      mappings.add(new ArrayList<InputSplitWrapper>());
+      mappings.add(new ArrayList<LogicalInputSplit>());
     }
     final int count = endpoints.size();
-    final List<InputSplitWrapper> inputSplits = getInputSplits();
+    final List<LogicalInputSplit> inputSplits = getInputSplits();
     for (int i = 0; i < inputSplits.size(); i++) {
       mappings.get(i % count).add(inputSplits.get(i));
     }
   }
 
-  public static String serializeInputSplit(final InputSplit split) throws IOException {
-    final ByteArrayDataOutput byteArrayOutputStream =  ByteStreams.newDataOutput();
-    split.write(byteArrayOutputStream);
-    final String encoded = Base64.encodeBase64String(byteArrayOutputStream.toByteArray());
-    logger.debug("Encoded split string for split {} : {}", split, encoded);
-    return encoded;
-  }
-
   @Override
   public SubScan getSpecificScan(final int minorFragmentId) throws ExecutionSetupException {
     try {
-      final List<InputSplitWrapper> splits = mappings.get(minorFragmentId);
+      final List<LogicalInputSplit> splits = mappings.get(minorFragmentId);
       List<HivePartitionWrapper> parts = Lists.newArrayList();
-      final List<String> encodedInputSplits = Lists.newArrayList();
+      final List<List<String>> encodedInputSplits = Lists.newArrayList();
       final List<String> splitTypes = Lists.newArrayList();
-      for (final InputSplitWrapper split : splits) {
+      for (final LogicalInputSplit split : splits) {
         final Partition splitPartition = split.getPartition();
         if (splitPartition != null) {
           HiveTableWithColumnCache table = hiveReadEntry.getTable();
           parts.add(createPartitionWithSpecColumns(new HiveTableWithColumnCache(table, new ColumnListsCache(table)), splitPartition));
         }
 
-        encodedInputSplits.add(serializeInputSplit(split.getSplit()));
-        splitTypes.add(split.getSplit().getClass().getName());
+        encodedInputSplits.add(split.serialize());
+        splitTypes.add(split.getType());
       }
       if (parts.size() <= 0) {
         parts = null;
@@ -193,13 +181,13 @@ public class HiveScan extends AbstractGroupScan {
     final Map<DrillbitEndpoint, EndpointAffinity> affinityMap = new HashMap<>();
     try {
       long totalSize = 0;
-      final List<InputSplitWrapper> inputSplits = getInputSplits();
-      for (final InputSplitWrapper split : inputSplits) {
-        totalSize += Math.max(1, split.getSplit().getLength());
+      final List<LogicalInputSplit> inputSplits = getInputSplits();
+      for (final LogicalInputSplit split : inputSplits) {
+        totalSize += Math.max(1, split.getLength());
       }
-      for (final InputSplitWrapper split : inputSplits) {
-        final float affinity = ((float) Math.max(1, split.getSplit().getLength())) / totalSize;
-        for (final String loc : split.getSplit().getLocations()) {
+      for (final LogicalInputSplit split : inputSplits) {
+        final float affinity = ((float) Math.max(1, split.getLength())) / totalSize;
+        for (final String loc : split.getLocations()) {
           logger.debug("split location: {}", loc);
           final DrillbitEndpoint endpoint = endpointMap.get(loc);
           if (endpoint != null) {
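
To make the affinity weighting in the loop above concrete, a small standalone sketch with hypothetical split lengths (not Drill code): each logical split contributes its share of the total bytes to every endpoint that hosts one of its locations.

    public class AffinitySketch {
      public static void main(String[] args) {
        long[] splitLengths = {10_000_000L, 30_000_000L}; // hypothetical logical split sizes in bytes
        long totalSize = 0;
        for (long length : splitLengths) {
          totalSize += Math.max(1, length);               // same zero-length guard as above
        }
        for (long length : splitLengths) {
          float affinity = ((float) Math.max(1, length)) / totalSize;
          System.out.println(affinity);                   // prints 0.25, then 0.75
        }
      }
    }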

http://git-wip-us.apache.org/repos/asf/drill/blob/f8bbb759/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveScanBatchCreator.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveScanBatchCreator.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveScanBatchCreator.java
index e287f68..3df8fd9 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveScanBatchCreator.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveScanBatchCreator.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -17,81 +17,24 @@
  */
 package org.apache.drill.exec.store.hive;
 
-import java.lang.reflect.Constructor;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
 
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.physical.impl.BatchCreator;
 import org.apache.drill.exec.physical.impl.ScanBatch;
 import org.apache.drill.exec.record.RecordBatch;
-import org.apache.drill.exec.store.RecordReader;
-import org.apache.drill.exec.util.ImpersonationUtil;
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.ql.io.RCFileInputFormat;
-import org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat;
-import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat;
-import org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat;
-import org.apache.hadoop.mapred.InputSplit;
-
-import com.google.common.collect.Lists;
-import org.apache.hadoop.mapred.TextInputFormat;
-import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.drill.exec.store.hive.readers.initilializers.AbstractReadersInitializer;
+import org.apache.drill.exec.store.hive.readers.initilializers.ReadersInitializer;
 
 @SuppressWarnings("unused")
 public class HiveScanBatchCreator implements BatchCreator<HiveSubScan> {
 
-  /**
-   * Use different classes for different Hive native formats:
-   * ORC, AVRO, RCFFile, Text and Parquet.
-   * If input format is none of them falls to default reader.
-   */
-  static Map<String, Class> readerMap = new HashMap<>();
-  static {
-    readerMap.put(OrcInputFormat.class.getCanonicalName(), HiveOrcReader.class);
-    readerMap.put(AvroContainerInputFormat.class.getCanonicalName(), HiveAvroReader.class);
-    readerMap.put(RCFileInputFormat.class.getCanonicalName(), HiveRCFileReader.class);
-    readerMap.put(MapredParquetInputFormat.class.getCanonicalName(), HiveParquetReader.class);
-    readerMap.put(TextInputFormat.class.getCanonicalName(), HiveTextReader.class);
-  }
-
   @Override
   public ScanBatch getBatch(FragmentContext context, HiveSubScan config, List<RecordBatch> children)
       throws ExecutionSetupException {
-    List<RecordReader> readers = Lists.newArrayList();
-    HiveTableWithColumnCache table = config.getTable();
-    List<InputSplit> splits = config.getInputSplits();
-    List<HivePartition> partitions = config.getPartitions();
-    boolean hasPartitions = (partitions != null && partitions.size() > 0);
-    int i = 0;
-    final UserGroupInformation proxyUgi = ImpersonationUtil.createProxyUgi(config.getUserName(),
-      context.getQueryUserName());
-
-    final HiveConf hiveConf = config.getHiveConf();
-
-    final String formatName = table.getSd().getInputFormat();
-    Class<? extends HiveAbstractReader> readerClass = HiveDefaultReader.class;
-    if (readerMap.containsKey(formatName)) {
-      readerClass = readerMap.get(formatName);
-    }
-    Constructor<? extends HiveAbstractReader> readerConstructor = null;
-    try {
-      readerConstructor = readerClass.getConstructor(HiveTableWithColumnCache.class, HivePartition.class,
-          InputSplit.class, List.class, FragmentContext.class, HiveConf.class,
-          UserGroupInformation.class);
-      for (InputSplit split : splits) {
-        readers.add(readerConstructor.newInstance(table,
-            (hasPartitions ? partitions.get(i++) : null), split, config.getColumns(), context, hiveConf, proxyUgi));
-      }
-      if (readers.size() == 0) {
-        readers.add(readerConstructor.newInstance(
-            table, null, null, config.getColumns(), context, hiveConf, proxyUgi));
-      }
-    } catch(Exception e) {
-      logger.error("No constructor for {}, thrown {}", readerClass.getName(), e);
-    }
-    return new ScanBatch(config, context, readers);
+    AbstractReadersInitializer readersInitializer = ReadersInitializer.getInitializer(context, config);
+    return new ScanBatch(config, context, readersInitializer.init());
   }
+
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/f8bbb759/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveSubScan.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveSubScan.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveSubScan.java
index 107188c..a1990a0 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveSubScan.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveSubScan.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -19,6 +19,7 @@ package org.apache.drill.exec.store.hive;
 
 import java.io.IOException;
 import java.lang.reflect.Constructor;
+import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
 
@@ -40,7 +41,6 @@ import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.annotation.JsonTypeName;
 import com.google.common.collect.Iterators;
-import com.google.common.collect.Lists;
 import com.google.common.io.ByteArrayDataInput;
 import com.google.common.io.ByteStreams;
 
@@ -49,7 +49,7 @@ public class HiveSubScan extends AbstractBase implements SubScan {
   protected HiveReadEntry hiveReadEntry;
 
   @JsonIgnore
-  protected List<InputSplit> inputSplits = Lists.newArrayList();
+  protected List<List<InputSplit>> inputSplits = new ArrayList<>();
   @JsonIgnore
   protected HiveTableWithColumnCache table;
   @JsonIgnore
@@ -57,14 +57,14 @@ public class HiveSubScan extends AbstractBase implements SubScan {
   @JsonIgnore
   protected HiveStoragePlugin storagePlugin;
 
-  private List<String> splits;
+  private List<List<String>> splits;
   private List<String> splitClasses;
   protected List<SchemaPath> columns;
 
   @JsonCreator
   public HiveSubScan(@JacksonInject StoragePluginRegistry registry,
                      @JsonProperty("userName") String userName,
-                     @JsonProperty("splits") List<String> splits,
+                     @JsonProperty("splits") List<List<String>> splits,
                      @JsonProperty("hiveReadEntry") HiveReadEntry hiveReadEntry,
                      @JsonProperty("splitClasses") List<String> splitClasses,
                      @JsonProperty("columns") List<SchemaPath> columns,
@@ -73,7 +73,7 @@ public class HiveSubScan extends AbstractBase implements SubScan {
     this(userName, splits, hiveReadEntry, splitClasses, columns, (HiveStoragePlugin)registry.getPlugin(pluginName));
   }
 
-  public HiveSubScan(final String userName, final List<String> splits, final HiveReadEntry hiveReadEntry,
+  public HiveSubScan(final String userName, final List<List<String>> splits, final HiveReadEntry hiveReadEntry,
       final List<String> splitClasses, final List<SchemaPath> columns, final HiveStoragePlugin plugin)
     throws IOException, ReflectiveOperationException {
     super(userName);
@@ -101,7 +101,7 @@ public class HiveSubScan extends AbstractBase implements SubScan {
     return storagePlugin;
   }
 
-  public List<String> getSplits() {
+  public List<List<String>> getSplits() {
     return splits;
   }
 
@@ -121,7 +121,7 @@ public class HiveSubScan extends AbstractBase implements SubScan {
     return columns;
   }
 
-  public List<InputSplit> getInputSplits() {
+  public List<List<InputSplit>> getInputSplits() {
     return inputSplits;
   }
 
@@ -129,16 +129,20 @@ public class HiveSubScan extends AbstractBase implements SubScan {
     return hiveReadEntry;
   }
 
-  public static InputSplit deserializeInputSplit(String base64, String className) throws IOException, ReflectiveOperationException{
+  public static List<InputSplit> deserializeInputSplit(List<String> base64, String className) throws IOException, ReflectiveOperationException{
     Constructor<?> constructor = Class.forName(className).getDeclaredConstructor();
     if (constructor == null) {
       throw new ReflectiveOperationException("Class " + className + " does not implement a default constructor.");
     }
     constructor.setAccessible(true);
-    InputSplit split = (InputSplit) constructor.newInstance();
-    ByteArrayDataInput byteArrayDataInput = ByteStreams.newDataInput(Base64.decodeBase64(base64));
-    split.readFields(byteArrayDataInput);
-    return split;
+    List<InputSplit> splits = new ArrayList<>();
+    for (String str : base64) {
+      InputSplit split = (InputSplit) constructor.newInstance();
+      ByteArrayDataInput byteArrayDataInput = ByteStreams.newDataInput(Base64.decodeBase64(str));
+      split.readFields(byteArrayDataInput);
+      splits.add(split);
+    }
+    return splits;
   }
 
   @Override
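
As a usage illustration of the new list-based serialization, a hedged sketch of the round trip between LogicalInputSplit.serialize() in HiveMetadataProvider and deserializeInputSplit() above; the FileSplit values are made up:

    import java.util.List;
    import org.apache.drill.exec.store.hive.HiveMetadataProvider.LogicalInputSplit;
    import org.apache.drill.exec.store.hive.HiveSubScan;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapred.FileSplit;
    import org.apache.hadoop.mapred.InputSplit;

    public class SplitRoundTripSketch {
      public static void main(String[] args) throws Exception {
        // Hypothetical 10000-byte split of a text file, no preferred hosts.
        FileSplit original = new FileSplit(new Path("hdfs:///tmp/table/file_1.txt"), 0, 10000, new String[0]);
        // Non-partitioned table, so the partition argument is null.
        LogicalInputSplit logical = new LogicalInputSplit(original, null);
        List<String> encoded = logical.serialize();                        // Base64 strings, one per split
        List<InputSplit> restored =
            HiveSubScan.deserializeInputSplit(encoded, logical.getType()); // back to FileSplit instances
      }
    }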

http://git-wip-us.apache.org/repos/asf/drill/blob/f8bbb759/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveUtilities.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveUtilities.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveUtilities.java
index 1e5ea6c..4164649 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveUtilities.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveUtilities.java
@@ -51,13 +51,13 @@ import org.apache.drill.exec.vector.ValueVector;
 import org.apache.drill.exec.work.ExecErrorConstants;
 
 import org.apache.hadoop.hive.common.type.HiveDecimal;
-import org.apache.hadoop.hive.common.type.HiveVarchar;
 import org.apache.hadoop.hive.metastore.MetaStoreUtils;
 import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
 import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.ql.metadata.HiveStorageHandler;
 import org.apache.hadoop.hive.ql.metadata.HiveUtils;
+import org.apache.hadoop.hive.serde.serdeConstants;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category;
 import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector.PrimitiveCategory;
 import org.apache.hadoop.hive.serde2.typeinfo.BaseCharTypeInfo;
@@ -80,7 +80,7 @@ import java.util.Properties;
 import static org.apache.hadoop.hive.metastore.api.hive_metastoreConstants.META_TABLE_STORAGE;
 
 public class HiveUtilities {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HiveUtilities.class);
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HiveUtilities.class);
 
   /** Partition value is received in string format. Convert it into appropriate object based on the type. */
   public static Object convertPartitionType(TypeInfo typeInfo, String value, final String defaultPartitionValue) {
@@ -333,7 +333,7 @@ public class HiveUtilities {
         return TypeProtos.MinorType.BIT;
       case DECIMAL: {
 
-        if (options.getOption(PlannerSettings.ENABLE_DECIMAL_DATA_TYPE_KEY).bool_val == false) {
+        if (!options.getOption(PlannerSettings.ENABLE_DECIMAL_DATA_TYPE_KEY).bool_val) {
           throw UserException.unsupportedError()
               .message(ExecErrorConstants.DECIMAL_DISABLE_ERR_MSG)
               .build(logger);
@@ -397,7 +397,6 @@ public class HiveUtilities {
    *
    * @param job {@link JobConf} instance.
    * @param properties New config properties
-   * @param hiveConf HiveConf of Hive storage plugin
    */
   public static void addConfToJob(final JobConf job, final Properties properties) {
     for (Object obj : properties.keySet()) {
@@ -470,5 +469,43 @@ public class HiveUtilities {
         .message(errMsg.toString())
         .build(logger);
   }
+
+  /**
+   * Returns the property value. If the property is absent, returns the given default value.
+   * Fails with NumberFormatException if the property value is non-numeric.
+   *
+   * @param tableProperties table properties
+   * @param propertyName property name
+   * @param defaultValue default value used when the property is absent
+   * @return property value
+   * @throws NumberFormatException if property value is not numeric
+   */
+  public static int retrieveIntProperty(Properties tableProperties, String propertyName, int defaultValue) {
+    Object propertyObject = tableProperties.get(propertyName);
+    if (propertyObject == null) {
+      return defaultValue;
+    }
+
+    try {
+      return Integer.valueOf(propertyObject.toString());
+    } catch (NumberFormatException e) {
+      throw new NumberFormatException(String.format("Hive table property %s value '%s' is non-numeric",
+          propertyName, propertyObject.toString()));
+    }
+  }
+
+  /**
+   * Checks if the given table has a header or footer.
+   * If at least one of them has a value greater than zero, the method returns true.
+   *
+   * @param table table with column cache instance
+   * @return true if table contains header or footer, false otherwise
+   */
+  public static boolean hasHeaderOrFooter(HiveTableWithColumnCache table) {
+    Properties tableProperties = getTableMetadata(table);
+    int skipHeader = retrieveIntProperty(tableProperties, serdeConstants.HEADER_COUNT, -1);
+    int skipFooter = retrieveIntProperty(tableProperties, serdeConstants.FOOTER_COUNT, -1);
+    return skipHeader > 0 || skipFooter > 0;
+  }
 }
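
To show how the two helpers above fit together, a small sketch using a plain Properties object in place of real Hive table metadata; the property values are hypothetical:

    import java.util.Properties;
    import org.apache.drill.exec.store.hive.HiveUtilities;
    import org.apache.hadoop.hive.serde.serdeConstants;

    public class HeaderFooterSketch {
      public static void main(String[] args) {
        // Hypothetical table properties: skip one header line, footer count not set.
        Properties tableProperties = new Properties();
        tableProperties.setProperty(serdeConstants.HEADER_COUNT, "1");

        int skipHeader = HiveUtilities.retrieveIntProperty(tableProperties, serdeConstants.HEADER_COUNT, -1);
        int skipFooter = HiveUtilities.retrieveIntProperty(tableProperties, serdeConstants.FOOTER_COUNT, -1);

        System.out.println(skipHeader);                       // 1
        System.out.println(skipFooter);                       // -1, the default for an absent property
        System.out.println(skipHeader > 0 || skipFooter > 0); // true, so header / footer handling applies
      }
    }

hasHeaderOrFooter() applies the same check directly to the metadata of a HiveTableWithColumnCache instance.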