Posted to commits@hive.apache.org by ha...@apache.org on 2020/05/10 01:37:23 UTC
[hive] branch master updated: HIVE-22769 : Incorrect query results and query failure during split generation for compressed text files (Panos G via Ashutosh Chauhan , Mustafa Iman) Change-Id: Ifb68bd9e3b103424aed2b9d6871b536a5437a095 Signed-off-by: Ashutosh Chauhan
This is an automated email from the ASF dual-hosted git repository.
hashutosh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new c4d5dc6 HIVE-22769 : Incorrect query results and query failure during split generation for compressed text files (Panos G via Ashutosh Chauhan , Mustafa Iman) Change-Id: Ifb68bd9e3b103424aed2b9d6871b536a5437a095 Signed-off-by: Ashutosh Chauhan <ha...@apache.org>
c4d5dc6 is described below
commit c4d5dc68843b3764b22fc8bccf211962abf6549d
Author: Panos Garefalakis <pg...@cloudera.com>
AuthorDate: Tue Apr 28 14:24:25 2020 +0100
HIVE-22769 : Incorrect query results and query failure during split generation for compressed text files (Panos G via Ashutosh Chauhan , Mustafa Iman)
Change-Id: Ifb68bd9e3b103424aed2b9d6871b536a5437a095
Signed-off-by: Ashutosh Chauhan <ha...@apache.org>
---
data/files/compressed_4line_file1.csv | 3 +
data/files/compressed_4line_file1.csv.bz2 | Bin 0 -> 55 bytes
data/files/compressed_4line_file2.csv | 3 +
data/files/compressed_4line_file2.csv.bz2 | Bin 0 -> 60 bytes
.../test/resources/testconfiguration.properties | 5 +-
.../hive/llap/io/encoded/LineRrOffsetReader.java | 20 +-
.../hive/llap/io/encoded/PassThruOffsetReader.java | 65 ++++++-
.../llap/io/encoded/SerDeEncodedDataReader.java | 19 +-
.../hive/ql/io/HiveContextAwareRecordReader.java | 2 +
.../apache/hadoop/hive/ql/io/HiveInputFormat.java | 21 ++-
.../hadoop/hive/ql/io/SkippingTextInputFormat.java | 14 +-
.../{LineBufferTest.java => TestLineBuffer.java} | 2 +-
.../hive/ql/io/TestSkippingTextInputFormat.java | 205 +++++++++++++++++++++
.../compressed_skip_header_footer_aggr.q | 28 +++
...ter_aggregation.q => skip_header_footer_aggr.q} | 0
.../{skiphf_aggr2.q => skip_header_footer_proj.q} | 0
.../llap/compressed_skip_header_footer_aggr.q.out | 64 +++++++
...egation.q.out => skip_header_footer_aggr.q.out} | 0
...f_aggr2.q.out => skip_header_footer_proj.q.out} | 16 +-
19 files changed, 435 insertions(+), 32 deletions(-)
diff --git a/data/files/compressed_4line_file1.csv b/data/files/compressed_4line_file1.csv
new file mode 100644
index 0000000..efe52db
--- /dev/null
+++ b/data/files/compressed_4line_file1.csv
@@ -0,0 +1,3 @@
+1,2019-12-31
+2,2019-12-31
+3,2019-12-31
diff --git a/data/files/compressed_4line_file1.csv.bz2 b/data/files/compressed_4line_file1.csv.bz2
new file mode 100644
index 0000000..ada697d
Binary files /dev/null and b/data/files/compressed_4line_file1.csv.bz2 differ
diff --git a/data/files/compressed_4line_file2.csv b/data/files/compressed_4line_file2.csv
new file mode 100644
index 0000000..629a850
--- /dev/null
+++ b/data/files/compressed_4line_file2.csv
@@ -0,0 +1,3 @@
+1,2019-12-31 00
+2,2019-12-31 01
+3,2019-12-31 02
diff --git a/data/files/compressed_4line_file2.csv.bz2 b/data/files/compressed_4line_file2.csv.bz2
new file mode 100644
index 0000000..4b5353a
Binary files /dev/null and b/data/files/compressed_4line_file2.csv.bz2 differ
diff --git a/itests/src/test/resources/testconfiguration.properties b/itests/src/test/resources/testconfiguration.properties
index de14c81..39e78d6 100644
--- a/itests/src/test/resources/testconfiguration.properties
+++ b/itests/src/test/resources/testconfiguration.properties
@@ -406,7 +406,9 @@ minillap.query.files=acid_bucket_pruning.q,\
reduce_deduplicate_distinct.q, \
remote_script.q,\
file_with_header_footer.q,\
- file_with_header_footer_aggregation.q,\
+ skip_header_footer_aggr.q,\
+ skip_header_footer_proj.q,\
+ compressed_skip_header_footer_aggr.q,\
external_table_purge.q,\
external_table_with_space_in_location_path.q,\
import_exported_table.q,\
@@ -1041,7 +1043,6 @@ minillaplocal.query.files=\
smb_mapjoin_15.q,\
vectorized_nested_mapjoin.q,\
skiphf_aggr.q,\
- skiphf_aggr2.q,\
multi_insert_lateral_view.q,\
smb_mapjoin_4.q,\
cbo_udf_udaf.q,\
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/LineRrOffsetReader.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/LineRrOffsetReader.java
index 3fc1fa2..a0f949b 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/LineRrOffsetReader.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/LineRrOffsetReader.java
@@ -24,6 +24,7 @@ import java.lang.reflect.Method;
import org.apache.hadoop.hive.llap.io.api.impl.LlapIoImpl;
import org.apache.hadoop.hive.llap.io.encoded.SerDeEncodedDataReader.ReaderWithOffsets;
import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.LineRecordReader;
final class LineRrOffsetReader extends PassThruOffsetReader {
@@ -43,24 +44,29 @@ final class LineRrOffsetReader extends PassThruOffsetReader {
isCompressedMethod = isCompressedMethodTmp;
}
- static ReaderWithOffsets create(LineRecordReader sourceReader) {
- if (isCompressedMethod == null) return new PassThruOffsetReader(sourceReader);
+ static ReaderWithOffsets create(LineRecordReader sourceReader, JobConf jobConf, int skipHeaderCnt, int skipFooterCnt) {
+ // File not compressed, skipping is already done as part of SkippingTextInputFormat
+ if (isCompressedMethod == null) {
+ return new PassThruOffsetReader(sourceReader, jobConf, 0, 0);
+ }
Boolean isCompressed = null;
try {
isCompressed = (Boolean)isCompressedMethod.invoke(sourceReader);
} catch (IllegalAccessException | IllegalArgumentException | InvocationTargetException e) {
LlapIoImpl.LOG.error("Cannot check the reader for compression; offsets not supported", e);
- return new PassThruOffsetReader(sourceReader);
+ return new PassThruOffsetReader(sourceReader, jobConf, 0, 0);
}
if (isCompressed) {
+ // Cannot slice compressed files - do header/footer skipping within the Reader
LlapIoImpl.LOG.info("Reader is compressed; offsets not supported");
- return new PassThruOffsetReader(sourceReader); // Cannot slice compressed files.
+ return new PassThruOffsetReader(sourceReader, jobConf, skipHeaderCnt, skipFooterCnt);
}
- return new LineRrOffsetReader(sourceReader);
+ // For non-compressed text files, header/footer skipping is already done as part of SkippingTextInputFormat
+ return new LineRrOffsetReader(sourceReader, jobConf);
}
- private LineRrOffsetReader(LineRecordReader sourceReader) {
- super(sourceReader);
+ private LineRrOffsetReader(LineRecordReader sourceReader, JobConf jobConf) {
+ super(sourceReader, jobConf, 0, 0);
this.lrReader = sourceReader;
this.posKey = (LongWritable)key;
}
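
For reference (not part of this patch): the isCompressedMethod used above is resolved reflectively because LineRecordReader's compression check is a private method that is not guaranteed to exist in every Hadoop version. A minimal sketch of that pattern, assuming the accessor is named isCompressedInput() (the actual lookup happens in a static initializer that is not shown in this hunk):

import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;

import org.apache.hadoop.mapred.LineRecordReader;

final class CompressedInputProbe {
  // Resolved once; stays null when the private accessor is absent.
  private static final Method IS_COMPRESSED_INPUT;
  static {
    Method m = null;
    try {
      m = LineRecordReader.class.getDeclaredMethod("isCompressedInput"); // assumed name
      m.setAccessible(true);
    } catch (NoSuchMethodException | SecurityException e) {
      m = null; // Hadoop version without the method: compression state is unknown
    }
    IS_COMPRESSED_INPUT = m;
  }

  /** Returns null when the check cannot be performed, so callers take the conservative path. */
  static Boolean isCompressed(LineRecordReader reader) {
    if (IS_COMPRESSED_INPUT == null) {
      return null;
    }
    try {
      return (Boolean) IS_COMPRESSED_INPUT.invoke(reader);
    } catch (IllegalAccessException | IllegalArgumentException | InvocationTargetException e) {
      return null;
    }
  }
}

When the probe returns null or true, create() above falls back to PassThruOffsetReader; only a confirmed uncompressed reader gets the offset-based LineRrOffsetReader.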
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/PassThruOffsetReader.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/PassThruOffsetReader.java
index ba2b52d..cab13ee 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/PassThruOffsetReader.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/PassThruOffsetReader.java
@@ -20,23 +20,80 @@ package org.apache.hadoop.hive.llap.io.encoded;
import java.io.IOException;
import org.apache.hadoop.hive.llap.io.encoded.SerDeEncodedDataReader.ReaderWithOffsets;
+import org.apache.hadoop.hive.ql.exec.FooterBuffer;
+import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
@SuppressWarnings("rawtypes") class PassThruOffsetReader implements ReaderWithOffsets {
protected final RecordReader sourceReader;
protected final Object key;
protected final Writable value;
+ protected final JobConf jobConf;
+ protected final int skipHeaderCnt;
+ protected final int skipFooterCnt;
+ private transient FooterBuffer footerBuffer;
+ private transient boolean initialized = false;
- PassThruOffsetReader(RecordReader sourceReader) {
+ PassThruOffsetReader(RecordReader sourceReader, JobConf jobConf, int headerCnt, int footerCnt) {
this.sourceReader = sourceReader;
- key = sourceReader.createKey();
- value = (Writable)sourceReader.createValue();
+ this.key = sourceReader.createKey();
+ this.value = (Writable)sourceReader.createValue();
+ this.jobConf = jobConf;
+ this.skipHeaderCnt = headerCnt;
+ this.skipFooterCnt = footerCnt;
}
@Override
public boolean next() throws IOException {
- return sourceReader.next(key, value);
+ try {
+ boolean opNotEOF = true;
+ /**
+ * Start reading a new file.
+ * If file contains header, skip header lines before reading the records.
+ * If file contains footer, use FooterBuffer to cache and remove footer
+ * records at the end of the file.
+ */
+ if (!initialized) {
+ // Skip header lines.
+ opNotEOF = Utilities.skipHeader(sourceReader, skipHeaderCnt, key, value);
+
+ // Initialize footer buffer.
+ if (opNotEOF && skipFooterCnt > 0) {
+ footerBuffer = new FooterBuffer();
+ opNotEOF = footerBuffer.initializeBuffer(jobConf, sourceReader, skipFooterCnt, (WritableComparable) key, value);
+ }
+ this.initialized = true;
+ }
+
+ if (opNotEOF && footerBuffer == null) {
+ /**
+ * When the file doesn't end after skipping the header lines
+ * and there are NO footer lines, read normally.
+ */
+ opNotEOF = sourceReader.next(key, value);
+ }
+
+ if (opNotEOF && footerBuffer != null) {
+ /**
+ * When the file doesn't end after skipping the header lines
+ * and there ARE footer lines, update the footerBuffer.
+ */
+ opNotEOF = footerBuffer.updateBuffer(jobConf, sourceReader, (WritableComparable) key, value);
+ }
+
+ if (opNotEOF) {
+ // A record was read successfully
+ return true;
+ } else {
+ // Done reading
+ return false;
+ }
+ } catch (Exception e) {
+ throw new IOException(e);
+ }
}
@Override
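
The next() implementation above delegates footer handling to Hive's FooterBuffer, which holds back the last skip.footer.line.count records so they are never emitted. For reference (not part of this patch), a minimal standalone sketch of that buffering idea over a plain iterator of lines (not the actual FooterBuffer API):

import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.Iterator;
import java.util.List;

public class HeaderFooterSkipSketch {

  /** Drops the first `header` and last `footer` lines without knowing the total line count in advance. */
  static List<String> skipHeaderFooter(Iterator<String> lines, int header, int footer) {
    // Skip header lines up front, mirroring Utilities.skipHeader().
    for (int i = 0; i < header && lines.hasNext(); i++) {
      lines.next();
    }
    // Delay emission by `footer` records: whatever is still buffered at EOF is the footer.
    Deque<String> buffer = new ArrayDeque<>();
    List<String> out = new ArrayList<>();
    while (lines.hasNext()) {
      buffer.addLast(lines.next());
      if (buffer.size() > footer) {
        out.add(buffer.removeFirst());
      }
    }
    return out;
  }

  public static void main(String[] args) {
    List<String> file = Arrays.asList("header", "row1", "row2", "footer");
    System.out.println(skipHeaderFooter(file.iterator(), 1, 1)); // prints [row1, row2]
  }
}

This is also why the reader must see the whole file: a compressed input is handed out as a single unsplit file, so one reader observes both the header and the footer and can safely drop them.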
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/SerDeEncodedDataReader.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/SerDeEncodedDataReader.java
index f489dda..6d3443a 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/SerDeEncodedDataReader.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/SerDeEncodedDataReader.java
@@ -58,9 +58,11 @@ import org.apache.hadoop.hive.llap.io.api.impl.LlapIoImpl;
import org.apache.hadoop.hive.llap.io.decode.GenericColumnVectorProducer.SerDeStripeMetadata;
import org.apache.hadoop.hive.llap.io.decode.OrcEncodedDataConsumer;
import org.apache.hadoop.hive.llap.io.encoded.VectorDeserializeOrcWriter.AsyncCallback;
+import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.hadoop.hive.ql.io.HdfsUtils;
+import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils;
import org.apache.hadoop.hive.ql.io.orc.OrcFile;
import org.apache.hadoop.hive.ql.io.orc.OrcFile.WriterOptions;
import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat;
@@ -70,6 +72,7 @@ import org.apache.hadoop.hive.ql.io.orc.encoded.CacheChunk;
import org.apache.hadoop.hive.ql.io.orc.encoded.Reader.OrcEncodedColumnBatch;
import org.apache.hadoop.hive.ql.io.orc.encoded.StoppableAllocator;
import org.apache.hadoop.hive.ql.plan.PartitionDesc;
+import org.apache.hadoop.hive.ql.plan.TableDesc;
import org.apache.hadoop.hive.serde2.Deserializer;
import org.apache.hadoop.hive.serde2.SerDeException;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
@@ -1424,8 +1427,10 @@ public class SerDeEncodedDataReader extends CallableWithNdc<Void>
ReaderWithOffsets offsetReader = null;
@SuppressWarnings("rawtypes")
RecordReader sourceReader = sourceInputFormat.getRecordReader(split, jobConf, reporter);
+ Path path = split.getPath().getFileSystem(daemonConf).makeQualified(split.getPath());
+ PartitionDesc partDesc = HiveFileFormatUtils.getFromPathRecursively(parts, path, null);
try {
- offsetReader = createOffsetReader(sourceReader);
+ offsetReader = createOffsetReader(sourceReader, partDesc.getTableDesc());
sourceReader = null;
} finally {
if (sourceReader != null) {
@@ -1633,16 +1638,20 @@ public class SerDeEncodedDataReader extends CallableWithNdc<Void>
}
}
- private ReaderWithOffsets createOffsetReader(RecordReader<?, ?> sourceReader) {
+ private ReaderWithOffsets createOffsetReader(RecordReader<?, ?> sourceReader, TableDesc tableDesc)
+ throws IOException {
+ int headerCount = Utilities.getHeaderCount(tableDesc);
+ int footerCount = Utilities.getFooterCount(tableDesc, jobConf);
if (LlapIoImpl.LOG.isDebugEnabled()) {
- LlapIoImpl.LOG.debug("Using " + sourceReader.getClass().getSimpleName() + " to read data");
+ LlapIoImpl.LOG.debug("Using {} to read data with skip.header.line.count {} and skip.footer.line.count {}",
+ sourceReader.getClass().getSimpleName(), headerCount, footerCount);
}
// Handle the special cases here. Perhaps we could have a more general structure, or even
// a configurable set (like storage handlers), but for now we only have one.
if (isLrrEnabled && sourceReader instanceof LineRecordReader) {
- return LineRrOffsetReader.create((LineRecordReader)sourceReader);
+ return LineRrOffsetReader.create((LineRecordReader)sourceReader, jobConf, headerCount, footerCount);
}
- return new PassThruOffsetReader(sourceReader);
+ return new PassThruOffsetReader(sourceReader, jobConf, headerCount, footerCount);
}
private static String[] extractHosts(FileSplit split, boolean isInMemory) throws IOException {
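
createOffsetReader() above takes the header/footer line counts from the table definition via Utilities.getHeaderCount()/getFooterCount(), which read the skip.header.line.count and skip.footer.line.count table properties (the footer helper also validates the count against a configured maximum). For reference (not part of this patch), the shape of that lookup as a trivial sketch:

import java.util.Properties;

public class SkipCountSketch {
  // Hypothetical helper; Hive's Utilities does the real parsing and validation.
  static int lineCount(Properties tableProps, String key) {
    return Integer.parseInt(tableProps.getProperty(key, "0"));
  }

  public static void main(String[] args) {
    Properties props = new Properties();
    props.setProperty("skip.header.line.count", "1");
    props.setProperty("skip.footer.line.count", "1");
    System.out.println(lineCount(props, "skip.header.line.count")); // 1
    System.out.println(lineCount(props, "skip.footer.line.count")); // 1
  }
}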
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveContextAwareRecordReader.java b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveContextAwareRecordReader.java
index 38b226f..2e05563 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveContextAwareRecordReader.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveContextAwareRecordReader.java
@@ -341,6 +341,8 @@ public abstract class HiveContextAwareRecordReader<K extends WritableComparable,
part = null;
}
TableDesc table = (part == null) ? null : part.getTableDesc();
+ // In TextFormat, skipping is already taken care of as part of SkippingTextInputFormat.
+ // This code will also be called from LLAP when the pipeline is non-vectorized and cannot create the wrapper.
if (table != null && !TextInputFormat.class.isAssignableFrom(part.getInputFileFormatClass())) {
headerCount = Utilities.getHeaderCount(table);
footerCount = Utilities.getFooterCount(table, jobConf);
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
index 62ef0c6..ca234cf 100755
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
@@ -56,6 +56,7 @@ import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
import org.apache.hadoop.hive.serde2.Deserializer;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputFormat;
@@ -112,6 +113,7 @@ public class HiveInputFormat<K extends WritableComparable, V extends Writable>
= new ConcurrentHashMap<Class, InputFormat<WritableComparable, Writable>>();
private JobConf job;
+ private CompressionCodecFactory compressionCodecs;
// both classes access by subclasses
protected Map<Path, PartitionDesc> pathToPartitionInfo;
@@ -241,6 +243,7 @@ public class HiveInputFormat<K extends WritableComparable, V extends Writable>
@Override
public void configure(JobConf job) {
this.job = job;
+ this.compressionCodecs = new CompressionCodecFactory(job);
}
public static InputFormat<WritableComparable, Writable> wrapForLlap(
@@ -530,17 +533,18 @@ public class HiveInputFormat<K extends WritableComparable, V extends Writable>
conf.setInputFormat(inputFormat.getClass());
int headerCount = 0;
int footerCount = 0;
+ boolean isCompressedFormat = isCompressedInput(finalDirs);
if (table != null) {
headerCount = Utilities.getHeaderCount(table);
footerCount = Utilities.getFooterCount(table, conf);
if (headerCount != 0 || footerCount != 0) {
- if (TextInputFormat.class.isAssignableFrom(inputFormatClass)) {
+ if (TextInputFormat.class.isAssignableFrom(inputFormatClass) && !isCompressedFormat) {
SkippingTextInputFormat skippingTextInputFormat = new SkippingTextInputFormat();
skippingTextInputFormat.configure(conf, headerCount, footerCount);
inputFormat = skippingTextInputFormat;
} else {
- // if the input is not text and contains header/footer we have no way of
- // splitting them.
+ // If the input is compressed OR not text, we have no way of splitting it.
+ // In that case the RecordReader should take care of header/footer skipping.
HiveConf.setLongVar(conf, ConfVars.MAPREDMINSPLITSIZE, Long.MAX_VALUE);
}
}
@@ -603,6 +607,17 @@ public class HiveInputFormat<K extends WritableComparable, V extends Writable>
return validWriteIdList;
}
+ public boolean isCompressedInput(List<Path> finalPaths) {
+ if (compressionCodecs != null) {
+ for (Path curr : finalPaths) {
+ if (this.compressionCodecs.getCodec(curr) != null) {
+ return true;
+ }
+ }
+ }
+ return false;
+ }
+
public static void processPathsForMmRead(List<Path> dirs, Configuration conf,
ValidWriteIdList validWriteIdList, List<Path> finalPaths,
List<Path> pathsWithFileOriginals) throws IOException {
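
The new isCompressedInput() helper above decides from the file name alone: CompressionCodecFactory.getCodec() matches registered codec suffixes (.gz, .bz2, and so on) against the path and returns null otherwise, and any match forces the non-splitting fallback. For reference (not part of this patch), a small standalone probe of that behaviour, assuming only the codecs registered by default:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;

public class CodecProbe {
  public static void main(String[] args) {
    CompressionCodecFactory factory = new CompressionCodecFactory(new Configuration());
    for (String name : new String[] {"data.csv", "data.csv.gz", "data.csv.bz2"}) {
      // getCodec() goes by file-name suffix only; it never inspects the contents.
      CompressionCodec codec = factory.getCodec(new Path(name));
      System.out.println(name + " -> "
          + (codec == null ? "no codec (plain text, splittable)" : codec.getClass().getSimpleName()));
    }
  }
}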
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/SkippingTextInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/SkippingTextInputFormat.java
index 7e936d5..c5431f8 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/SkippingTextInputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/SkippingTextInputFormat.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.hive.ql.io;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TextInputFormat;
@@ -35,11 +36,14 @@ import java.util.concurrent.ConcurrentHashMap;
* SkippingInputFormat is a header/footer aware input format. It truncates
* splits identified by TextInputFormat. Header and footers are removed
* from the splits.
+ *
+ * This InputFormat does NOT support Compressed Files!
*/
public class SkippingTextInputFormat extends TextInputFormat {
- private final Map<Path, Long> startIndexMap = new ConcurrentHashMap<Path, Long>();
- private final Map<Path, Long> endIndexMap = new ConcurrentHashMap<Path, Long>();
+ private final Map<Path, Long> startIndexMap = new ConcurrentHashMap<>();
+ private final Map<Path, Long> endIndexMap = new ConcurrentHashMap<>();
+ CompressionCodecFactory compressionCodecs = null;
private JobConf conf;
private int headerCount;
private int footerCount;
@@ -47,6 +51,7 @@ public class SkippingTextInputFormat extends TextInputFormat {
@Override
public void configure(JobConf conf) {
this.conf = conf;
+ this.compressionCodecs = new CompressionCodecFactory(conf);
super.configure(conf);
}
@@ -67,6 +72,11 @@ public class SkippingTextInputFormat extends TextInputFormat {
}
private FileSplit makeSplitInternal(Path file, long start, long length, String[] hosts, String[] inMemoryHosts) {
+ // Do not currently support compressed files!
+ if (compressionCodecs.getCodec(file) != null) {
+ LOG.error("Compressed files are not currently supported!");
+ return new NullRowsInputFormat.DummyInputSplit(file);
+ }
long cachedStart;
long cachedEnd;
try {
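
SkippingTextInputFormat does its skipping by trimming the byte range each split covers so that header and footer lines fall outside of it; that only works when byte offsets in the stored file line up with line boundaries, which is not the case inside a compressed stream, hence the DummyInputSplit fallback above. For reference (not part of this patch), a rough sketch of the offset calculation for a plain, newline-terminated text file; it is not the class's actual implementation, which caches per-file offsets and reads them through the FileSystem:

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

public class HeaderFooterRangeSketch {

  /** Returns the [start, end) byte offsets that exclude the first `header` and last `footer` lines. */
  static long[] dataRange(byte[] data, int header, int footer) {
    List<Integer> newlines = new ArrayList<>();
    for (int i = 0; i < data.length; i++) {
      if (data[i] == '\n') {
        newlines.add(i);
      }
    }
    int lines = newlines.size(); // assumes every line, including the last, ends in '\n'
    int keep = Math.max(lines - header - footer, 0);
    long start = (header == 0) ? 0
        : (header <= lines ? newlines.get(header - 1) + 1 : data.length);
    long end = (keep == 0) ? start : newlines.get(header + keep - 1) + 1;
    return new long[] {start, end};
  }

  public static void main(String[] args) {
    byte[] data = "header\nrow1\nrow2\nfooter\n".getBytes(StandardCharsets.UTF_8);
    long[] range = dataRange(data, 1, 1);
    // Prints "row1" and "row2" only.
    System.out.print(new String(data, (int) range[0], (int) (range[1] - range[0]), StandardCharsets.UTF_8));
  }
}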
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/LineBufferTest.java b/ql/src/test/org/apache/hadoop/hive/ql/io/TestLineBuffer.java
similarity index 98%
rename from ql/src/test/org/apache/hadoop/hive/ql/io/LineBufferTest.java
rename to ql/src/test/org/apache/hadoop/hive/ql/io/TestLineBuffer.java
index 9cbdeef..71ca3ac 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/io/LineBufferTest.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/io/TestLineBuffer.java
@@ -25,7 +25,7 @@ import static org.junit.Assert.assertEquals;
/**
* LineEndBuffer simple unit test.
*/
-public class LineBufferTest {
+public class TestLineBuffer {
@Test
public void testLineEndBuffer() {
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/TestSkippingTextInputFormat.java b/ql/src/test/org/apache/hadoop/hive/ql/io/TestSkippingTextInputFormat.java
new file mode 100644
index 0000000..a850a40
--- /dev/null
+++ b/ql/src/test/org/apache/hadoop/hive/ql/io/TestSkippingTextInputFormat.java
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.io;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.plan.MapredWork;
+import org.apache.hadoop.hive.ql.plan.PartitionDesc;
+import org.apache.hadoop.hive.ql.plan.TableDesc;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.CompressionCodecFactory;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Unit tests for SkippingTextInputFormat with header/footer skipping.
+ */
+public class TestSkippingTextInputFormat {
+
+ private Configuration conf;
+ private JobConf job;
+ private FileSystem fileSystem;
+ private Path testDir;
+ Reporter reporter;
+
+ private Path dataDir;
+
+ private CompressionCodecFactory compressionCodecs = null;
+ private CompressionCodec codec;
+
+ @Before
+ public void setUp() throws IOException {
+ conf = new Configuration();
+ job = new JobConf(conf);
+
+ TableDesc tblDesc = Utilities.defaultTd;
+ PartitionDesc partDesc = new PartitionDesc(tblDesc, null);
+ LinkedHashMap<Path, PartitionDesc> pt = new LinkedHashMap<>();
+ pt.put(new Path("/tmp/testfolder"), partDesc);
+ MapredWork mrwork = new MapredWork();
+ mrwork.getMapWork().setPathToPartitionInfo(pt);
+ Utilities.setMapRedWork(job, mrwork, new Path("/tmp/" + System.getProperty("user.name"), "hive"));
+
+ fileSystem = FileSystem.getLocal(conf);
+ testDir = new Path(System.getProperty("test.tmp.dir", System.getProperty(
+ "user.dir", new File(".").getAbsolutePath()))
+ + "/TestSkippingTextInputFormat");
+ reporter = Reporter.NULL;
+ fileSystem.delete(testDir, true);
+
+ dataDir = new Path(testDir, "datadir");
+ fileSystem.mkdirs(dataDir);
+ }
+
+ @After
+ public void tearDown() throws IOException {
+ fileSystem.delete(testDir, true);
+ }
+
+ /**
+ * Test CSV input file with header/footer skip.
+ */
+ @Test
+ public void testSkipFileSplits() throws Exception {
+ FileSystem fs = dataDir.getFileSystem(job);
+ FileInputFormat.setInputPaths(job, dataDir);
+
+ // First Dir with 1 File
+ Path dir1_file1 = new Path(dataDir, "skipfile1.csv");
+ writeTextFile(dir1_file1,
+ "dir1_header\n" +
+ "dir1_file1_line1\n" +
+ "dir1_file1_line2\n" +
+ "dir1_footer"
+ );
+
+ SkippingTextInputFormat inputFormat = new SkippingTextInputFormat();
+ // One header and one footer line to be deducted
+ inputFormat.configure(job, 1, 1);
+
+ FileInputFormat.setInputPaths(job, dir1_file1);
+ InputSplit[] splits = inputFormat.getSplits(job, 2);
+
+ assertTrue(splits.length == 2);
+
+ // Read all values.
+ List<String> received = new ArrayList<String>();
+ for (int i=0; i < splits.length; i++) {
+ RecordReader<LongWritable, Text> reader =
+ inputFormat.getRecordReader(splits[i], job, reporter);
+
+ HiveInputFormat.HiveInputSplit hiveInputSplit =
+ new HiveInputFormat.HiveInputSplit(splits[i], inputFormat.getClass().getName());
+ assertTrue(hiveInputSplit.getInputSplit().getClass() == FileSplit.class);
+
+ LongWritable key = reader.createKey();
+ Text value = reader.createValue();
+ while (reader.next(key, value)) {
+ received.add(value.toString());
+ }
+ reader.close();
+ }
+ // make sure we skipped the header and the footer across splits
+ assertTrue(received.size() == 2);
+ assertTrue(!received.get(0).contains("header"));
+ assertTrue(!received.get(received.size()-1).contains("footer"));
+ }
+
+ /**
+ * Test compressed CSV input file with header/footer skip.
+ */
+ @Test
+ public void testSkipCompressedFileSplits() throws Exception {
+ FileSystem fs = dataDir.getFileSystem(job);
+ FileInputFormat.setInputPaths(job, dataDir);
+
+ // First Dir with 1 Compressed CSV File
+ Path dir1_file1 = new Path(dataDir, "skipfile1.csv.bz2");
+ writeTextFile(dir1_file1,
+ "dir1_header\n" +
+ "dir1_file1_line1\n" +
+ "dir1_file1_line2\n" +
+ "dir1_footer"
+ );
+
+ SkippingTextInputFormat inputFormat = new SkippingTextInputFormat();
+ // One header and one footer line to be deducted
+ inputFormat.configure(job, 1, 1);
+
+ compressionCodecs = new CompressionCodecFactory(conf);
+ codec = compressionCodecs.getCodec(dir1_file1);
+ System.out.println("Codec: "+ codec);
+
+ FileInputFormat.setInputPaths(job, dir1_file1);
+ InputSplit[] splits = inputFormat.getSplits(job, 1);
+
+ // Should not generate splits for compressed file!
+ assertTrue(splits.length == 1);
+
+ // Read all values.
+ List<String> received = new ArrayList<String>();
+ for (int i=0; i < splits.length; i++) {
+ RecordReader<LongWritable, Text> reader =
+ inputFormat.getRecordReader(splits[i], job, reporter);
+
+ HiveInputFormat.HiveInputSplit hiveInputSplit =
+ new HiveInputFormat.HiveInputSplit(splits[i], inputFormat.getClass().getName());
+ System.out.println(hiveInputSplit.getInputSplit().getClass());
+ assertTrue(NullRowsInputFormat.DummyInputSplit.class == hiveInputSplit.getInputSplit().getClass());
+ System.out.println("Split: [" +i + "] "+ hiveInputSplit.getStart() + " => " + hiveInputSplit.getLength());
+
+ LongWritable key = reader.createKey();
+ Text value = reader.createValue();
+ while (reader.next(key, value)) {
+ System.out.println("Splits:" + i + " Val: "+ value);
+ received.add(value.toString());
+ }
+ reader.close();
+ }
+ }
+
+ /**
+ * Writes the given string to the given file.
+ */
+ private void writeTextFile(Path file, String content) throws IOException {
+ OutputStreamWriter writer = new OutputStreamWriter(fileSystem.create(file));
+ writer.write(content);
+ writer.close();
+ }
+}
diff --git a/ql/src/test/queries/clientpositive/compressed_skip_header_footer_aggr.q b/ql/src/test/queries/clientpositive/compressed_skip_header_footer_aggr.q
new file mode 100644
index 0000000..25f87b2
--- /dev/null
+++ b/ql/src/test/queries/clientpositive/compressed_skip_header_footer_aggr.q
@@ -0,0 +1,28 @@
+set hive.mapred.mode=nonstrict;
+
+dfs ${system:test.dfs.mkdir} ${system:test.tmp.dir}/testcase1;
+dfs -copyFromLocal ../../data/files/compressed_4line_file1.csv ${system:test.tmp.dir}/testcase1/;
+
+
+CREATE EXTERNAL TABLE `testcase1`(id int, name string) ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde'
+ LOCATION '${system:test.tmp.dir}/testcase1'
+ TBLPROPERTIES ("skip.header.line.count"="1", "skip.footer.line.count"="1");
+
+
+select * from testcase1;
+
+select count(*) from testcase1;
+
+
+dfs ${system:test.dfs.mkdir} ${system:test.tmp.dir}/testcase2;
+dfs -copyFromLocal ../../data/files/compressed_4line_file2.csv ${system:test.tmp.dir}/testcase2/;
+
+
+CREATE EXTERNAL TABLE `testcase2`(id int, name string) ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde'
+ LOCATION '${system:test.tmp.dir}/testcase2'
+ TBLPROPERTIES ("skip.header.line.count"="1", "skip.footer.line.count"="1");
+
+
+select * from testcase2;
+
+select count(*) from testcase2;
diff --git a/ql/src/test/queries/clientpositive/file_with_header_footer_aggregation.q b/ql/src/test/queries/clientpositive/skip_header_footer_aggr.q
similarity index 100%
rename from ql/src/test/queries/clientpositive/file_with_header_footer_aggregation.q
rename to ql/src/test/queries/clientpositive/skip_header_footer_aggr.q
diff --git a/ql/src/test/queries/clientpositive/skiphf_aggr2.q b/ql/src/test/queries/clientpositive/skip_header_footer_proj.q
similarity index 100%
rename from ql/src/test/queries/clientpositive/skiphf_aggr2.q
rename to ql/src/test/queries/clientpositive/skip_header_footer_proj.q
diff --git a/ql/src/test/results/clientpositive/llap/compressed_skip_header_footer_aggr.q.out b/ql/src/test/results/clientpositive/llap/compressed_skip_header_footer_aggr.q.out
new file mode 100644
index 0000000..330d2d0
--- /dev/null
+++ b/ql/src/test/results/clientpositive/llap/compressed_skip_header_footer_aggr.q.out
@@ -0,0 +1,64 @@
+PREHOOK: query: CREATE EXTERNAL TABLE `testcase1`(id int, name string) ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde'
+#### A masked pattern was here ####
+ TBLPROPERTIES ("skip.header.line.count"="1", "skip.footer.line.count"="1")
+PREHOOK: type: CREATETABLE
+PREHOOK: Input: hdfs://### HDFS PATH ###
+PREHOOK: Output: database:default
+PREHOOK: Output: default@testcase1
+POSTHOOK: query: CREATE EXTERNAL TABLE `testcase1`(id int, name string) ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde'
+#### A masked pattern was here ####
+ TBLPROPERTIES ("skip.header.line.count"="1", "skip.footer.line.count"="1")
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Input: hdfs://### HDFS PATH ###
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@testcase1
+PREHOOK: query: select * from testcase1
+PREHOOK: type: QUERY
+PREHOOK: Input: default@testcase1
+PREHOOK: Output: hdfs://### HDFS PATH ###
+POSTHOOK: query: select * from testcase1
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@testcase1
+POSTHOOK: Output: hdfs://### HDFS PATH ###
+2 2019-12-31
+PREHOOK: query: select count(*) from testcase1
+PREHOOK: type: QUERY
+PREHOOK: Input: default@testcase1
+PREHOOK: Output: hdfs://### HDFS PATH ###
+POSTHOOK: query: select count(*) from testcase1
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@testcase1
+POSTHOOK: Output: hdfs://### HDFS PATH ###
+1
+PREHOOK: query: CREATE EXTERNAL TABLE `testcase2`(id int, name string) ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde'
+#### A masked pattern was here ####
+ TBLPROPERTIES ("skip.header.line.count"="1", "skip.footer.line.count"="1")
+PREHOOK: type: CREATETABLE
+PREHOOK: Input: hdfs://### HDFS PATH ###
+PREHOOK: Output: database:default
+PREHOOK: Output: default@testcase2
+POSTHOOK: query: CREATE EXTERNAL TABLE `testcase2`(id int, name string) ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde'
+#### A masked pattern was here ####
+ TBLPROPERTIES ("skip.header.line.count"="1", "skip.footer.line.count"="1")
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Input: hdfs://### HDFS PATH ###
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@testcase2
+PREHOOK: query: select * from testcase2
+PREHOOK: type: QUERY
+PREHOOK: Input: default@testcase2
+PREHOOK: Output: hdfs://### HDFS PATH ###
+POSTHOOK: query: select * from testcase2
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@testcase2
+POSTHOOK: Output: hdfs://### HDFS PATH ###
+2 2019-12-31 01
+PREHOOK: query: select count(*) from testcase2
+PREHOOK: type: QUERY
+PREHOOK: Input: default@testcase2
+PREHOOK: Output: hdfs://### HDFS PATH ###
+POSTHOOK: query: select count(*) from testcase2
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@testcase2
+POSTHOOK: Output: hdfs://### HDFS PATH ###
+1
diff --git a/ql/src/test/results/clientpositive/llap/file_with_header_footer_aggregation.q.out b/ql/src/test/results/clientpositive/llap/skip_header_footer_aggr.q.out
similarity index 100%
rename from ql/src/test/results/clientpositive/llap/file_with_header_footer_aggregation.q.out
rename to ql/src/test/results/clientpositive/llap/skip_header_footer_aggr.q.out
diff --git a/ql/src/test/results/clientpositive/llap/skiphf_aggr2.q.out b/ql/src/test/results/clientpositive/llap/skip_header_footer_proj.q.out
similarity index 93%
rename from ql/src/test/results/clientpositive/llap/skiphf_aggr2.q.out
rename to ql/src/test/results/clientpositive/llap/skip_header_footer_proj.q.out
index ba33a10..f78f7fb 100644
--- a/ql/src/test/results/clientpositive/llap/skiphf_aggr2.q.out
+++ b/ql/src/test/results/clientpositive/llap/skip_header_footer_proj.q.out
@@ -23,11 +23,11 @@ POSTHOOK: Lineage: hf1.b SCRIPT []
PREHOOK: query: SELECT * FROM hf1
PREHOOK: type: QUERY
PREHOOK: Input: default@hf1
-#### A masked pattern was here ####
+PREHOOK: Output: hdfs://### HDFS PATH ###
POSTHOOK: query: SELECT * FROM hf1
POSTHOOK: type: QUERY
POSTHOOK: Input: default@hf1
-#### A masked pattern was here ####
+POSTHOOK: Output: hdfs://### HDFS PATH ###
a b
c d
PREHOOK: query: DROP TABLE IF EXISTS hf2
@@ -55,11 +55,11 @@ POSTHOOK: Lineage: hf2.b SCRIPT []
PREHOOK: query: SELECT * FROM hf2
PREHOOK: type: QUERY
PREHOOK: Input: default@hf2
-#### A masked pattern was here ####
+PREHOOK: Output: hdfs://### HDFS PATH ###
POSTHOOK: query: SELECT * FROM hf2
POSTHOOK: type: QUERY
POSTHOOK: Input: default@hf2
-#### A masked pattern was here ####
+POSTHOOK: Output: hdfs://### HDFS PATH ###
x y
PREHOOK: query: DROP TABLE IF EXISTS hf3
PREHOOK: type: DROPTABLE
@@ -86,11 +86,11 @@ POSTHOOK: Lineage: hf3.b SCRIPT []
PREHOOK: query: SELECT * FROM hf3
PREHOOK: type: QUERY
PREHOOK: Input: default@hf3
-#### A masked pattern was here ####
+PREHOOK: Output: hdfs://### HDFS PATH ###
POSTHOOK: query: SELECT * FROM hf3
POSTHOOK: type: QUERY
POSTHOOK: Input: default@hf3
-#### A masked pattern was here ####
+POSTHOOK: Output: hdfs://### HDFS PATH ###
PREHOOK: query: DROP TABLE IF EXISTS hf4
PREHOOK: type: DROPTABLE
POSTHOOK: query: DROP TABLE IF EXISTS hf4
@@ -116,8 +116,8 @@ POSTHOOK: Lineage: hf4.b SCRIPT []
PREHOOK: query: SELECT * FROM hf4
PREHOOK: type: QUERY
PREHOOK: Input: default@hf4
-#### A masked pattern was here ####
+PREHOOK: Output: hdfs://### HDFS PATH ###
POSTHOOK: query: SELECT * FROM hf4
POSTHOOK: type: QUERY
POSTHOOK: Input: default@hf4
-#### A masked pattern was here ####
+POSTHOOK: Output: hdfs://### HDFS PATH ###