Posted to commits@drill.apache.org by pa...@apache.org on 2017/11/22 22:52:09 UTC

[07/12] drill git commit: DRILL-5941: Skip header / footer improvements for Hive storage plugin

http://git-wip-us.apache.org/repos/asf/drill/blob/f8bbb759/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/readers/HiveAbstractReader.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/readers/HiveAbstractReader.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/readers/HiveAbstractReader.java
new file mode 100644
index 0000000..9df721b
--- /dev/null
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/readers/HiveAbstractReader.java
@@ -0,0 +1,417 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.hive.readers;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+
+import com.google.common.util.concurrent.ListenableFuture;
+import io.netty.buffer.DrillBuf;
+import org.apache.drill.common.exceptions.DrillRuntimeException;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.types.TypeProtos.MajorType;
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.expr.TypeHelper;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.OperatorContext;
+import org.apache.drill.exec.physical.impl.OutputMutator;
+import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.server.options.OptionManager;
+import org.apache.drill.exec.store.AbstractRecordReader;
+import org.apache.drill.exec.store.hive.HiveFieldConverter;
+import org.apache.drill.exec.store.hive.HivePartition;
+import org.apache.drill.exec.store.hive.HiveTableWithColumnCache;
+import org.apache.drill.exec.store.hive.HiveUtilities;
+import org.apache.drill.exec.vector.AllocationHelper;
+import org.apache.drill.exec.vector.ValueVector;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
+import org.apache.hadoop.hive.serde2.SerDe;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorConverters;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorConverters.Converter;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+
+import com.google.common.collect.Lists;
+import org.apache.hadoop.security.UserGroupInformation;
+
+
+public abstract class HiveAbstractReader extends AbstractRecordReader {
+  protected static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HiveAbstractReader.class);
+
+  protected final DrillBuf managedBuffer;
+
+  protected HiveTableWithColumnCache table;
+  protected HivePartition partition;
+  protected Iterator<InputSplit> inputSplitsIterator;
+  protected List<String> selectedColumnNames;
+  protected List<StructField> selectedStructFieldRefs = Lists.newArrayList();
+  protected List<TypeInfo> selectedColumnTypes = Lists.newArrayList();
+  protected List<ObjectInspector> selectedColumnObjInspectors = Lists.newArrayList();
+  protected List<HiveFieldConverter> selectedColumnFieldConverters = Lists.newArrayList();
+  protected List<String> selectedPartitionNames = Lists.newArrayList();
+  protected List<TypeInfo> selectedPartitionTypes = Lists.newArrayList();
+  protected List<Object> selectedPartitionValues = Lists.newArrayList();
+
+  // SerDe of the partition being read (or of the table if the table is non-partitioned)
+  protected SerDe partitionSerDe;
+
+  // ObjectInspector to read data from partitionSerDe (for a non-partitioned table this is the same as the table
+  // ObjectInspector).
+  protected StructObjectInspector partitionOI;
+
+  // Final ObjectInspector. We may not use the partitionOI directly if there are schema changes between the table and
+  // partition. If there are no schema changes, then this is the same as the partitionOI.
+  protected StructObjectInspector finalOI;
+
+  // Converter which converts data from partition schema to table schema.
+  protected Converter partTblObjectInspectorConverter;
+
+  protected Object key;
+  protected RecordReader<Object, Object> reader;
+  protected List<ValueVector> vectors = Lists.newArrayList();
+  protected List<ValueVector> pVectors = Lists.newArrayList();
+  protected boolean empty;
+  protected HiveConf hiveConf;
+  protected FragmentContext fragmentContext;
+  protected String defaultPartitionValue;
+  protected final UserGroupInformation proxyUgi;
+  protected JobConf job;
+
+
+  public static final int TARGET_RECORD_COUNT = 4000;
+
+  public HiveAbstractReader(HiveTableWithColumnCache table, HivePartition partition, Collection<InputSplit> inputSplits, List<SchemaPath> projectedColumns,
+                            FragmentContext context, final HiveConf hiveConf,
+                            UserGroupInformation proxyUgi) throws ExecutionSetupException {
+    this.table = table;
+    this.partition = partition;
+    this.empty = (inputSplits == null || inputSplits.isEmpty());
+    this.inputSplitsIterator = empty ? Collections.<InputSplit>emptyIterator() : inputSplits.iterator();
+    this.hiveConf = hiveConf;
+    this.fragmentContext = context;
+    this.proxyUgi = proxyUgi;
+    this.managedBuffer = fragmentContext.getManagedBuffer().reallocIfNeeded(256);
+    setColumns(projectedColumns);
+  }
+
+  public abstract void internalInit(Properties tableProperties, RecordReader<Object, Object> reader);
+
+  private void init() throws ExecutionSetupException {
+    job = new JobConf(hiveConf);
+
+    // Get the configured default partition value
+    defaultPartitionValue = hiveConf.get(ConfVars.DEFAULTPARTITIONNAME.varname);
+
+    Properties tableProperties;
+    try {
+      tableProperties = HiveUtilities.getTableMetadata(table);
+      final Properties partitionProperties =
+          (partition == null) ?  tableProperties :
+              HiveUtilities.getPartitionMetadata(partition, table);
+      HiveUtilities.addConfToJob(job, partitionProperties);
+
+      final SerDe tableSerDe = createSerDe(job, table.getSd().getSerdeInfo().getSerializationLib(), tableProperties);
+      final StructObjectInspector tableOI = getStructOI(tableSerDe);
+
+      if (partition != null) {
+        partitionSerDe = createSerDe(job, partition.getSd().getSerdeInfo().getSerializationLib(), partitionProperties);
+        partitionOI = getStructOI(partitionSerDe);
+
+        finalOI = (StructObjectInspector)ObjectInspectorConverters.getConvertedOI(partitionOI, tableOI);
+        partTblObjectInspectorConverter = ObjectInspectorConverters.getConverter(partitionOI, finalOI);
+        job.setInputFormat(HiveUtilities.getInputFormatClass(job, partition.getSd(), table));
+      } else {
+        // For non-partitioned tables, there is no need to create a converter as no schema changes are expected.
+        partitionSerDe = tableSerDe;
+        partitionOI = tableOI;
+        partTblObjectInspectorConverter = null;
+        finalOI = tableOI;
+        job.setInputFormat(HiveUtilities.getInputFormatClass(job, table.getSd(), table));
+      }
+
+      if (logger.isTraceEnabled()) {
+        for (StructField field: finalOI.getAllStructFieldRefs()) {
+          logger.trace("field in finalOI: {}", field.getClass().getName());
+        }
+        logger.trace("partitionSerDe class is {} {}", partitionSerDe.getClass().getName());
+      }
+      // Get list of partition column names
+      final List<String> partitionNames = Lists.newArrayList();
+      for (FieldSchema field : table.getPartitionKeys()) {
+        partitionNames.add(field.getName());
+      }
+
+      // We should always get the column names from the ObjectInspector. For some tables (e.g. Avro) the metastore
+      // may not contain the schema; instead it is derived from other sources such as table properties or an external file.
+      // The SerDe object knows how to get the schema with all the config and table properties passed in during initialization.
+      // The ObjectInspector created from the SerDe object has the schema.
+      final StructTypeInfo sTypeInfo = (StructTypeInfo) TypeInfoUtils.getTypeInfoFromObjectInspector(finalOI);
+      final List<String> tableColumnNames = sTypeInfo.getAllStructFieldNames();
+
+      // Select list of columns for project pushdown into Hive SerDe readers.
+      final List<Integer> columnIds = Lists.newArrayList();
+      if (isStarQuery()) {
+        selectedColumnNames = tableColumnNames;
+        for(int i=0; i<selectedColumnNames.size(); i++) {
+          columnIds.add(i);
+        }
+        selectedPartitionNames = partitionNames;
+      } else {
+        selectedColumnNames = Lists.newArrayList();
+        for (SchemaPath field : getColumns()) {
+          String columnName = field.getRootSegment().getPath();
+          if (partitionNames.contains(columnName)) {
+            selectedPartitionNames.add(columnName);
+          } else {
+            columnIds.add(tableColumnNames.indexOf(columnName));
+            selectedColumnNames.add(columnName);
+          }
+        }
+      }
+      ColumnProjectionUtils.appendReadColumns(job, columnIds, selectedColumnNames);
+
+      for (String columnName : selectedColumnNames) {
+        StructField fieldRef = finalOI.getStructFieldRef(columnName);
+        selectedStructFieldRefs.add(fieldRef);
+        ObjectInspector fieldOI = fieldRef.getFieldObjectInspector();
+
+        TypeInfo typeInfo = TypeInfoUtils.getTypeInfoFromTypeString(fieldOI.getTypeName());
+
+        selectedColumnObjInspectors.add(fieldOI);
+        selectedColumnTypes.add(typeInfo);
+        selectedColumnFieldConverters.add(HiveFieldConverter.create(typeInfo, fragmentContext));
+      }
+
+      for(int i=0; i<selectedColumnNames.size(); ++i){
+        logger.trace("inspector:typeName={}, className={}, TypeInfo: {}, converter:{}",
+            selectedColumnObjInspectors.get(i).getTypeName(),
+            selectedColumnObjInspectors.get(i).getClass().getName(),
+            selectedColumnTypes.get(i).toString(),
+            selectedColumnFieldConverters.get(i).getClass().getName());
+      }
+
+      for (int i = 0; i < table.getPartitionKeys().size(); i++) {
+        FieldSchema field = table.getPartitionKeys().get(i);
+        if (selectedPartitionNames.contains(field.getName())) {
+          TypeInfo pType = TypeInfoUtils.getTypeInfoFromTypeString(field.getType());
+          selectedPartitionTypes.add(pType);
+
+          if (partition != null) {
+            selectedPartitionValues.add(
+                HiveUtilities.convertPartitionType(pType, partition.getValues().get(i), defaultPartitionValue));
+          }
+        }
+      }
+    } catch (Exception e) {
+      throw new ExecutionSetupException("Failure while initializing Hive Reader " + this.getClass().getName(), e);
+    }
+
+    if (!empty && initNextReader(job)) {
+      internalInit(tableProperties, reader);
+    }
+  }
+
+  /**
+   * Initializes the next reader if available, closing the previous reader if any.
+   *
+   * @param job map / reduce job configuration.
+   * @return true if a new reader was initialized, false if no more readers are available
+   * @throws ExecutionSetupException if the record reader could not be initialized
+   */
+  protected boolean initNextReader(JobConf job) throws ExecutionSetupException {
+    if (inputSplitsIterator.hasNext()) {
+      if (reader != null) {
+        closeReader();
+      }
+      InputSplit inputSplit = inputSplitsIterator.next();
+      try {
+        reader = (org.apache.hadoop.mapred.RecordReader<Object, Object>) job.getInputFormat().getRecordReader(inputSplit, job, Reporter.NULL);
+        logger.trace("hive reader created: {} for inputSplit {}", reader.getClass().getName(), inputSplit.toString());
+      } catch (Exception e) {
+        throw new ExecutionSetupException("Failed to get o.a.hadoop.mapred.RecordReader from Hive InputFormat", e);
+      }
+      return true;
+    }
+    return false;
+  }
+
+  /**
+   * Utility method which creates a SerDe object for the given SerDe class name and properties.
+   */
+  private static SerDe createSerDe(final JobConf job, final String sLib, final Properties properties) throws Exception {
+    final Class<? extends SerDe> c = Class.forName(sLib).asSubclass(SerDe.class);
+    final SerDe serde = c.getConstructor().newInstance();
+    serde.initialize(job, properties);
+
+    return serde;
+  }
+
+  private static StructObjectInspector getStructOI(final SerDe serDe) throws Exception {
+    ObjectInspector oi = serDe.getObjectInspector();
+    if (oi.getCategory() != ObjectInspector.Category.STRUCT) {
+      throw new UnsupportedOperationException(String.format("%s category not supported", oi.getCategory()));
+    }
+    return (StructObjectInspector) oi;
+  }
+
+  @Override
+  public void setup(OperatorContext context, OutputMutator output)
+      throws ExecutionSetupException {
+    // initializes "reader"
+    final Callable<Void> readerInitializer = new Callable<Void>() {
+      @Override
+      public Void call() throws Exception {
+        init();
+        return null;
+      }
+    };
+
+    final ListenableFuture<Void> result = context.runCallableAs(proxyUgi, readerInitializer);
+    try {
+      result.get();
+    } catch (InterruptedException e) {
+      result.cancel(true);
+      // Preserve evidence that the interruption occurred so that code higher up on the call stack can learn of the
+      // interruption and respond to it if it wants to.
+      Thread.currentThread().interrupt();
+    } catch (ExecutionException e) {
+      throw ExecutionSetupException.fromThrowable(e.getMessage(), e);
+    }
+    try {
+      final OptionManager options = fragmentContext.getOptions();
+      for (int i = 0; i < selectedColumnNames.size(); i++) {
+        MajorType type = HiveUtilities.getMajorTypeFromHiveTypeInfo(selectedColumnTypes.get(i), options);
+        MaterializedField field = MaterializedField.create(selectedColumnNames.get(i), type);
+        Class<? extends ValueVector> vvClass = TypeHelper.getValueVectorClass(type.getMinorType(), type.getMode());
+        vectors.add(output.addField(field, vvClass));
+      }
+
+      for (int i = 0; i < selectedPartitionNames.size(); i++) {
+        MajorType type = HiveUtilities.getMajorTypeFromHiveTypeInfo(selectedPartitionTypes.get(i), options);
+        MaterializedField field = MaterializedField.create(selectedPartitionNames.get(i), type);
+        Class<? extends ValueVector> vvClass = TypeHelper.getValueVectorClass(field.getType().getMinorType(), field.getDataMode());
+        pVectors.add(output.addField(field, vvClass));
+      }
+    } catch(SchemaChangeException e) {
+      throw new ExecutionSetupException(e);
+    }
+  }
+
+  @Override
+  public abstract int next();
+
+  protected void setValueCountAndPopulatePartitionVectors(int recordCount) {
+    for (ValueVector v : vectors) {
+      v.getMutator().setValueCount(recordCount);
+    }
+
+    if (partition != null) {
+      populatePartitionVectors(recordCount);
+    }
+  }
+
+  protected void readHiveRecordAndInsertIntoRecordBatch(Object deSerializedValue, int outputRecordIndex) {
+    for (int i = 0; i < selectedStructFieldRefs.size(); i++) {
+      Object hiveValue = finalOI.getStructFieldData(deSerializedValue, selectedStructFieldRefs.get(i));
+      if (hiveValue != null) {
+        selectedColumnFieldConverters.get(i).setSafeValue(selectedColumnObjInspectors.get(i), hiveValue,
+            vectors.get(i), outputRecordIndex);
+      }
+    }
+  }
+
+  @Override
+  public void close() {
+    closeReader();
+  }
+
+  /**
+   * Closes the record reader, if any. Any exception is logged as a warning.
+   */
+  private void closeReader() {
+    try {
+      if (reader != null) {
+        reader.close();
+        reader = null;
+      }
+    } catch (Exception e) {
+      logger.warn("Failure while closing Hive Record reader.", e);
+    }
+  }
+
+  protected void populatePartitionVectors(int recordCount) {
+    for (int i = 0; i < pVectors.size(); i++) {
+      final ValueVector vector = pVectors.get(i);
+      final Object val = selectedPartitionValues.get(i);
+
+      AllocationHelper.allocateNew(vector, recordCount);
+
+      if (val != null) {
+        HiveUtilities.populateVector(vector, managedBuffer, val, 0, recordCount);
+      }
+
+      vector.getMutator().setValueCount(recordCount);
+    }
+  }
+
+  /**
+   * Writes the next value into the given value holder if one is available.
+   * If no value is available, checks whether any other readers are available
+   * that may hold the next value and tries to obtain the value from them.
+   *
+   * @param value value holder
+   * @return true if value was written, false otherwise
+   */
+  protected boolean hasNextValue(Object value) {
+    while (true) {
+      try {
+        if (reader.next(key, value)) {
+          return true;
+        }
+
+        if (initNextReader(job)) {
+          continue;
+        }
+
+        return false;
+
+      } catch (IOException | ExecutionSetupException e) {
+        throw new DrillRuntimeException(e);
+      }
+    }
+  }
+
+}
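
For context, the concrete readers built on this class (for example HiveDefaultReader and HiveTextReader, referenced further below in ReadersInitializer) are expected to drive hasNextValue() from their next() implementation and push each deserialized row into the value vectors. The following is only a rough sketch of such a next() method, assuming the fields and helpers of HiveAbstractReader shown above; the exact subclass code is not part of this diff, and the sketch would additionally need imports for org.apache.hadoop.io.Writable and org.apache.hadoop.hive.serde2.SerDeException.

  // Rough sketch only (not part of this commit): a subclass's next() built on the
  // HiveAbstractReader API above. The loop shape and the single re-usable value holder
  // are assumptions; partitionSerDe, partTblObjectInspectorConverter, TARGET_RECORD_COUNT
  // and the helper methods all come from the class in this diff.
  @Override
  public int next() {
    for (ValueVector vv : vectors) {
      AllocationHelper.allocateNew(vv, TARGET_RECORD_COUNT);
    }
    if (empty) {
      setValueCountAndPopulatePartitionVectors(0);
      return 0;
    }
    int recordCount = 0;
    Object value = reader.createValue();                        // re-usable value holder
    try {
      while (recordCount < TARGET_RECORD_COUNT && hasNextValue(value)) {
        Object deSerializedValue = partitionSerDe.deserialize((Writable) value);
        if (partTblObjectInspectorConverter != null) {          // partition schema differs from table schema
          deSerializedValue = partTblObjectInspectorConverter.convert(deSerializedValue);
        }
        readHiveRecordAndInsertIntoRecordBatch(deSerializedValue, recordCount);
        recordCount++;
      }
    } catch (SerDeException e) {
      throw new DrillRuntimeException(e);
    }
    setValueCountAndPopulatePartitionVectors(recordCount);
    return recordCount;
  }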

http://git-wip-us.apache.org/repos/asf/drill/blob/f8bbb759/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/readers/initilializers/AbstractReadersInitializer.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/readers/initilializers/AbstractReadersInitializer.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/readers/initilializers/AbstractReadersInitializer.java
new file mode 100644
index 0000000..7f9e0c0
--- /dev/null
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/readers/initilializers/AbstractReadersInitializer.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.hive.readers.initilializers;
+
+import org.apache.drill.common.exceptions.DrillRuntimeException;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.store.RecordReader;
+import org.apache.drill.exec.store.hive.HivePartition;
+import org.apache.drill.exec.store.hive.HiveSubScan;
+import org.apache.drill.exec.store.hive.HiveTableWithColumnCache;
+import org.apache.drill.exec.store.hive.readers.HiveAbstractReader;
+import org.apache.drill.exec.util.ImpersonationUtil;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.security.UserGroupInformation;
+
+import java.lang.reflect.Constructor;
+import java.util.Collection;
+import java.util.List;
+
+/**
+ * Parent class for reader initializers which create readers based on the given reader class.
+ * Holds the common logic for creating the reader constructor and reader instances.
+ * Each child class is responsible for implementing the logic that initializes its record readers.
+ */
+public abstract class AbstractReadersInitializer {
+
+  protected final HiveSubScan config;
+
+  private final FragmentContext context;
+  private final Class<? extends HiveAbstractReader> readerClass;
+  private final UserGroupInformation proxyUgi;
+
+  public AbstractReadersInitializer(FragmentContext context, HiveSubScan config, Class<? extends HiveAbstractReader> readerClass) {
+    this.config = config;
+    this.context = context;
+    this.readerClass = readerClass;
+    this.proxyUgi = ImpersonationUtil.createProxyUgi(config.getUserName(), context.getQueryUserName());
+  }
+
+  protected Constructor<? extends HiveAbstractReader> createReaderConstructor() {
+    try {
+      return readerClass.getConstructor(HiveTableWithColumnCache.class, HivePartition.class,
+          Collection.class,
+          List.class, FragmentContext.class, HiveConf.class, UserGroupInformation.class);
+    } catch (ReflectiveOperationException e) {
+      throw new DrillRuntimeException(String.format("Unable to retrieve constructor for Hive reader class [%s]", readerClass), e);
+    }
+  }
+
+  protected HiveAbstractReader createReader(Constructor<? extends HiveAbstractReader> readerConstructor, Partition partition, Object split) {
+    try {
+      return readerConstructor.newInstance(config.getTable(), partition, split, config.getColumns(), context, config.getHiveConf(), proxyUgi);
+    } catch (ReflectiveOperationException e) {
+      throw new DrillRuntimeException(String.format("Unable to create instance for Hive reader [%s]", readerConstructor), e);
+    }
+  }
+
+  /**
+   * @return list of initialized record readers
+   */
+  public abstract List<RecordReader> init();
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/f8bbb759/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/readers/initilializers/DefaultReadersInitializer.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/readers/initilializers/DefaultReadersInitializer.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/readers/initilializers/DefaultReadersInitializer.java
new file mode 100644
index 0000000..a161151
--- /dev/null
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/readers/initilializers/DefaultReadersInitializer.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.hive.readers.initilializers;
+
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.store.RecordReader;
+import org.apache.drill.exec.store.hive.HivePartition;
+import org.apache.drill.exec.store.hive.HiveSubScan;
+import org.apache.drill.exec.store.hive.readers.HiveAbstractReader;
+import org.apache.drill.exec.store.hive.readers.initilializers.AbstractReadersInitializer;
+import org.apache.hadoop.mapred.InputSplit;
+
+import java.lang.reflect.Constructor;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Creates a separate record reader for each given input split group.
+ */
+public class DefaultReadersInitializer extends AbstractReadersInitializer {
+
+  public DefaultReadersInitializer(FragmentContext context, HiveSubScan config, Class<? extends HiveAbstractReader> readerClass) {
+    super(context, config, readerClass);
+  }
+
+  @Override
+  public List<RecordReader> init() {
+    List<List<InputSplit>> inputSplits = config.getInputSplits();
+    List<HivePartition> partitions = config.getPartitions();
+    boolean hasPartitions = partitions != null && !partitions.isEmpty();
+
+    List<RecordReader> readers = new ArrayList<>(inputSplits.size());
+    Constructor<? extends HiveAbstractReader> readerConstructor = createReaderConstructor();
+    for (int i = 0 ; i < inputSplits.size(); i++) {
+      readers.add(createReader(readerConstructor, hasPartitions ? partitions.get(i) : null, inputSplits.get(i)));
+    }
+    return readers;
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/f8bbb759/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/readers/initilializers/EmptyReadersInitializer.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/readers/initilializers/EmptyReadersInitializer.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/readers/initilializers/EmptyReadersInitializer.java
new file mode 100644
index 0000000..cb29f19
--- /dev/null
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/readers/initilializers/EmptyReadersInitializer.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.hive.readers.initilializers;
+
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.store.RecordReader;
+import org.apache.drill.exec.store.hive.HiveSubScan;
+import org.apache.drill.exec.store.hive.readers.HiveAbstractReader;
+import org.apache.drill.exec.store.hive.readers.initilializers.AbstractReadersInitializer;
+import org.apache.hadoop.mapred.InputSplit;
+
+import java.lang.reflect.Constructor;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * If the table is empty, creates an empty record reader to output the schema.
+ */
+public class EmptyReadersInitializer extends AbstractReadersInitializer {
+
+  public EmptyReadersInitializer(FragmentContext context, HiveSubScan config, Class<? extends HiveAbstractReader> readerClass) {
+    super(context, config, readerClass);
+  }
+
+  @Override
+  public List<RecordReader> init() {
+    List<RecordReader> readers = new ArrayList<>(1);
+    Constructor<? extends HiveAbstractReader> readerConstructor = createReaderConstructor();
+    readers.add(createReader(readerConstructor, null, null));
+    return readers;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/f8bbb759/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/readers/initilializers/ReadersInitializer.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/readers/initilializers/ReadersInitializer.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/readers/initilializers/ReadersInitializer.java
new file mode 100644
index 0000000..78aaf42
--- /dev/null
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/readers/initilializers/ReadersInitializer.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.hive.readers.initilializers;
+
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.store.hive.HiveSubScan;
+import org.apache.drill.exec.store.hive.readers.HiveAbstractReader;
+import org.apache.drill.exec.store.hive.readers.HiveAvroReader;
+import org.apache.drill.exec.store.hive.readers.HiveDefaultReader;
+import org.apache.drill.exec.store.hive.readers.HiveOrcReader;
+import org.apache.drill.exec.store.hive.readers.HiveParquetReader;
+import org.apache.drill.exec.store.hive.readers.HiveRCFileReader;
+import org.apache.drill.exec.store.hive.readers.HiveTextReader;
+import org.apache.hadoop.hive.ql.io.RCFileInputFormat;
+import org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat;
+import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat;
+import org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat;
+import org.apache.hadoop.mapred.TextInputFormat;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class ReadersInitializer {
+
+  /**
+   * List of all available reader classes for the different Hive native formats:
+   * ORC, Avro, RCFile, Text and Parquet.
+   */
+  private static final Map<String, Class<? extends HiveAbstractReader>> READER_MAP = new HashMap<>();
+
+  static {
+    READER_MAP.put(OrcInputFormat.class.getCanonicalName(), HiveOrcReader.class);
+    READER_MAP.put(AvroContainerInputFormat.class.getCanonicalName(), HiveAvroReader.class);
+    READER_MAP.put(RCFileInputFormat.class.getCanonicalName(), HiveRCFileReader.class);
+    READER_MAP.put(MapredParquetInputFormat.class.getCanonicalName(), HiveParquetReader.class);
+    READER_MAP.put(TextInputFormat.class.getCanonicalName(), HiveTextReader.class);
+  }
+
+  /**
+   * Determines which reader initializer should be used for the given table configuration.
+   * The decision is based on the table content and on whether skip header / footer logic is used.
+   *
+   * @param context fragment context
+   * @param config Hive table config
+   * @return reader initializer
+   */
+  public static AbstractReadersInitializer getInitializer(FragmentContext context, HiveSubScan config) {
+    Class<? extends HiveAbstractReader> readerClass = getReaderClass(config);
+    if (config.getInputSplits().isEmpty()) {
+      return new EmptyReadersInitializer(context, config, readerClass);
+    } else {
+      return new DefaultReadersInitializer(context, config, readerClass);
+    }
+  }
+
+  /**
+   * Tries to find the reader class based on the Hive table input format.
+   * If no matching reader class is found, the default reader class is used.
+   *
+   * @param config Hive table config
+   * @return reader class
+   */
+  private static Class<? extends HiveAbstractReader> getReaderClass(HiveSubScan config) {
+    final String formatName = config.getTable().getSd().getInputFormat();
+    Class<? extends HiveAbstractReader> readerClass = HiveDefaultReader.class;
+    if (READER_MAP.containsKey(formatName)) {
+      readerClass = READER_MAP.get(formatName);
+    }
+    return readerClass;
+  }
+
+}
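
As a usage note, the scan batch creation code (not part of this diff) would presumably obtain the record readers through this factory along the lines of the short sketch below; the variable names are illustrative only.

  // Illustrative sketch, not taken from this commit: how the factory above is meant to be used.
  // "context" is the FragmentContext and "config" the HiveSubScan of the scan being created.
  AbstractReadersInitializer initializer = ReadersInitializer.getInitializer(context, config);
  List<RecordReader> readers = initializer.init();   // one reader per input split group, or a single empty reader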

http://git-wip-us.apache.org/repos/asf/drill/blob/f8bbb759/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/readers/inspectors/AbstractRecordsInspector.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/readers/inspectors/AbstractRecordsInspector.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/readers/inspectors/AbstractRecordsInspector.java
new file mode 100644
index 0000000..026c3d1
--- /dev/null
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/readers/inspectors/AbstractRecordsInspector.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.hive.readers.inspectors;
+
+import org.apache.drill.exec.store.hive.readers.HiveAbstractReader;
+
+/**
+ * Parent class for records inspectors, which are responsible for counting processed records
+ * and for managing free and used value holders.
+ */
+public abstract class AbstractRecordsInspector {
+
+  private int processedRecordCount;
+
+  /**
+   * Checks whether the current number of processed records has reached the max batch size.
+   *
+   * @return true if the max number of records in a batch has been reached
+   */
+  public boolean isBatchFull() {
+    return processedRecordCount >= HiveAbstractReader.TARGET_RECORD_COUNT;
+  }
+
+  /**
+   * @return number of processed records
+   */
+  public int getProcessedRecordCount() {
+    return processedRecordCount;
+  }
+
+  /**
+   * Increments current number of processed records.
+   */
+  public void incrementProcessedRecordCount() {
+    processedRecordCount++;
+  }
+
+  /**
+   * Resets the number of processed records; should be called after a batch of data has been sent.
+   */
+  public void reset() {
+    processedRecordCount = 0;
+  }
+
+  /**
+   * Returns the value holder where the next value will be written.
+   *
+   * @return value holder
+   */
+  public abstract Object getValueHolder();
+
+  /**
+   * @return value holder with written value
+   */
+  public abstract Object getNextValue();
+
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/f8bbb759/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/readers/inspectors/DefaultRecordsInspector.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/readers/inspectors/DefaultRecordsInspector.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/readers/inspectors/DefaultRecordsInspector.java
new file mode 100644
index 0000000..2ffa64a
--- /dev/null
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/readers/inspectors/DefaultRecordsInspector.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.hive.readers.inspectors;
+
+/**
+ * Default records inspector that uses the same value holder for each record.
+ * Each value, once written, is immediately processed, so the value holder can be re-used.
+ */
+public class DefaultRecordsInspector extends AbstractRecordsInspector {
+
+  private final Object value;
+
+  public DefaultRecordsInspector(Object value) {
+    this.value = value;
+  }
+
+  @Override
+  public Object getValueHolder() {
+    return value;
+  }
+
+  @Override
+  public Object getNextValue() {
+    return value;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/f8bbb759/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/readers/inspectors/SkipFooterRecordsInspector.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/readers/inspectors/SkipFooterRecordsInspector.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/readers/inspectors/SkipFooterRecordsInspector.java
new file mode 100644
index 0000000..eee4df0
--- /dev/null
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/readers/inspectors/SkipFooterRecordsInspector.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.hive.readers.inspectors;
+
+import org.apache.hadoop.mapred.RecordReader;
+
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Queue;
+
+/**
+ * To implement the skip footer logic, this records inspector buffers the last N incoming records in a queue
+ * and makes sure they are skipped once the input is fully processed. A FIFO queue is used for this purpose.
+ */
+public class SkipFooterRecordsInspector extends AbstractRecordsInspector {
+
+  private final int footerCount;
+  private Queue<Object> footerBuffer;
+  private final List<Object> valueHolders;
+  private long readRecordsCount;
+
+  public SkipFooterRecordsInspector(RecordReader<Object, Object> reader, int footerCount) {
+    this.footerCount = footerCount;
+    this.footerBuffer = new LinkedList<>();
+    this.valueHolders = initializeValueHolders(reader, footerCount);
+  }
+
+  /**
+   * Returns the next available value holder (from the cached value holders) where the value should be written.
+   * The currently available holder is determined by taking the number of records read so far modulo the number of holders.
+   *
+   * @return value holder
+   */
+  @Override
+  public Object getValueHolder() {
+    int availableHolderIndex = (int) readRecordsCount % valueHolders.size();
+    return valueHolders.get(availableHolderIndex);
+  }
+
+  /**
+   * Buffers the current value holder with its written value
+   * and returns the oldest buffered value once the number of buffered values exceeds the N records to skip.
+   *
+   * @return buffered value holder with a written value, or null if the footer buffer is not yet full
+   */
+  @Override
+  public Object getNextValue() {
+    footerBuffer.add(getValueHolder());
+    readRecordsCount++;
+    if (footerBuffer.size() <= footerCount) {
+      return null;
+    }
+    return footerBuffer.poll();
+  }
+
+  /**
+   * Creates a buffer of value holders, so these holders can be re-used.
+   * The number of holders equals the number of lines to skip at the end of the file plus one.
+   *
+   * @param reader record reader
+   * @param footerCount number of lines to skip at the end of the file
+   * @return list of value holders
+   */
+  private List<Object> initializeValueHolders(RecordReader<Object, Object> reader, int footerCount) {
+    List<Object> valueHolder = new ArrayList<>(footerCount + 1);
+    for (int i = 0; i <= footerCount; i++) {
+      valueHolder.add(reader.createValue());
+    }
+    return valueHolder;
+  }
+
+}
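
To make the buffering behaviour concrete, here is a minimal sketch of a read loop driven by this inspector. It assumes a mapred RecordReader named "reader" and a hypothetical processRecord() consumer, neither of which is part of this commit; only the inspector calls match the API above.

  // Minimal sketch, assuming "reader" is an org.apache.hadoop.mapred.RecordReader<Object, Object>;
  // processRecord() is a hypothetical consumer; IOException handling is omitted for brevity.
  int footerCount = 5;                                // e.g. the table's skip.footer.line.count
  SkipFooterRecordsInspector inspector = new SkipFooterRecordsInspector(reader, footerCount);
  Object key = reader.createKey();
  while (!inspector.isBatchFull() && reader.next(key, inspector.getValueHolder())) {
    Object value = inspector.getNextValue();          // null while the footer buffer is still filling up
    if (value != null) {
      processRecord(value);                           // hypothetical: convert and write into value vectors
      inspector.incrementProcessedRecordCount();
    }
  }
  inspector.reset();                                  // processed record count is reset once the batch is sent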

http://git-wip-us.apache.org/repos/asf/drill/blob/f8bbb759/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/TestHiveStorage.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/TestHiveStorage.java b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/TestHiveStorage.java
index 33b8ec0..c07c9d8 100644
--- a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/TestHiveStorage.java
+++ b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/TestHiveStorage.java
@@ -492,33 +492,43 @@ public class TestHiveStorage extends HiveTestBase {
     }
   }
 
-  @Test // DRILL-3688
-  public void testIgnoreSkipHeaderFooterForRcfile() throws Exception {
+  @Test
+  public void testTableWithHeaderOnly() throws Exception {
     testBuilder()
-        .sqlQuery("select count(1) as cnt from hive.skipper.kv_rcfile_large")
+        .sqlQuery("select count(1) as cnt from hive.skipper.kv_text_header_only")
         .unOrdered()
         .baselineColumns("cnt")
-        .baselineValues(5000L)
+        .baselineValues(0L)
         .go();
   }
 
-  @Test // DRILL-3688
-  public void testIgnoreSkipHeaderFooterForParquet() throws Exception {
+  @Test
+  public void testTableWithFooterOnly() throws Exception {
     testBuilder()
-        .sqlQuery("select count(1) as cnt from hive.skipper.kv_parquet_large")
+        .sqlQuery("select count(1) as cnt from hive.skipper.kv_text_footer_only")
         .unOrdered()
         .baselineColumns("cnt")
-        .baselineValues(5000L)
+        .baselineValues(0L)
         .go();
   }
 
-  @Test // DRILL-3688
-  public void testIgnoreSkipHeaderFooterForSequencefile() throws Exception {
+  @Test
+  public void testTableWithHeaderFooterOnly() throws Exception {
     testBuilder()
-        .sqlQuery("select count(1) as cnt from hive.skipper.kv_sequencefile_large")
+        .sqlQuery("select count(1) as cnt from hive.skipper.kv_text_header_footer_only")
         .unOrdered()
         .baselineColumns("cnt")
-        .baselineValues(5000L)
+        .baselineValues(0L)
+        .go();
+  }
+
+  @Test
+  public void testSkipHeaderFooterForPartitionedTable() throws Exception {
+    testBuilder()
+        .sqlQuery("select count(1) as cnt from hive.skipper.kv_text_with_part")
+        .unOrdered()
+        .baselineColumns("cnt")
+        .baselineValues(4980L)
         .go();
   }
 

http://git-wip-us.apache.org/repos/asf/drill/blob/f8bbb759/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/TestInfoSchemaOnHiveStorage.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/TestInfoSchemaOnHiveStorage.java b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/TestInfoSchemaOnHiveStorage.java
index 8e5c77b..3e06316 100644
--- a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/TestInfoSchemaOnHiveStorage.java
+++ b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/TestInfoSchemaOnHiveStorage.java
@@ -67,9 +67,10 @@ public class TestInfoSchemaOnHiveStorage extends HiveTestBase {
         .baselineValues("hive.skipper", "kv_text_large")
         .baselineValues("hive.skipper", "kv_incorrect_skip_header")
         .baselineValues("hive.skipper", "kv_incorrect_skip_footer")
-        .baselineValues("hive.skipper", "kv_rcfile_large")
-        .baselineValues("hive.skipper", "kv_parquet_large")
-        .baselineValues("hive.skipper", "kv_sequencefile_large")
+        .baselineValues("hive.skipper", "kv_text_header_only")
+        .baselineValues("hive.skipper", "kv_text_footer_only")
+        .baselineValues("hive.skipper", "kv_text_header_footer_only")
+        .baselineValues("hive.skipper", "kv_text_with_part")
         .go();
   }
 
@@ -255,9 +256,10 @@ public class TestInfoSchemaOnHiveStorage extends HiveTestBase {
         .baselineValues("DRILL", "hive.skipper", "kv_text_large", "TABLE")
         .baselineValues("DRILL", "hive.skipper", "kv_incorrect_skip_header", "TABLE")
         .baselineValues("DRILL", "hive.skipper", "kv_incorrect_skip_footer", "TABLE")
-        .baselineValues("DRILL", "hive.skipper", "kv_rcfile_large", "TABLE")
-        .baselineValues("DRILL", "hive.skipper", "kv_parquet_large", "TABLE")
-        .baselineValues("DRILL", "hive.skipper", "kv_sequencefile_large", "TABLE")
+        .baselineValues("DRILL", "hive.skipper", "kv_text_header_only", "TABLE")
+        .baselineValues("DRILL", "hive.skipper", "kv_text_footer_only", "TABLE")
+        .baselineValues("DRILL", "hive.skipper", "kv_text_header_footer_only", "TABLE")
+        .baselineValues("DRILL", "hive.skipper", "kv_text_with_part", "TABLE")
         .go();
   }
 

http://git-wip-us.apache.org/repos/asf/drill/blob/f8bbb759/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/store/hive/HiveTestDataGenerator.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/store/hive/HiveTestDataGenerator.java b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/store/hive/HiveTestDataGenerator.java
index 4fafadb..924d7cb 100644
--- a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/store/hive/HiveTestDataGenerator.java
+++ b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/store/hive/HiveTestDataGenerator.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -518,35 +518,37 @@ public class HiveTestDataGenerator {
 
     // Create text tables with skip header and footer table property
     executeQuery(hiveDriver, "create database if not exists skipper");
-    executeQuery(hiveDriver, createTableWithHeaderFooterProperties("skipper.kv_text_small", "textfile", "1", "1"));
+    executeQuery(hiveDriver, createTableWithHeaderFooterProperties("skipper.kv_text_small", "textfile", "1", "1", false));
     executeQuery(hiveDriver, generateTestDataWithHeadersAndFooters("skipper.kv_text_small", 5, 1, 1));
 
-    executeQuery(hiveDriver, createTableWithHeaderFooterProperties("skipper.kv_text_large", "textfile", "2", "2"));
+    executeQuery(hiveDriver, createTableWithHeaderFooterProperties("skipper.kv_text_large", "textfile", "2", "2", false));
     executeQuery(hiveDriver, generateTestDataWithHeadersAndFooters("skipper.kv_text_large", 5000, 2, 2));
 
-    executeQuery(hiveDriver, createTableWithHeaderFooterProperties("skipper.kv_incorrect_skip_header", "textfile", "A", "1"));
+    executeQuery(hiveDriver, createTableWithHeaderFooterProperties("skipper.kv_incorrect_skip_header", "textfile", "A", "1", false));
     executeQuery(hiveDriver, generateTestDataWithHeadersAndFooters("skipper.kv_incorrect_skip_header", 5, 1, 1));
 
-    executeQuery(hiveDriver, createTableWithHeaderFooterProperties("skipper.kv_incorrect_skip_footer", "textfile", "1", "A"));
+    executeQuery(hiveDriver, createTableWithHeaderFooterProperties("skipper.kv_incorrect_skip_footer", "textfile", "1", "A", false));
     executeQuery(hiveDriver, generateTestDataWithHeadersAndFooters("skipper.kv_incorrect_skip_footer", 5, 1, 1));
 
-    // Create rcfile table with skip header and footer table property
-    executeQuery(hiveDriver, createTableWithHeaderFooterProperties("skipper.kv_rcfile_large", "rcfile", "1", "1"));
-    executeQuery(hiveDriver, "insert into table skipper.kv_rcfile_large select * from skipper.kv_text_large");
+    executeQuery(hiveDriver, createTableWithHeaderFooterProperties("skipper.kv_text_header_only", "textfile", "5", "0", false));
+    executeQuery(hiveDriver, generateTestDataWithHeadersAndFooters("skipper.kv_text_header_only", 0, 5, 0));
 
-    // Create parquet table with skip header and footer table property
-    executeQuery(hiveDriver, createTableWithHeaderFooterProperties("skipper.kv_parquet_large", "parquet", "1", "1"));
-    executeQuery(hiveDriver, "insert into table skipper.kv_parquet_large select * from skipper.kv_text_large");
+    executeQuery(hiveDriver, createTableWithHeaderFooterProperties("skipper.kv_text_footer_only", "textfile", "0", "5", false));
+    executeQuery(hiveDriver, generateTestDataWithHeadersAndFooters("skipper.kv_text_footer_only", 0, 0, 5));
 
-    // Create sequencefile table with skip header and footer table property
-    executeQuery(hiveDriver, createTableWithHeaderFooterProperties("skipper.kv_sequencefile_large", "sequencefile", "1", "1"));
-    executeQuery(hiveDriver, "insert into table skipper.kv_sequencefile_large select * from skipper.kv_text_large");
+    executeQuery(hiveDriver, createTableWithHeaderFooterProperties("skipper.kv_text_header_footer_only", "textfile", "5", "5", false));
+    executeQuery(hiveDriver, generateTestDataWithHeadersAndFooters("skipper.kv_text_header_footer_only", 0, 5, 5));
+
+    executeQuery(hiveDriver, createTableWithHeaderFooterProperties("skipper.kv_text_with_part", "textfile", "5", "5", true));
+    executeQuery(hiveDriver, "insert overwrite table skipper.kv_text_with_part partition (part) " +
+    "select key, value, key % 2 as part from skipper.kv_text_large");
+
+    // Create a table based on json file
+    executeQuery(hiveDriver, "create table default.simple_json(json string)");
+    final String loadData = "load data local inpath '" +
+        Resources.getResource("simple.json") + "' into table default.simple_json";
+    executeQuery(hiveDriver, loadData);
 
-      // Create a table based on json file
-      executeQuery(hiveDriver, "create table default.simple_json(json string)");
-      final String loadData = String.format("load data local inpath '" +
-          Resources.getResource("simple.json") + "' into table default.simple_json");
-      executeQuery(hiveDriver, loadData);
     ss.close();
   }
 
@@ -604,23 +606,41 @@ public class HiveTestDataGenerator {
     return file.getPath();
   }
 
-  private String createTableWithHeaderFooterProperties(String tableName, String format, String headerValue, String footerValue) {
-    return String.format("create table %s (key int, value string) stored as %s tblproperties('%s'='%s', '%s'='%s')",
-        tableName, format, serdeConstants.HEADER_COUNT, headerValue, serdeConstants.FOOTER_COUNT, footerValue);
+  private String createTableWithHeaderFooterProperties(String tableName,
+                                                       String format,
+                                                       String headerValue,
+                                                       String footerValue,
+                                                       boolean hasPartitions) {
+    StringBuilder sb = new StringBuilder();
+    sb.append("create table ").append(tableName);
+    sb.append(" (key int, value string) ");
+    if (hasPartitions) {
+      sb.append("partitioned by (part bigint) ");
+    }
+    sb.append(" stored as ").append(format);
+    sb.append(" tblproperties(");
+    sb.append("'").append(serdeConstants.HEADER_COUNT).append("'='").append(headerValue).append("'");
+    sb.append(",");
+    sb.append("'").append(serdeConstants.FOOTER_COUNT).append("'='").append(footerValue).append("'");
+    sb.append(")");
+
+    return sb.toString();
   }
 
   private String generateTestDataWithHeadersAndFooters(String tableName, int rowCount, int headerLines, int footerLines) {
     StringBuilder sb = new StringBuilder();
     sb.append("insert into table ").append(tableName).append(" (key, value) values ");
-    int length = sb.length();
     sb.append(StringUtils.repeat("('key_header', 'value_header')", ",", headerLines));
+    if (headerLines > 0) {
+      sb.append(",");
+    }
     for (int i  = 1; i <= rowCount; i++) {
-        sb.append(",(").append(i).append(",").append("'key_").append(i).append("')");
+        sb.append("(").append(i).append(",").append("'key_").append(i).append("'),");
     }
-    if (headerLines <= 0) {
-      sb.deleteCharAt(length);
+    if (footerLines <= 0) {
+      sb.deleteCharAt(sb.length() - 1);
     }
-    sb.append(StringUtils.repeat(",('key_footer', 'value_footer')", footerLines));
+    sb.append(StringUtils.repeat("('key_footer', 'value_footer')", ",", footerLines));
 
     return sb.toString();
   }
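
For illustration (matching one of the calls above, and assuming the usual values of serdeConstants.HEADER_COUNT and serdeConstants.FOOTER_COUNT, i.e. skip.header.line.count and skip.footer.line.count), createTableWithHeaderFooterProperties("skipper.kv_text_with_part", "textfile", "5", "5", true) builds roughly:

  create table skipper.kv_text_with_part (key int, value string) partitioned by (part bigint) stored as textfile tblproperties('skip.header.line.count'='5','skip.footer.line.count'='5')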

http://git-wip-us.apache.org/repos/asf/drill/blob/f8bbb759/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/store/hive/inspectors/SkipFooterRecordsInspectorTest.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/store/hive/inspectors/SkipFooterRecordsInspectorTest.java b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/store/hive/inspectors/SkipFooterRecordsInspectorTest.java
new file mode 100644
index 0000000..92970ed
--- /dev/null
+++ b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/store/hive/inspectors/SkipFooterRecordsInspectorTest.java
@@ -0,0 +1,84 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements.  See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to you under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License.  You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.drill.exec.store.hive.inspectors;
+
+import org.apache.drill.exec.store.hive.readers.inspectors.SkipFooterRecordsInspector;
+import org.apache.hadoop.mapred.RecordReader;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class SkipFooterRecordsInspectorTest {
+
+  private static RecordReader<Object, Object> recordReader;
+
+  @BeforeClass
+  @SuppressWarnings("unchecked")
+  public static void init() {
+    recordReader = mock(RecordReader.class);
+    when(recordReader.createValue()).thenReturn(new Object());
+  }
+
+  @Test
+  public void testHolderReUsage() {
+    SkipFooterRecordsInspector inspector = new SkipFooterRecordsInspector(recordReader, 1);
+    // store first value holder
+    Object firstHolder = inspector.getValueHolder();
+
+    // return null since one record was buffered as footer
+    assertNull(inspector.getNextValue());
+
+    // store second value holder
+    Object secondHolder = inspector.getValueHolder();
+
+    // return value stored in first holder, as now second holder is buffering the footer
+    assertEquals(secondHolder, inspector.getValueHolder());
+    assertEquals(firstHolder, inspector.getNextValue());
+
+    // return value stored in second holder, as now first holder is buffering the footer
+    assertEquals(firstHolder, inspector.getValueHolder());
+    assertEquals(secondHolder, inspector.getNextValue());
+  }
+
+  @Test
+  public void testReset() {
+    SkipFooterRecordsInspector inspector = new SkipFooterRecordsInspector(recordReader, 2);
+    assertEquals(0, inspector.getProcessedRecordCount());
+
+    // store second holder
+    inspector.getNextValue();
+    Object secondHolder = inspector.getValueHolder();
+    inspector.getNextValue();
+
+    // process n records and increment count, so we stop at second holder
+    for (int i = 0; i < 4; i++) {
+      inspector.getNextValue();
+      inspector.incrementProcessedRecordCount();
+    }
+    assertEquals(4, inspector.getProcessedRecordCount());
+    assertEquals(secondHolder, inspector.getValueHolder());
+
+    // reset and make sure we start from the last available holder
+    inspector.reset();
+    assertEquals(0, inspector.getProcessedRecordCount());
+    assertEquals(secondHolder, inspector.getValueHolder());
+  }
+}