You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by ha...@apache.org on 2020/05/10 01:37:23 UTC

[hive] branch master updated: HIVE-22769 : Incorrect query results and query failure during split generation for compressed text files (Panos G via Ashutosh Chauhan , Mustafa Iman) Change-Id: Ifb68bd9e3b103424aed2b9d6871b536a5437a095 Signed-off-by: Ashutosh Chauhan

This is an automated email from the ASF dual-hosted git repository.

hashutosh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new c4d5dc6  HIVE-22769 : Incorrect query results and query failure during split generation for compressed text files (Panos G via Ashutosh Chauhan , Mustafa Iman) Change-Id: Ifb68bd9e3b103424aed2b9d6871b536a5437a095 Signed-off-by: Ashutosh Chauhan <ha...@apache.org>
c4d5dc6 is described below

commit c4d5dc68843b3764b22fc8bccf211962abf6549d
Author: Panos Garefalakis <pg...@cloudera.com>
AuthorDate: Tue Apr 28 14:24:25 2020 +0100

    HIVE-22769 : Incorrect query results and query failure during split generation for compressed text files (Panos G via Ashutosh Chauhan , Mustafa Iman)
    Change-Id: Ifb68bd9e3b103424aed2b9d6871b536a5437a095
    Signed-off-by: Ashutosh Chauhan <ha...@apache.org>
---
 data/files/compressed_4line_file1.csv              |   3 +
 data/files/compressed_4line_file1.csv.bz2          | Bin 0 -> 55 bytes
 data/files/compressed_4line_file2.csv              |   3 +
 data/files/compressed_4line_file2.csv.bz2          | Bin 0 -> 60 bytes
 .../test/resources/testconfiguration.properties    |   5 +-
 .../hive/llap/io/encoded/LineRrOffsetReader.java   |  20 +-
 .../hive/llap/io/encoded/PassThruOffsetReader.java |  65 ++++++-
 .../llap/io/encoded/SerDeEncodedDataReader.java    |  19 +-
 .../hive/ql/io/HiveContextAwareRecordReader.java   |   2 +
 .../apache/hadoop/hive/ql/io/HiveInputFormat.java  |  21 ++-
 .../hadoop/hive/ql/io/SkippingTextInputFormat.java |  14 +-
 .../{LineBufferTest.java => TestLineBuffer.java}   |   2 +-
 .../hive/ql/io/TestSkippingTextInputFormat.java    | 205 +++++++++++++++++++++
 .../compressed_skip_header_footer_aggr.q           |  28 +++
 ...ter_aggregation.q => skip_header_footer_aggr.q} |   0
 .../{skiphf_aggr2.q => skip_header_footer_proj.q}  |   0
 .../llap/compressed_skip_header_footer_aggr.q.out  |  64 +++++++
 ...egation.q.out => skip_header_footer_aggr.q.out} |   0
 ...f_aggr2.q.out => skip_header_footer_proj.q.out} |  16 +-
 19 files changed, 435 insertions(+), 32 deletions(-)

diff --git a/data/files/compressed_4line_file1.csv b/data/files/compressed_4line_file1.csv
new file mode 100644
index 0000000..efe52db
--- /dev/null
+++ b/data/files/compressed_4line_file1.csv
@@ -0,0 +1,3 @@
+1,2019-12-31
+2,2019-12-31
+3,2019-12-31
diff --git a/data/files/compressed_4line_file1.csv.bz2 b/data/files/compressed_4line_file1.csv.bz2
new file mode 100644
index 0000000..ada697d
Binary files /dev/null and b/data/files/compressed_4line_file1.csv.bz2 differ
diff --git a/data/files/compressed_4line_file2.csv b/data/files/compressed_4line_file2.csv
new file mode 100644
index 0000000..629a850
--- /dev/null
+++ b/data/files/compressed_4line_file2.csv
@@ -0,0 +1,3 @@
+1,2019-12-31 00
+2,2019-12-31 01
+3,2019-12-31 02
diff --git a/data/files/compressed_4line_file2.csv.bz2 b/data/files/compressed_4line_file2.csv.bz2
new file mode 100644
index 0000000..4b5353a
Binary files /dev/null and b/data/files/compressed_4line_file2.csv.bz2 differ
diff --git a/itests/src/test/resources/testconfiguration.properties b/itests/src/test/resources/testconfiguration.properties
index de14c81..39e78d6 100644
--- a/itests/src/test/resources/testconfiguration.properties
+++ b/itests/src/test/resources/testconfiguration.properties
@@ -406,7 +406,9 @@ minillap.query.files=acid_bucket_pruning.q,\
   reduce_deduplicate_distinct.q, \
   remote_script.q,\
   file_with_header_footer.q,\
-  file_with_header_footer_aggregation.q,\
+  skip_header_footer_aggr.q,\
+  skip_header_footer_proj.q,\
+  compressed_skip_header_footer_aggr.q,\
   external_table_purge.q,\
   external_table_with_space_in_location_path.q,\
   import_exported_table.q,\
@@ -1041,7 +1043,6 @@ minillaplocal.query.files=\
   smb_mapjoin_15.q,\
   vectorized_nested_mapjoin.q,\
   skiphf_aggr.q,\
-  skiphf_aggr2.q,\
   multi_insert_lateral_view.q,\
   smb_mapjoin_4.q,\
   cbo_udf_udaf.q,\
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/LineRrOffsetReader.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/LineRrOffsetReader.java
index 3fc1fa2..a0f949b 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/LineRrOffsetReader.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/LineRrOffsetReader.java
@@ -24,6 +24,7 @@ import java.lang.reflect.Method;
 import org.apache.hadoop.hive.llap.io.api.impl.LlapIoImpl;
 import org.apache.hadoop.hive.llap.io.encoded.SerDeEncodedDataReader.ReaderWithOffsets;
 import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.LineRecordReader;
 
 final class LineRrOffsetReader extends PassThruOffsetReader {
@@ -43,24 +44,29 @@ final class LineRrOffsetReader extends PassThruOffsetReader {
     isCompressedMethod = isCompressedMethodTmp;
   }
 
-  static ReaderWithOffsets create(LineRecordReader sourceReader) {
-    if (isCompressedMethod == null) return new PassThruOffsetReader(sourceReader);
+  static ReaderWithOffsets create(LineRecordReader sourceReader, JobConf jobConf, int skipHeaderCnt, int skipFooterCnt) {
+    // File not compressed, skipping is already done as part of SkippingTextInputFormat
+    if (isCompressedMethod == null) {
+      return new PassThruOffsetReader(sourceReader, jobConf, 0, 0);
+    }
     Boolean isCompressed = null;
     try {
       isCompressed = (Boolean)isCompressedMethod.invoke(sourceReader);
     } catch (IllegalAccessException | IllegalArgumentException | InvocationTargetException e) {
       LlapIoImpl.LOG.error("Cannot check the reader for compression; offsets not supported", e);
-      return new PassThruOffsetReader(sourceReader);
+      return new PassThruOffsetReader(sourceReader, jobConf, 0, 0);
     }
     if (isCompressed) {
+      // Cannot slice compressed files - do header/footer skipping within the Reader
       LlapIoImpl.LOG.info("Reader is compressed; offsets not supported");
-      return new PassThruOffsetReader(sourceReader); // Cannot slice compressed files.
+      return new PassThruOffsetReader(sourceReader, jobConf, skipHeaderCnt, skipFooterCnt);
     }
-    return new LineRrOffsetReader(sourceReader);
+    // For non-compressed Text Files Header/Footer Skipping is already done as part of SkippingTextInputFormat
+    return new LineRrOffsetReader(sourceReader, jobConf);
   }
 
-  private LineRrOffsetReader(LineRecordReader sourceReader) {
-    super(sourceReader);
+  private LineRrOffsetReader(LineRecordReader sourceReader, JobConf jobConf) {
+    super(sourceReader, jobConf, 0, 0);
     this.lrReader = sourceReader;
     this.posKey = (LongWritable)key;
   }
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/PassThruOffsetReader.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/PassThruOffsetReader.java
index ba2b52d..cab13ee 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/PassThruOffsetReader.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/PassThruOffsetReader.java
@@ -20,23 +20,80 @@ package org.apache.hadoop.hive.llap.io.encoded;
 import java.io.IOException;
 
 import org.apache.hadoop.hive.llap.io.encoded.SerDeEncodedDataReader.ReaderWithOffsets;
+import org.apache.hadoop.hive.ql.exec.FooterBuffer;
+import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.RecordReader;
 
 @SuppressWarnings("rawtypes") class PassThruOffsetReader implements ReaderWithOffsets {
   protected final RecordReader sourceReader;
   protected final Object key;
   protected final Writable value;
+  protected final JobConf jobConf;
+  protected final int skipHeaderCnt;
+  protected final int skipFooterCnt;
+  private transient FooterBuffer footerBuffer;
+  private transient boolean initialized = false;
 
-  PassThruOffsetReader(RecordReader sourceReader) {
+  PassThruOffsetReader(RecordReader sourceReader, JobConf jobConf, int headerCnt, int footerCnt) {
     this.sourceReader = sourceReader;
-    key = sourceReader.createKey();
-    value = (Writable)sourceReader.createValue();
+    this.key = sourceReader.createKey();
+    this.value = (Writable)sourceReader.createValue();
+    this.jobConf = jobConf;
+    this.skipHeaderCnt = headerCnt;
+    this.skipFooterCnt = footerCnt;
   }
 
   @Override
   public boolean next() throws IOException {
-    return sourceReader.next(key, value);
+    try {
+      boolean opNotEOF = true;
+      /**
+       * Start reading a new file.
+       * If file contains header, skip header lines before reading the records.
+       * If file contains footer, used FooterBuffer to cache and remove footer
+       * records at the end of the file.
+       */
+      if (!initialized) {
+        // Skip header lines.
+        opNotEOF = Utilities.skipHeader(sourceReader, skipFooterCnt, key, value);
+
+        // Initialize footer buffer.
+        if (opNotEOF && skipFooterCnt > 0) {
+          footerBuffer = new FooterBuffer();
+          opNotEOF = footerBuffer.initializeBuffer(jobConf, sourceReader, skipFooterCnt, (WritableComparable) key, value);
+        }
+        this.initialized = true;
+      }
+
+      if (opNotEOF && footerBuffer == null) {
+        /**
+         * When file doesn't end after skipping header line
+         * and there is NO footer lines, read normally.
+         */
+        opNotEOF = sourceReader.next(key, value);
+      }
+
+      if (opNotEOF && footerBuffer != null) {
+        /**
+         * When file doesn't end after skipping header line
+         * and there IS footer lines, update footerBuffer
+         */
+        opNotEOF = footerBuffer.updateBuffer(jobConf, sourceReader, (WritableComparable) key, value);
+      }
+
+      if (opNotEOF) {
+        // File reached the end
+        return true;
+      } else {
+        // Done reading
+        return false;
+      }
+    } catch (Exception e) {
+      throw new IOException(e);
+    }
   }
 
   @Override
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/SerDeEncodedDataReader.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/SerDeEncodedDataReader.java
index f489dda..6d3443a 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/SerDeEncodedDataReader.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/SerDeEncodedDataReader.java
@@ -58,9 +58,11 @@ import org.apache.hadoop.hive.llap.io.api.impl.LlapIoImpl;
 import org.apache.hadoop.hive.llap.io.decode.GenericColumnVectorProducer.SerDeStripeMetadata;
 import org.apache.hadoop.hive.llap.io.decode.OrcEncodedDataConsumer;
 import org.apache.hadoop.hive.llap.io.encoded.VectorDeserializeOrcWriter.AsyncCallback;
+import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
 import org.apache.hadoop.hive.ql.io.HdfsUtils;
+import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils;
 import org.apache.hadoop.hive.ql.io.orc.OrcFile;
 import org.apache.hadoop.hive.ql.io.orc.OrcFile.WriterOptions;
 import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat;
@@ -70,6 +72,7 @@ import org.apache.hadoop.hive.ql.io.orc.encoded.CacheChunk;
 import org.apache.hadoop.hive.ql.io.orc.encoded.Reader.OrcEncodedColumnBatch;
 import org.apache.hadoop.hive.ql.io.orc.encoded.StoppableAllocator;
 import org.apache.hadoop.hive.ql.plan.PartitionDesc;
+import org.apache.hadoop.hive.ql.plan.TableDesc;
 import org.apache.hadoop.hive.serde2.Deserializer;
 import org.apache.hadoop.hive.serde2.SerDeException;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
@@ -1424,8 +1427,10 @@ public class SerDeEncodedDataReader extends CallableWithNdc<Void>
     ReaderWithOffsets offsetReader = null;
     @SuppressWarnings("rawtypes")
     RecordReader sourceReader = sourceInputFormat.getRecordReader(split, jobConf, reporter);
+    Path path = split.getPath().getFileSystem(daemonConf).makeQualified(split.getPath());
+    PartitionDesc partDesc = HiveFileFormatUtils.getFromPathRecursively(parts, path, null);
     try {
-      offsetReader = createOffsetReader(sourceReader);
+      offsetReader = createOffsetReader(sourceReader, partDesc.getTableDesc());
       sourceReader = null;
     } finally {
       if (sourceReader != null) {
@@ -1633,16 +1638,20 @@ public class SerDeEncodedDataReader extends CallableWithNdc<Void>
     }
   }
 
-  private ReaderWithOffsets createOffsetReader(RecordReader<?, ?> sourceReader) {
+  private ReaderWithOffsets createOffsetReader(RecordReader<?, ?> sourceReader, TableDesc tableDesc)
+      throws IOException {
+    int headerCount = Utilities.getHeaderCount(tableDesc);
+    int footerCount = Utilities.getFooterCount(tableDesc, jobConf);
     if (LlapIoImpl.LOG.isDebugEnabled()) {
-      LlapIoImpl.LOG.debug("Using " + sourceReader.getClass().getSimpleName() + " to read data");
+      LlapIoImpl.LOG.debug("Using {} to read data with skip.header.line.count {} and skip.footer.line.count {}",
+          sourceReader.getClass().getSimpleName(), headerCount, footerCount);
     }
     // Handle the special cases here. Perhaps we could have a more general structure, or even
     // a configurable set (like storage handlers), but for now we only have one.
     if (isLrrEnabled && sourceReader instanceof LineRecordReader) {
-      return LineRrOffsetReader.create((LineRecordReader)sourceReader);
+      return LineRrOffsetReader.create((LineRecordReader)sourceReader, jobConf, headerCount, footerCount);
     }
-    return new PassThruOffsetReader(sourceReader);
+    return new PassThruOffsetReader(sourceReader, jobConf, headerCount, footerCount);
   }
 
   private static String[] extractHosts(FileSplit split, boolean isInMemory) throws IOException {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveContextAwareRecordReader.java b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveContextAwareRecordReader.java
index 38b226f..2e05563 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveContextAwareRecordReader.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveContextAwareRecordReader.java
@@ -341,6 +341,8 @@ public abstract class HiveContextAwareRecordReader<K extends WritableComparable,
           part = null;
         }
         TableDesc table = (part == null) ? null : part.getTableDesc();
+        // In TextFormat, skipping is already taken care of as part of SkippingTextInputFormat.
+        // This code will be also called from LLAP when pipeline is non-vectorized and cannot create wrapper.
         if (table != null && !TextInputFormat.class.isAssignableFrom(part.getInputFileFormatClass())) {
           headerCount = Utilities.getHeaderCount(table);
           footerCount = Utilities.getFooterCount(table, jobConf);
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
index 62ef0c6..ca234cf 100755
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
@@ -56,6 +56,7 @@ import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
 import org.apache.hadoop.hive.serde2.Deserializer;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.compress.CompressionCodecFactory;
 import org.apache.hadoop.mapred.FileInputFormat;
 import org.apache.hadoop.mapred.FileSplit;
 import org.apache.hadoop.mapred.InputFormat;
@@ -112,6 +113,7 @@ public class HiveInputFormat<K extends WritableComparable, V extends Writable>
     = new ConcurrentHashMap<Class, InputFormat<WritableComparable, Writable>>();
 
   private JobConf job;
+  private CompressionCodecFactory compressionCodecs;
 
   // both classes access by subclasses
   protected Map<Path, PartitionDesc> pathToPartitionInfo;
@@ -241,6 +243,7 @@ public class HiveInputFormat<K extends WritableComparable, V extends Writable>
   @Override
   public void configure(JobConf job) {
     this.job = job;
+    this.compressionCodecs = new CompressionCodecFactory(job);
   }
 
   public static InputFormat<WritableComparable, Writable> wrapForLlap(
@@ -530,17 +533,18 @@ public class HiveInputFormat<K extends WritableComparable, V extends Writable>
     conf.setInputFormat(inputFormat.getClass());
     int headerCount = 0;
     int footerCount = 0;
+    boolean isCompressedFormat = isCompressedInput(finalDirs);
     if (table != null) {
       headerCount = Utilities.getHeaderCount(table);
       footerCount = Utilities.getFooterCount(table, conf);
       if (headerCount != 0 || footerCount != 0) {
-        if (TextInputFormat.class.isAssignableFrom(inputFormatClass)) {
+        if (TextInputFormat.class.isAssignableFrom(inputFormatClass) && !isCompressedFormat) {
           SkippingTextInputFormat skippingTextInputFormat = new SkippingTextInputFormat();
           skippingTextInputFormat.configure(conf, headerCount, footerCount);
           inputFormat = skippingTextInputFormat;
         } else {
-          // if the input is not text and contains header/footer we have no way of
-          // splitting them.
+          // if the input is Compressed OR not text we have no way of splitting them!
+          // In that case RecordReader should take care of header/footer skipping!
           HiveConf.setLongVar(conf, ConfVars.MAPREDMINSPLITSIZE, Long.MAX_VALUE);
         }
       }
@@ -603,6 +607,17 @@ public class HiveInputFormat<K extends WritableComparable, V extends Writable>
     return validWriteIdList;
   }
 
+  public boolean isCompressedInput(List<Path> finalPaths) {
+    if (compressionCodecs != null) {
+      for (Path curr : finalPaths) {
+        if (this.compressionCodecs.getCodec(curr) != null) {
+          return true;
+        }
+      }
+    }
+    return false;
+  }
+
   public static void processPathsForMmRead(List<Path> dirs, Configuration conf,
       ValidWriteIdList validWriteIdList, List<Path> finalPaths,
       List<Path> pathsWithFileOriginals) throws IOException {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/SkippingTextInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/SkippingTextInputFormat.java
index 7e936d5..c5431f8 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/SkippingTextInputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/SkippingTextInputFormat.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.hive.ql.io;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.compress.CompressionCodecFactory;
 import org.apache.hadoop.mapred.FileSplit;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.TextInputFormat;
@@ -35,11 +36,14 @@ import java.util.concurrent.ConcurrentHashMap;
  * SkippingInputFormat is a header/footer aware input format. It truncates
  * splits identified by TextInputFormat. Header and footers are removed
  * from the splits.
+ *
+ * This InputFormat does NOT support Compressed Files!
  */
 public class SkippingTextInputFormat extends TextInputFormat {
 
-  private final Map<Path, Long> startIndexMap = new ConcurrentHashMap<Path, Long>();
-  private final Map<Path, Long> endIndexMap = new ConcurrentHashMap<Path, Long>();
+  private final Map<Path, Long> startIndexMap = new ConcurrentHashMap<>();
+  private final Map<Path, Long> endIndexMap = new ConcurrentHashMap<>();
+  CompressionCodecFactory compressionCodecs = null;
   private JobConf conf;
   private int headerCount;
   private int footerCount;
@@ -47,6 +51,7 @@ public class SkippingTextInputFormat extends TextInputFormat {
   @Override
   public void configure(JobConf conf) {
     this.conf = conf;
+    this.compressionCodecs = new CompressionCodecFactory(conf);
     super.configure(conf);
   }
 
@@ -67,6 +72,11 @@ public class SkippingTextInputFormat extends TextInputFormat {
   }
 
   private FileSplit makeSplitInternal(Path file, long start, long length, String[] hosts, String[] inMemoryHosts) {
+    // Do not currently support compressed files!
+    if (compressionCodecs.getCodec(file) != null) {
+      LOG.error("Compressed files are not currently supported!");
+      return new NullRowsInputFormat.DummyInputSplit(file);
+    }
     long cachedStart;
     long cachedEnd;
     try {
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/LineBufferTest.java b/ql/src/test/org/apache/hadoop/hive/ql/io/TestLineBuffer.java
similarity index 98%
rename from ql/src/test/org/apache/hadoop/hive/ql/io/LineBufferTest.java
rename to ql/src/test/org/apache/hadoop/hive/ql/io/TestLineBuffer.java
index 9cbdeef..71ca3ac 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/io/LineBufferTest.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/io/TestLineBuffer.java
@@ -25,7 +25,7 @@ import static org.junit.Assert.assertEquals;
 /**
  * LineEndBuffer simple unit test.
  */
-public class LineBufferTest {
+public class TestLineBuffer {
 
   @Test
   public void testLineEndBuffer() {
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/TestSkippingTextInputFormat.java b/ql/src/test/org/apache/hadoop/hive/ql/io/TestSkippingTextInputFormat.java
new file mode 100644
index 0000000..a850a40
--- /dev/null
+++ b/ql/src/test/org/apache/hadoop/hive/ql/io/TestSkippingTextInputFormat.java
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.io;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.plan.MapredWork;
+import org.apache.hadoop.hive.ql.plan.PartitionDesc;
+import org.apache.hadoop.hive.ql.plan.TableDesc;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.CompressionCodecFactory;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Unittest for SkippingTextInputFormat with Skip Header/Footer.
+ */
+public class TestSkippingTextInputFormat {
+
+  private Configuration conf;
+  private JobConf job;
+  private FileSystem fileSystem;
+  private Path testDir;
+  Reporter reporter;
+
+  private Path dataDir;
+
+  private CompressionCodecFactory compressionCodecs = null;
+  private CompressionCodec codec;
+
+  @Before
+  public void setUp() throws IOException {
+    conf = new Configuration();
+    job = new JobConf(conf);
+
+    TableDesc tblDesc = Utilities.defaultTd;
+    PartitionDesc partDesc = new PartitionDesc(tblDesc, null);
+    LinkedHashMap<Path, PartitionDesc> pt = new LinkedHashMap<>();
+    pt.put(new Path("/tmp/testfolder"), partDesc);
+    MapredWork mrwork = new MapredWork();
+    mrwork.getMapWork().setPathToPartitionInfo(pt);
+    Utilities.setMapRedWork(job, mrwork,new Path("/tmp/" + System.getProperty("user.name"), "hive"));
+
+    fileSystem = FileSystem.getLocal(conf);
+    testDir = new Path(System.getProperty("test.tmp.dir", System.getProperty(
+        "user.dir", new File(".").getAbsolutePath()))
+        + "/TestSkippingTextInputFormat");
+    reporter = Reporter.NULL;
+    fileSystem.delete(testDir, true);
+
+    dataDir =  new Path(testDir, "datadir");
+    fileSystem.mkdirs(dataDir);
+  }
+
+  @After
+  public void tearDown() throws IOException {
+    fileSystem.delete(testDir, true);
+  }
+
+  /**
+   * Test CSV input file with header/footer skip.
+   */
+  @Test
+  public void testSkipFileSplits() throws Exception {
+    FileSystem fs = dataDir.getFileSystem(job);
+    FileInputFormat.setInputPaths(job, dataDir);
+
+    // First Dir with 1 File
+    Path dir1_file1 = new Path(dataDir, "skipfile1.csv");
+    writeTextFile(dir1_file1,
+        "dir1_header\n" +
+            "dir1_file1_line1\n" +
+            "dir1_file1_line2\n" +
+            "dir1_footer"
+    );
+
+    SkippingTextInputFormat inputFormat = new SkippingTextInputFormat();
+    // One header and one footer line to be deducted
+    inputFormat.configure(job, 1, 1);
+
+    FileInputFormat.setInputPaths(job, dir1_file1);
+    InputSplit[] splits = inputFormat.getSplits(job, 2);
+
+    assertTrue(splits.length == 2);
+
+    // Read all values.
+    List<String> received = new ArrayList<String>();
+    for (int i=0; i < splits.length; i++) {
+      RecordReader<LongWritable, Text> reader =
+          inputFormat.getRecordReader(splits[i], job, reporter);
+
+      HiveInputFormat.HiveInputSplit hiveInputSplit =
+          new HiveInputFormat.HiveInputSplit(splits[i], inputFormat.getClass().getName());
+      assertTrue(hiveInputSplit.getInputSplit().getClass() == FileSplit.class);
+
+      LongWritable key = reader.createKey();
+      Text value = reader.createValue();
+      while (reader.next(key, value)) {
+        received.add(value.toString());
+      }
+      reader.close();
+    }
+    // make sure we skipped the header and the footer across splits
+    assertTrue(received.size() == 2);
+    assertTrue(!received.get(0).contains("header"));
+    assertTrue(!received.get(received.size()-1).contains("footer"));
+  }
+
+  /**
+   * Test compressed CSV input file with header/footer skip.
+   */
+  @Test
+  public void testSkipCompressedFileSplits() throws Exception {
+    FileSystem fs = dataDir.getFileSystem(job);
+    FileInputFormat.setInputPaths(job, dataDir);
+
+    // First Dir with 1 Compressed CSV File
+    Path dir1_file1 = new Path(dataDir, "skipfile1.csv.bz2");
+    writeTextFile(dir1_file1,
+        "dir1_header\n" +
+            "dir1_file1_line1\n" +
+            "dir1_file1_line2\n" +
+            "dir1_footer"
+    );
+
+    SkippingTextInputFormat inputFormat = new SkippingTextInputFormat();
+    // One header and one footer line to be deducted
+    inputFormat.configure(job, 1, 1);
+
+    compressionCodecs = new CompressionCodecFactory(conf);
+    codec = compressionCodecs.getCodec(dir1_file1);
+    System.out.println("Codec: "+ codec);
+
+    FileInputFormat.setInputPaths(job, dir1_file1);
+    InputSplit[] splits = inputFormat.getSplits(job, 1);
+
+    // Should not generate splits for compressed file!
+    assertTrue(splits.length == 1);
+
+    // Read all values.
+    List<String> received = new ArrayList<String>();
+    for (int i=0; i < splits.length; i++) {
+      RecordReader<LongWritable, Text> reader =
+          inputFormat.getRecordReader(splits[i], job, reporter);
+
+      HiveInputFormat.HiveInputSplit hiveInputSplit =
+          new HiveInputFormat.HiveInputSplit(splits[i], inputFormat.getClass().getName());
+      System.out.println(hiveInputSplit.getInputSplit().getClass());
+      assertTrue(NullRowsInputFormat.DummyInputSplit.class == hiveInputSplit.getInputSplit().getClass());
+      System.out.println("Split: [" +i + "] "+ hiveInputSplit.getStart() + " => " + hiveInputSplit.getLength());
+
+      LongWritable key = reader.createKey();
+      Text value = reader.createValue();
+      while (reader.next(key, value)) {
+        System.out.println("Splits:" + i + " Val: "+ value);
+        received.add(value.toString());
+      }
+      reader.close();
+    }
+  }
+
+  /**
+   * Writes the given string to the given file.
+   */
+  private void writeTextFile(Path file, String content) throws IOException {
+    OutputStreamWriter writer = new OutputStreamWriter(fileSystem.create(file));
+    writer.write(content);
+    writer.close();
+  }
+}
diff --git a/ql/src/test/queries/clientpositive/compressed_skip_header_footer_aggr.q b/ql/src/test/queries/clientpositive/compressed_skip_header_footer_aggr.q
new file mode 100644
index 0000000..25f87b2
--- /dev/null
+++ b/ql/src/test/queries/clientpositive/compressed_skip_header_footer_aggr.q
@@ -0,0 +1,28 @@
+set hive.mapred.mode=nonstrict;
+
+dfs ${system:test.dfs.mkdir} ${system:test.tmp.dir}/testcase1;
+dfs -copyFromLocal ../../data/files/compressed_4line_file1.csv  ${system:test.tmp.dir}/testcase1/;
+
+
+CREATE EXTERNAL TABLE `testcase1`(id int, name string) ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde'
+  LOCATION '${system:test.tmp.dir}/testcase1'
+  TBLPROPERTIES ("skip.header.line.count"="1", "skip.footer.line.count"="1");
+
+
+select * from testcase1;
+
+select count(*) from testcase1;
+
+
+dfs ${system:test.dfs.mkdir} ${system:test.tmp.dir}/testcase2;
+dfs -copyFromLocal ../../data/files/compressed_4line_file2.csv  ${system:test.tmp.dir}/testcase2/;
+
+
+CREATE EXTERNAL TABLE `testcase2`(id int, name string) ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde'
+  LOCATION '${system:test.tmp.dir}/testcase2'
+  TBLPROPERTIES ("skip.header.line.count"="1", "skip.footer.line.count"="1");
+
+
+select * from testcase2;
+
+select count(*) from testcase2;
diff --git a/ql/src/test/queries/clientpositive/file_with_header_footer_aggregation.q b/ql/src/test/queries/clientpositive/skip_header_footer_aggr.q
similarity index 100%
rename from ql/src/test/queries/clientpositive/file_with_header_footer_aggregation.q
rename to ql/src/test/queries/clientpositive/skip_header_footer_aggr.q
diff --git a/ql/src/test/queries/clientpositive/skiphf_aggr2.q b/ql/src/test/queries/clientpositive/skip_header_footer_proj.q
similarity index 100%
rename from ql/src/test/queries/clientpositive/skiphf_aggr2.q
rename to ql/src/test/queries/clientpositive/skip_header_footer_proj.q
diff --git a/ql/src/test/results/clientpositive/llap/compressed_skip_header_footer_aggr.q.out b/ql/src/test/results/clientpositive/llap/compressed_skip_header_footer_aggr.q.out
new file mode 100644
index 0000000..330d2d0
--- /dev/null
+++ b/ql/src/test/results/clientpositive/llap/compressed_skip_header_footer_aggr.q.out
@@ -0,0 +1,64 @@
+PREHOOK: query: CREATE EXTERNAL TABLE `testcase1`(id int, name string) ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde'
+#### A masked pattern was here ####
+  TBLPROPERTIES ("skip.header.line.count"="1", "skip.footer.line.count"="1")
+PREHOOK: type: CREATETABLE
+PREHOOK: Input: hdfs://### HDFS PATH ###
+PREHOOK: Output: database:default
+PREHOOK: Output: default@testcase1
+POSTHOOK: query: CREATE EXTERNAL TABLE `testcase1`(id int, name string) ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde'
+#### A masked pattern was here ####
+  TBLPROPERTIES ("skip.header.line.count"="1", "skip.footer.line.count"="1")
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Input: hdfs://### HDFS PATH ###
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@testcase1
+PREHOOK: query: select * from testcase1
+PREHOOK: type: QUERY
+PREHOOK: Input: default@testcase1
+PREHOOK: Output: hdfs://### HDFS PATH ###
+POSTHOOK: query: select * from testcase1
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@testcase1
+POSTHOOK: Output: hdfs://### HDFS PATH ###
+2	2019-12-31
+PREHOOK: query: select count(*) from testcase1
+PREHOOK: type: QUERY
+PREHOOK: Input: default@testcase1
+PREHOOK: Output: hdfs://### HDFS PATH ###
+POSTHOOK: query: select count(*) from testcase1
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@testcase1
+POSTHOOK: Output: hdfs://### HDFS PATH ###
+1
+PREHOOK: query: CREATE EXTERNAL TABLE `testcase2`(id int, name string) ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde'
+#### A masked pattern was here ####
+  TBLPROPERTIES ("skip.header.line.count"="1", "skip.footer.line.count"="1")
+PREHOOK: type: CREATETABLE
+PREHOOK: Input: hdfs://### HDFS PATH ###
+PREHOOK: Output: database:default
+PREHOOK: Output: default@testcase2
+POSTHOOK: query: CREATE EXTERNAL TABLE `testcase2`(id int, name string) ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde'
+#### A masked pattern was here ####
+  TBLPROPERTIES ("skip.header.line.count"="1", "skip.footer.line.count"="1")
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Input: hdfs://### HDFS PATH ###
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@testcase2
+PREHOOK: query: select * from testcase2
+PREHOOK: type: QUERY
+PREHOOK: Input: default@testcase2
+PREHOOK: Output: hdfs://### HDFS PATH ###
+POSTHOOK: query: select * from testcase2
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@testcase2
+POSTHOOK: Output: hdfs://### HDFS PATH ###
+2	2019-12-31 01
+PREHOOK: query: select count(*) from testcase2
+PREHOOK: type: QUERY
+PREHOOK: Input: default@testcase2
+PREHOOK: Output: hdfs://### HDFS PATH ###
+POSTHOOK: query: select count(*) from testcase2
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@testcase2
+POSTHOOK: Output: hdfs://### HDFS PATH ###
+1
diff --git a/ql/src/test/results/clientpositive/llap/file_with_header_footer_aggregation.q.out b/ql/src/test/results/clientpositive/llap/skip_header_footer_aggr.q.out
similarity index 100%
rename from ql/src/test/results/clientpositive/llap/file_with_header_footer_aggregation.q.out
rename to ql/src/test/results/clientpositive/llap/skip_header_footer_aggr.q.out
diff --git a/ql/src/test/results/clientpositive/llap/skiphf_aggr2.q.out b/ql/src/test/results/clientpositive/llap/skip_header_footer_proj.q.out
similarity index 93%
rename from ql/src/test/results/clientpositive/llap/skiphf_aggr2.q.out
rename to ql/src/test/results/clientpositive/llap/skip_header_footer_proj.q.out
index ba33a10..f78f7fb 100644
--- a/ql/src/test/results/clientpositive/llap/skiphf_aggr2.q.out
+++ b/ql/src/test/results/clientpositive/llap/skip_header_footer_proj.q.out
@@ -23,11 +23,11 @@ POSTHOOK: Lineage: hf1.b SCRIPT []
 PREHOOK: query: SELECT * FROM hf1
 PREHOOK: type: QUERY
 PREHOOK: Input: default@hf1
-#### A masked pattern was here ####
+PREHOOK: Output: hdfs://### HDFS PATH ###
 POSTHOOK: query: SELECT * FROM hf1
 POSTHOOK: type: QUERY
 POSTHOOK: Input: default@hf1
-#### A masked pattern was here ####
+POSTHOOK: Output: hdfs://### HDFS PATH ###
 a	b
 c	d
 PREHOOK: query: DROP TABLE IF EXISTS hf2
@@ -55,11 +55,11 @@ POSTHOOK: Lineage: hf2.b SCRIPT []
 PREHOOK: query: SELECT * FROM hf2
 PREHOOK: type: QUERY
 PREHOOK: Input: default@hf2
-#### A masked pattern was here ####
+PREHOOK: Output: hdfs://### HDFS PATH ###
 POSTHOOK: query: SELECT * FROM hf2
 POSTHOOK: type: QUERY
 POSTHOOK: Input: default@hf2
-#### A masked pattern was here ####
+POSTHOOK: Output: hdfs://### HDFS PATH ###
 x	y
 PREHOOK: query: DROP TABLE IF EXISTS hf3
 PREHOOK: type: DROPTABLE
@@ -86,11 +86,11 @@ POSTHOOK: Lineage: hf3.b SCRIPT []
 PREHOOK: query: SELECT * FROM hf3
 PREHOOK: type: QUERY
 PREHOOK: Input: default@hf3
-#### A masked pattern was here ####
+PREHOOK: Output: hdfs://### HDFS PATH ###
 POSTHOOK: query: SELECT * FROM hf3
 POSTHOOK: type: QUERY
 POSTHOOK: Input: default@hf3
-#### A masked pattern was here ####
+POSTHOOK: Output: hdfs://### HDFS PATH ###
 PREHOOK: query: DROP TABLE IF EXISTS hf4
 PREHOOK: type: DROPTABLE
 POSTHOOK: query: DROP TABLE IF EXISTS hf4
@@ -116,8 +116,8 @@ POSTHOOK: Lineage: hf4.b SCRIPT []
 PREHOOK: query: SELECT * FROM hf4
 PREHOOK: type: QUERY
 PREHOOK: Input: default@hf4
-#### A masked pattern was here ####
+PREHOOK: Output: hdfs://### HDFS PATH ###
 POSTHOOK: query: SELECT * FROM hf4
 POSTHOOK: type: QUERY
 POSTHOOK: Input: default@hf4
-#### A masked pattern was here ####
+POSTHOOK: Output: hdfs://### HDFS PATH ###