You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by pa...@apache.org on 2016/10/18 23:45:23 UTC
[03/11] drill git commit: DRILL-4203: Parquet File. Date is stored
wrongly - Added new extra field in the parquet meta info "is.date.correct =
true"; - Removed unnecessary double conversion of value with Julian day;
- Added ability to correct corrupted d
DRILL-4203: Parquet File. Date is stored wrongly - Added a new extra field to the parquet metadata: "is.date.correct = true"; - Removed the unnecessary double conversion of date values through the Julian day; - Added the ability to correct corrupted dates for parquet files that have an old (version 2) metadata cache file as well.
This closes #595
Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/8461d10b
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/8461d10b
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/8461d10b
Branch: refs/heads/master
Commit: 8461d10b4fd6ce56361d1d826bb3a38b6dc8473c
Parents: ae34d5c
Author: Vitalii Diravka <vi...@gmail.com>
Authored: Mon Sep 19 12:59:27 2016 +0000
Committer: Parth Chandra <pa...@apache.org>
Committed: Fri Oct 14 11:08:07 2016 -0700
----------------------------------------------------------------------
.../hive/HiveDrillNativeScanBatchCreator.java | 1 +
.../templates/ParquetOutputRecordWriter.java | 8 +-
.../drill/exec/store/parquet/Metadata.java | 34 ++-
.../exec/store/parquet/ParquetFormatConfig.java | 5 +-
.../exec/store/parquet/ParquetGroupScan.java | 19 +-
.../store/parquet/ParquetReaderUtility.java | 167 ++++++++-------
.../exec/store/parquet/ParquetRecordWriter.java | 2 +
.../store/parquet/ParquetScanBatchCreator.java | 1 +
.../columnreaders/FixedByteAlignedReader.java | 10 +-
.../NullableFixedByteAlignedReaders.java | 10 +-
.../columnreaders/ParquetRecordReader.java | 8 -
.../parquet2/DrillParquetGroupConverter.java | 10 +-
.../TestCorruptParquetDateCorrection.java | 210 ++++++++++---------
.../0_0_1.parquet | Bin 257 -> 290 bytes
.../0_0_2.parquet | Bin 257 -> 290 bytes
.../0_0_3.parquet | Bin 257 -> 290 bytes
.../0_0_4.parquet | Bin 257 -> 290 bytes
.../0_0_5.parquet | Bin 257 -> 290 bytes
.../0_0_6.parquet | Bin 257 -> 290 bytes
.../hive1dot2_fewtypes_null/000000_0 | Bin 0 -> 2569 bytes
.../4203_corrected_dates.parquet | Bin 278 -> 311 bytes
...on_partitioned_metadata.requires_replace.txt | 2 +-
22 files changed, 261 insertions(+), 226 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/drill/blob/8461d10b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeScanBatchCreator.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeScanBatchCreator.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeScanBatchCreator.java
index 1ded153..d78c620 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeScanBatchCreator.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeScanBatchCreator.java
@@ -123,6 +123,7 @@ public class HiveDrillNativeScanBatchCreator implements BatchCreator<HiveDrillNa
// in the first row group
ParquetReaderUtility.DateCorruptionStatus containsCorruptDates =
ParquetReaderUtility.detectCorruptDates(parquetMetadata, config.getColumns(), true);
+ logger.info(containsCorruptDates.toString());
readers.add(new ParquetRecordReader(
context,
Path.getPathWithoutSchemeAndAuthority(finalPath).toString(),
http://git-wip-us.apache.org/repos/asf/drill/blob/8461d10b/exec/java-exec/src/main/codegen/templates/ParquetOutputRecordWriter.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/codegen/templates/ParquetOutputRecordWriter.java b/exec/java-exec/src/main/codegen/templates/ParquetOutputRecordWriter.java
index aac0f0c..0af3527 100644
--- a/exec/java-exec/src/main/codegen/templates/ParquetOutputRecordWriter.java
+++ b/exec/java-exec/src/main/codegen/templates/ParquetOutputRecordWriter.java
@@ -16,7 +16,6 @@
* limitations under the License.
*/
-import org.joda.time.DateTimeUtils;
import org.apache.parquet.io.api.Binary;
import java.lang.Override;
@@ -49,7 +48,7 @@ import org.apache.drill.exec.record.MaterializedField;
import org.apache.drill.common.types.TypeProtos;
-import org.joda.time.DateTimeUtils;
+import org.joda.time.DateTimeConstants;
import java.io.IOException;
import java.lang.UnsupportedOperationException;
@@ -71,7 +70,6 @@ public abstract class ParquetOutputRecordWriter extends AbstractRecordWriter imp
private RecordConsumer consumer;
private MessageType schema;
- public static final long JULIAN_DAY_EPOC = DateTimeUtils.toJulianDayNumber(0);
public void setUp(MessageType schema, RecordConsumer consumer) {
this.schema = schema;
@@ -156,12 +154,12 @@ public abstract class ParquetOutputRecordWriter extends AbstractRecordWriter imp
<#elseif minor.class == "Date">
<#if mode.prefix == "Repeated" >
reader.read(i, holder);
- consumer.addInteger((int) (DateTimeUtils.toJulianDayNumber(holder.value) - JULIAN_DAY_EPOC));
+ consumer.addInteger((int) (holder.value / DateTimeConstants.MILLIS_PER_DAY));
<#else>
consumer.startField(fieldName, fieldId);
reader.read(holder);
// convert from internal Drill date format to Julian Day centered around Unix Epoc
- consumer.addInteger((int) (DateTimeUtils.toJulianDayNumber(holder.value) - JULIAN_DAY_EPOC));
+ consumer.addInteger((int) (holder.value / DateTimeConstants.MILLIS_PER_DAY));
consumer.endField(fieldName, fieldId);
</#if>
<#elseif
http://git-wip-us.apache.org/repos/asf/drill/blob/8461d10b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/Metadata.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/Metadata.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/Metadata.java
index d6a739d..f5554c9 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/Metadata.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/Metadata.java
@@ -49,13 +49,13 @@ import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.OriginalType;
import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
import org.apache.parquet.schema.Type;
-import org.codehaus.jackson.annotate.JsonIgnore;
import org.apache.commons.lang3.tuple.Pair;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import com.fasterxml.jackson.annotation.JsonTypeName;
+import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.core.JsonGenerator.Feature;
@@ -184,7 +184,7 @@ public class Metadata {
childFiles.add(file);
}
}
- ParquetTableMetadata_v2 parquetTableMetadata = new ParquetTableMetadata_v2();
+ ParquetTableMetadata_v2 parquetTableMetadata = new ParquetTableMetadata_v2(true);
if (childFiles.size() > 0) {
List<ParquetFileMetadata_v2> childFilesMetadata =
getParquetFileMetadata_v2(parquetTableMetadata, childFiles);
@@ -353,6 +353,7 @@ public class Metadata {
ALL_COLS.add(AbstractRecordReader.STAR_COLUMN);
boolean autoCorrectCorruptDates = formatConfig.autoCorrectCorruptDates;
ParquetReaderUtility.DateCorruptionStatus containsCorruptDates = ParquetReaderUtility.detectCorruptDates(metadata, ALL_COLS, autoCorrectCorruptDates);
+ logger.info(containsCorruptDates.toString());
for (BlockMetaData rowGroup : metadata.getBlocks()) {
List<ColumnMetadata_v2> columnMetadataList = Lists.newArrayList();
long length = 0;
@@ -596,7 +597,10 @@ public class Metadata {
@JsonIgnore public abstract OriginalType getOriginalType(String[] columnName);
@JsonIgnore public abstract ParquetTableMetadataBase clone();
+
@JsonIgnore public abstract String getDrillVersion();
+
+ @JsonIgnore public abstract boolean isDateCorrect();
}
public static abstract class ParquetFileMetadata {
@@ -635,16 +639,14 @@ public class Metadata {
*
* This object would just be immutable, but due to Drill-4203 we need to correct
* date values that had been corrupted by earlier versions of Drill.
- * @return
*/
public abstract void setMax(Object newMax);
/**
- * Set the max value recorded in the parquet metadata statistics.
+ * Set the min value recorded in the parquet metadata statistics.
*
* This object would just be immutable, but due to Drill-4203 we need to correct
* date values that had been corrupted by earlier versions of Drill.
- * @return
*/
public abstract void setMin(Object newMax);
@@ -711,10 +713,16 @@ public class Metadata {
@JsonIgnore @Override public ParquetTableMetadataBase clone() {
return new ParquetTableMetadata_v1(files, directories);
}
+
@Override
public String getDrillVersion() {
return null;
}
+
+ @JsonIgnore @Override
+ public boolean isDateCorrect() {
+ return false;
+ }
}
@@ -919,9 +927,15 @@ public class Metadata {
@JsonProperty List<ParquetFileMetadata_v2> files;
@JsonProperty List<String> directories;
@JsonProperty String drillVersion;
+ @JsonProperty boolean isDateCorrect;
public ParquetTableMetadata_v2() {
+ super();
+ }
+
+ public ParquetTableMetadata_v2(boolean isDateCorrect) {
this.drillVersion = DrillVersionInfo.getVersion();
+ this.isDateCorrect = isDateCorrect;
}
public ParquetTableMetadata_v2(ParquetTableMetadataBase parquetTable,
@@ -930,6 +944,7 @@ public class Metadata {
this.directories = directories;
this.columnTypeInfo = ((ParquetTableMetadata_v2) parquetTable).columnTypeInfo;
this.drillVersion = DrillVersionInfo.getVersion();
+ this.isDateCorrect = true;
}
public ParquetTableMetadata_v2(List<ParquetFileMetadata_v2> files, List<String> directories,
@@ -970,11 +985,16 @@ public class Metadata {
@JsonIgnore @Override public ParquetTableMetadataBase clone() {
return new ParquetTableMetadata_v2(files, directories, columnTypeInfo);
}
- @Override
+
+ @JsonIgnore @Override
public String getDrillVersion() {
return drillVersion;
}
+ @JsonIgnore @Override public boolean isDateCorrect() {
+ return isDateCorrect;
+ }
+
}
@@ -1187,7 +1207,7 @@ public class Metadata {
}
@Override public PrimitiveTypeName getPrimitiveType() {
- return null;
+ return primitiveType;
}
@Override public OriginalType getOriginalType() {
http://git-wip-us.apache.org/repos/asf/drill/blob/8461d10b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatConfig.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatConfig.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatConfig.java
index 9ba03df..b33186e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatConfig.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatConfig.java
@@ -17,12 +17,13 @@
*/
package org.apache.drill.exec.store.parquet;
+import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.drill.common.logical.FormatPluginConfig;
import com.fasterxml.jackson.annotation.JsonTypeName;
-@JsonTypeName("parquet")
+@JsonTypeName("parquet") @JsonInclude(JsonInclude.Include.NON_DEFAULT)
public class ParquetFormatConfig implements FormatPluginConfig{
public boolean autoCorrectCorruptDates = true;
@@ -44,6 +45,6 @@ public class ParquetFormatConfig implements FormatPluginConfig{
@Override
public int hashCode() {
- return (autoCorrectCorruptDates ? 1 : 0);
+ return (autoCorrectCorruptDates ? 1231 : 1237);
}
}
http://git-wip-us.apache.org/repos/asf/drill/blob/8461d10b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
index 649282b..b9f0ac0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
@@ -80,7 +80,7 @@ import org.apache.drill.exec.vector.NullableVarCharVector;
import org.apache.drill.exec.vector.ValueVector;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
-import org.joda.time.DateTimeUtils;
+import org.joda.time.DateTimeConstants;
import org.apache.parquet.io.api.Binary;
import org.apache.parquet.schema.OriginalType;
import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
@@ -479,8 +479,7 @@ public class ParquetGroupScan extends AbstractFileGroupScan {
case DATE: {
NullableDateVector dateVector = (NullableDateVector) v;
Integer value = (Integer) partitionValueMap.get(f).get(column);
- dateVector.getMutator().setSafe(index, DateTimeUtils.fromJulianDay(
- value + ParquetReaderUtility.SHIFT_PARQUET_DAY_COUNT_TO_JULIAN_DAY));
+ dateVector.getMutator().setSafe(index, value * (long) DateTimeConstants.MILLIS_PER_DAY);
return;
}
case TIME: {
@@ -668,9 +667,7 @@ public class ParquetGroupScan extends AbstractFileGroupScan {
}
if (metaPath != null && fs.exists(metaPath)) {
usedMetadataCache = true;
- if (parquetTableMetadata == null) {
- parquetTableMetadata = Metadata.readBlockMeta(fs, metaPath.toString(), metaContext, formatConfig);
- }
+ parquetTableMetadata = Metadata.readBlockMeta(fs, metaPath.toString(), metaContext, formatConfig);
} else {
parquetTableMetadata = Metadata.getParquetTableMetadata(fs, p.toString(), formatConfig);
}
@@ -683,15 +680,7 @@ public class ParquetGroupScan extends AbstractFileGroupScan {
parquetTableMetadata = Metadata.readBlockMeta(fs, metaPath.toString(), metaContext, formatConfig);
}
if (fileSet != null) {
- if (parquetTableMetadata == null) {
- parquetTableMetadata = removeUnneededRowGroups(Metadata.readBlockMeta(fs, metaPath.toString(), metaContext, formatConfig));
- } else {
- parquetTableMetadata = removeUnneededRowGroups(parquetTableMetadata);
- }
- } else {
- if (parquetTableMetadata == null) {
- parquetTableMetadata = Metadata.readBlockMeta(fs, metaPath.toString(), metaContext, formatConfig);
- }
+ parquetTableMetadata = removeUnneededRowGroups(parquetTableMetadata);
}
} else {
final List<FileStatus> fileStatuses = Lists.newArrayList();
http://git-wip-us.apache.org/repos/asf/drill/blob/8461d10b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetReaderUtility.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetReaderUtility.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetReaderUtility.java
index 9d0886f..1f6dc1e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetReaderUtility.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetReaderUtility.java
@@ -23,7 +23,6 @@ import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.exec.planner.physical.PlannerSettings;
import org.apache.drill.exec.server.options.OptionManager;
import org.apache.drill.exec.store.AbstractRecordReader;
-import org.apache.drill.exec.store.ParquetOutputRecordWriter;
import org.apache.drill.exec.work.ExecErrorConstants;
import org.apache.parquet.SemanticVersion;
import org.apache.parquet.VersionParser;
@@ -39,29 +38,37 @@ import org.apache.parquet.hadoop.metadata.ColumnPath;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;
import org.apache.parquet.schema.OriginalType;
import org.joda.time.Chronology;
-import org.joda.time.DateTimeUtils;
+import org.joda.time.DateTimeConstants;
+import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-/*
+/**
* Utility class where we can capture common logic between the two parquet readers
*/
public class ParquetReaderUtility {
private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ParquetReaderUtility.class);
- // Note the negation symbol in the beginning
- public static final double CORRECT_CORRUPT_DATE_SHIFT = -ParquetOutputRecordWriter.JULIAN_DAY_EPOC - 0.5;
- public static final double SHIFT_PARQUET_DAY_COUNT_TO_JULIAN_DAY = ParquetOutputRecordWriter.JULIAN_DAY_EPOC - 0.5;
- // The year 5000 is the threshold for auto-detecting date corruption.
+ /**
+ * Number of days between Julian day epoch (January 1, 4713 BC) and Unix day epoch (January 1, 1970).
+ * The value of this constant is {@value}.
+ */
+ public static final long JULIAN_DAY_NUMBER_FOR_UNIX_EPOCH = 2440588;
+ /**
+ * All old parquet files (which don't have the "is.date.correct=true" property in their metadata) have
+ * a corrupt date shift: {@value} days or 2 * {@value #JULIAN_DAY_NUMBER_FOR_UNIX_EPOCH}
+ */
+ public static final long CORRECT_CORRUPT_DATE_SHIFT = 2 * JULIAN_DAY_NUMBER_FOR_UNIX_EPOCH;
+ // The year 5000 (or 1106685 days from the Unix epoch) is chosen as the threshold for auto-detecting date corruption.
// This balances two possible cases of bad auto-correction. External tools writing dates in the future will not
// be shifted unless they are past this threshold (and we cannot identify them as external files based on the metadata).
// On the other hand, historical dates written with Drill wouldn't risk being incorrectly shifted unless they were
// something like 10,000 years in the past.
private static final Chronology UTC = org.joda.time.chrono.ISOChronology.getInstanceUTC();
public static final int DATE_CORRUPTION_THRESHOLD =
- (int) (DateTimeUtils.toJulianDayNumber(UTC.getDateTimeMillis(5000, 1, 1, 0)) - ParquetOutputRecordWriter.JULIAN_DAY_EPOC);
+ (int) (UTC.getDateTimeMillis(5000, 1, 1, 0) / DateTimeConstants.MILLIS_PER_DAY);
/**
* For most recently created parquet files, we can determine if we have corrupted dates (see DRILL-4203)
@@ -69,9 +76,24 @@ public class ParquetReaderUtility {
* in the data pages themselves to see if they are likely corrupt.
*/
public enum DateCorruptionStatus {
- META_SHOWS_CORRUPTION, // metadata can determine if the values are definitely CORRUPT
- META_SHOWS_NO_CORRUPTION, // metadata can determine if the values are definitely CORRECT
- META_UNCLEAR_TEST_VALUES // not enough info in metadata, parquet reader must test individual values
+ META_SHOWS_CORRUPTION{
+ @Override
+ public String toString(){
+ return "It is determined from metadata that the date values are definitely CORRUPT";
+ }
+ },
+ META_SHOWS_NO_CORRUPTION {
+ @Override
+ public String toString(){
+ return "It is determined from metadata that the date values are definitely CORRECT";
+ }
+ },
+ META_UNCLEAR_TEST_VALUES {
+ @Override
+ public String toString(){
+ return "Not enough info in metadata, parquet reader will test individual date values";
+ }
+ }
}
public static void checkDecimalTypeEnabled(OptionManager options) {
@@ -102,34 +124,46 @@ public class ParquetReaderUtility {
}
public static int autoCorrectCorruptedDate(int corruptedDate) {
- return (int) (corruptedDate - 2 * ParquetOutputRecordWriter.JULIAN_DAY_EPOC);
+ return (int) (corruptedDate - CORRECT_CORRUPT_DATE_SHIFT);
}
public static void correctDatesInMetadataCache(Metadata.ParquetTableMetadataBase parquetTableMetadata) {
- DateCorruptionStatus cacheFileContainsCorruptDates;
- String drillVersionStr = parquetTableMetadata.getDrillVersion();
- if (drillVersionStr != null) {
- try {
- cacheFileContainsCorruptDates = ParquetReaderUtility.drillVersionHasCorruptedDates(drillVersionStr);
- } catch (VersionParser.VersionParseException e) {
- cacheFileContainsCorruptDates = DateCorruptionStatus.META_SHOWS_CORRUPTION;
- }
- } else {
- cacheFileContainsCorruptDates = DateCorruptionStatus.META_SHOWS_CORRUPTION;
- }
+ boolean isDateCorrect = parquetTableMetadata.isDateCorrect();
+ DateCorruptionStatus cacheFileContainsCorruptDates = isDateCorrect ?
+ DateCorruptionStatus.META_SHOWS_NO_CORRUPTION : DateCorruptionStatus.META_SHOWS_CORRUPTION;
if (cacheFileContainsCorruptDates == DateCorruptionStatus.META_SHOWS_CORRUPTION) {
+ // Looking for the DATE data type of column names in the metadata cache file ("metadata_version" : "v2")
+ String[] names = new String[0];
+ if (parquetTableMetadata instanceof Metadata.ParquetTableMetadata_v2) {
+ for (Metadata.ColumnTypeMetadata_v2 columnTypeMetadata :
+ ((Metadata.ParquetTableMetadata_v2) parquetTableMetadata).columnTypeInfo.values()) {
+ if (OriginalType.DATE.equals(columnTypeMetadata.originalType)) {
+ names = columnTypeMetadata.name;
+ }
+ }
+ }
for (Metadata.ParquetFileMetadata file : parquetTableMetadata.getFiles()) {
// Drill has only ever written a single row group per file, only need to correct the statistics
// on the first row group
Metadata.RowGroupMetadata rowGroupMetadata = file.getRowGroups().get(0);
for (Metadata.ColumnMetadata columnMetadata : rowGroupMetadata.getColumns()) {
- OriginalType originalType = columnMetadata.getOriginalType();
- if (originalType != null && originalType.equals(OriginalType.DATE) &&
- columnMetadata.hasSingleValue() &&
- (Integer) columnMetadata.getMaxValue() > ParquetReaderUtility.DATE_CORRUPTION_THRESHOLD) {
- int newMinMax = ParquetReaderUtility.autoCorrectCorruptedDate((Integer)columnMetadata.getMaxValue());
- columnMetadata.setMax(newMinMax);
- columnMetadata.setMin(newMinMax);
+ // Setting Min/Max values for ParquetTableMetadata_v1
+ if (parquetTableMetadata instanceof Metadata.ParquetTableMetadata_v1) {
+ OriginalType originalType = columnMetadata.getOriginalType();
+ if (OriginalType.DATE.equals(originalType) && columnMetadata.hasSingleValue() &&
+ (Integer) columnMetadata.getMaxValue() > ParquetReaderUtility.DATE_CORRUPTION_THRESHOLD) {
+ int newMinMax = ParquetReaderUtility.autoCorrectCorruptedDate((Integer)columnMetadata.getMaxValue());
+ columnMetadata.setMax(newMinMax);
+ columnMetadata.setMin(newMinMax);
+ }
+ }
+ // Setting Max values for ParquetTableMetadata_v2
+ else if (parquetTableMetadata instanceof Metadata.ParquetTableMetadata_v2 &&
+ columnMetadata.getName() != null && Arrays.equals(columnMetadata.getName(), names) &&
+ columnMetadata.hasSingleValue() && (Integer) columnMetadata.getMaxValue() >
+ ParquetReaderUtility.DATE_CORRUPTION_THRESHOLD) {
+ int newMax = ParquetReaderUtility.autoCorrectCorruptedDate((Integer) columnMetadata.getMaxValue());
+ columnMetadata.setMax(newMax);
}
}
}
@@ -138,9 +172,6 @@ public class ParquetReaderUtility {
/**
* Check for corrupted dates in a parquet file. See Drill-4203
- * @param footer
- * @param columns
- * @return
*/
public static DateCorruptionStatus detectCorruptDates(ParquetMetadata footer,
List<SchemaPath> columns,
@@ -152,60 +183,43 @@ public class ParquetReaderUtility {
// migrated parquet files have 1.8.1 parquet-mr version with drill-r0 in the part of the name usually containing "SNAPSHOT"
- // new parquet files 1.4+ have drill version number
- // - below 1.9.0 dates are corrupt
- // - this includes 1.9.0-SNAPSHOT
+ // new parquet files generated with the "is.date.correct" property have no corrupt dates
- String drillVersion = footer.getFileMetaData().getKeyValueMetaData().get(ParquetRecordWriter.DRILL_VERSION_PROPERTY);
String createdBy = footer.getFileMetaData().getCreatedBy();
- try {
- if (drillVersion == null) {
- // Possibly an old, un-migrated Drill file, check the column statistics to see if min/max values look corrupt
- // only applies if there is a date column selected
- if (createdBy.equals("parquet-mr")) {
- // loop through parquet column metadata to find date columns, check for corrupt valuues
- return checkForCorruptDateValuesInStatistics(footer, columns, autoCorrectCorruptDates);
- } else {
- // check the created by to see if it is a migrated Drill file
+ String drillVersion = footer.getFileMetaData().getKeyValueMetaData().get(ParquetRecordWriter.DRILL_VERSION_PROPERTY);
+ String isDateCorrect = footer.getFileMetaData().getKeyValueMetaData().get(ParquetRecordWriter.IS_DATE_CORRECT_PROPERTY);
+ if (drillVersion != null) {
+ return Boolean.valueOf(isDateCorrect) ? DateCorruptionStatus.META_SHOWS_NO_CORRUPTION
+ : DateCorruptionStatus.META_SHOWS_CORRUPTION;
+ } else {
+ // Possibly an old, un-migrated Drill file, check the column statistics to see if min/max values look corrupt
+ // only applies if there is a date column selected
+ if (createdBy.equals("parquet-mr")) {
+ // loop through parquet column metadata to find date columns, check for corrupt values
+ return checkForCorruptDateValuesInStatistics(footer, columns, autoCorrectCorruptDates);
+ } else {
+ // check the created by to see if it is a migrated Drill file
+ try {
VersionParser.ParsedVersion parsedCreatedByVersion = VersionParser.parse(createdBy);
// check if this is a migrated Drill file, lacking a Drill version number, but with
// "drill" in the parquet created-by string
- SemanticVersion semVer = parsedCreatedByVersion.getSemanticVersion();
- String pre = semVer.pre + "";
- if (semVer != null && semVer.major == 1 && semVer.minor == 8 && semVer.patch == 1 && pre.contains("drill")) {
- return DateCorruptionStatus.META_SHOWS_CORRUPTION;
- } else {
- // written by a tool that wasn't Drill, the dates are not corrupted
- return DateCorruptionStatus.META_SHOWS_NO_CORRUPTION;
+ if (parsedCreatedByVersion.hasSemanticVersion()) {
+ SemanticVersion semVer = parsedCreatedByVersion.getSemanticVersion();
+ String pre = semVer.pre + "";
+ if (semVer.major == 1 && semVer.minor == 8 && semVer.patch == 1 && pre.contains("drill")) {
+ return DateCorruptionStatus.META_SHOWS_CORRUPTION;
+ }
}
+ // written by a tool that wasn't Drill, the dates are not corrupted
+ return DateCorruptionStatus.META_SHOWS_NO_CORRUPTION;
+ } catch (VersionParser.VersionParseException e) {
+ // If we couldn't parse "created by" field, check column metadata of date columns
+ return checkForCorruptDateValuesInStatistics(footer, columns, autoCorrectCorruptDates);
}
- } else {
- // this parser expects an application name before the semantic version, just prepending Drill
- // we know from the property name "drill.version" that we wrote this
- return drillVersionHasCorruptedDates(drillVersion);
}
- } catch (VersionParser.VersionParseException e) {
- // Default value of "false" if we cannot parse the version is fine, we are covering all
- // of the metadata values produced by historical versions of Drill
- // If Drill didn't write it the dates should be fine
- return DateCorruptionStatus.META_SHOWS_CORRUPTION;
}
}
- public static DateCorruptionStatus drillVersionHasCorruptedDates(String drillVersion) throws VersionParser.VersionParseException {
- VersionParser.ParsedVersion parsedDrillVersion = parseDrillVersion(drillVersion);
- SemanticVersion semVer = parsedDrillVersion.getSemanticVersion();
- if (semVer == null || semVer.compareTo(new SemanticVersion(1, 9, 0)) < 0) {
- return DateCorruptionStatus.META_SHOWS_CORRUPTION;
- } else {
- return DateCorruptionStatus.META_SHOWS_NO_CORRUPTION;
- }
-
- }
-
- public static VersionParser.ParsedVersion parseDrillVersion(String drillVersion) throws VersionParser.VersionParseException {
- return VersionParser.parse("drill version " + drillVersion + " (build 1234)");
- }
/**
* Detect corrupt date values by looking at the min/max values in the metadata.
@@ -224,7 +238,6 @@ public class ParquetReaderUtility {
* of corrupt dates. There are some rare cases (storing dates thousands
* of years into the future, with tools other than Drill writing files)
* that would result in the date values being "corrected" into bad values.
- * @return
*/
public static DateCorruptionStatus checkForCorruptDateValuesInStatistics(ParquetMetadata footer,
List<SchemaPath> columns,
http://git-wip-us.apache.org/repos/asf/drill/blob/8461d10b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java
index 3f2defd..bc6bb65 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java
@@ -78,6 +78,7 @@ public class ParquetRecordWriter extends ParquetOutputRecordWriter {
private static final int MAXIMUM_RECORD_COUNT_FOR_CHECK = 10000;
public static final String DRILL_VERSION_PROPERTY = "drill.version";
+ public static final String IS_DATE_CORRECT_PROPERTY = "is.date.correct";
private ParquetFileWriter parquetFileWriter;
private MessageType schema;
@@ -115,6 +116,7 @@ public class ParquetRecordWriter extends ParquetOutputRecordWriter {
this.partitionColumns = writer.getPartitionColumns();
this.hasPartitions = partitionColumns != null && partitionColumns.size() > 0;
this.extraMetaData.put(DRILL_VERSION_PROPERTY, DrillVersionInfo.getVersion());
+ this.extraMetaData.put(IS_DATE_CORRECT_PROPERTY, "true");
}
@Override
http://git-wip-us.apache.org/repos/asf/drill/blob/8461d10b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
index 8f7ace1..bf13977 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
@@ -107,6 +107,7 @@ public class ParquetScanBatchCreator implements BatchCreator<ParquetRowGroupScan
boolean autoCorrectCorruptDates = rowGroupScan.formatConfig.autoCorrectCorruptDates;
ParquetReaderUtility.DateCorruptionStatus containsCorruptDates = ParquetReaderUtility.detectCorruptDates(footers.get(e.getPath()), rowGroupScan.getColumns(),
autoCorrectCorruptDates);
+ logger.info(containsCorruptDates.toString());
if (!context.getOptions().getOption(ExecConstants.PARQUET_NEW_RECORD_READER).bool_val && !isComplex(footers.get(e.getPath()))) {
readers.add(
new ParquetRecordReader(
http://git-wip-us.apache.org/repos/asf/drill/blob/8461d10b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/FixedByteAlignedReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/FixedByteAlignedReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/FixedByteAlignedReader.java
index cccb06f..f806ee4 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/FixedByteAlignedReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/FixedByteAlignedReader.java
@@ -33,7 +33,7 @@ import org.apache.drill.exec.vector.VariableWidthVector;
import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.format.SchemaElement;
import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
-import org.joda.time.DateTimeUtils;
+import org.joda.time.DateTimeConstants;
import io.netty.buffer.DrillBuf;
@@ -134,7 +134,7 @@ class FixedByteAlignedReader<V extends ValueVector> extends ColumnReader<V> {
intValue = readIntLittleEndian(bytebuf, start);
}
- mutator.set(index, DateTimeUtils.fromJulianDay(intValue + ParquetReaderUtility.SHIFT_PARQUET_DAY_COUNT_TO_JULIAN_DAY));
+ mutator.set(index, intValue * (long) DateTimeConstants.MILLIS_PER_DAY);
}
}
@@ -160,7 +160,7 @@ class FixedByteAlignedReader<V extends ValueVector> extends ColumnReader<V> {
intValue = readIntLittleEndian(bytebuf, start);
}
- mutator.set(index, DateTimeUtils.fromJulianDay(intValue + ParquetReaderUtility.CORRECT_CORRUPT_DATE_SHIFT));
+ mutator.set(index, (intValue - ParquetReaderUtility.CORRECT_CORRUPT_DATE_SHIFT) * DateTimeConstants.MILLIS_PER_DAY);
}
}
@@ -191,9 +191,9 @@ class FixedByteAlignedReader<V extends ValueVector> extends ColumnReader<V> {
}
if (intValue > ParquetReaderUtility.DATE_CORRUPTION_THRESHOLD) {
- mutator.set(index, DateTimeUtils.fromJulianDay(intValue + ParquetReaderUtility.CORRECT_CORRUPT_DATE_SHIFT));
+ mutator.set(index, (intValue - ParquetReaderUtility.CORRECT_CORRUPT_DATE_SHIFT) * DateTimeConstants.MILLIS_PER_DAY);
} else {
- mutator.set(index, DateTimeUtils.fromJulianDay(intValue + ParquetReaderUtility.SHIFT_PARQUET_DAY_COUNT_TO_JULIAN_DAY));
+ mutator.set(index, intValue * (long) DateTimeConstants.MILLIS_PER_DAY);
}
}
http://git-wip-us.apache.org/repos/asf/drill/blob/8461d10b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableFixedByteAlignedReaders.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableFixedByteAlignedReaders.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableFixedByteAlignedReaders.java
index 10e0c72..df4c1ba 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableFixedByteAlignedReaders.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableFixedByteAlignedReaders.java
@@ -43,7 +43,7 @@ import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.format.SchemaElement;
import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
import org.apache.parquet.io.api.Binary;
-import org.joda.time.DateTimeUtils;
+import org.joda.time.DateTimeConstants;
import io.netty.buffer.DrillBuf;
@@ -327,7 +327,7 @@ public class NullableFixedByteAlignedReaders {
intValue = readIntLittleEndian(bytebuf, start);
}
- valueVec.getMutator().set(index, DateTimeUtils.fromJulianDay(intValue + ParquetReaderUtility.SHIFT_PARQUET_DAY_COUNT_TO_JULIAN_DAY));
+ valueVec.getMutator().set(index, intValue * (long) DateTimeConstants.MILLIS_PER_DAY);
}
}
@@ -351,7 +351,7 @@ public class NullableFixedByteAlignedReaders {
intValue = readIntLittleEndian(bytebuf, start);
}
- valueVec.getMutator().set(index, DateTimeUtils.fromJulianDay(intValue + ParquetReaderUtility.CORRECT_CORRUPT_DATE_SHIFT));
+ valueVec.getMutator().set(index, (intValue - ParquetReaderUtility.CORRECT_CORRUPT_DATE_SHIFT) * DateTimeConstants.MILLIS_PER_DAY);
}
}
@@ -384,9 +384,9 @@ public class NullableFixedByteAlignedReaders {
}
if (intValue > ParquetReaderUtility.DATE_CORRUPTION_THRESHOLD) {
- dateVector.getMutator().set(index, DateTimeUtils.fromJulianDay(intValue + ParquetReaderUtility.CORRECT_CORRUPT_DATE_SHIFT));
+ dateVector.getMutator().set(index, (intValue - ParquetReaderUtility.CORRECT_CORRUPT_DATE_SHIFT) * DateTimeConstants.MILLIS_PER_DAY);
} else {
- dateVector.getMutator().set(index, DateTimeUtils.fromJulianDay(intValue + ParquetReaderUtility.SHIFT_PARQUET_DAY_COUNT_TO_JULIAN_DAY));
+ dateVector.getMutator().set(index, intValue * (long) DateTimeConstants.MILLIS_PER_DAY);
}
}
}
http://git-wip-us.apache.org/repos/asf/drill/blob/8461d10b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java
index 99cf0f5..50bb7dc 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java
@@ -27,7 +27,6 @@ import java.util.Map;
import com.google.common.collect.ImmutableList;
import org.apache.drill.common.exceptions.DrillRuntimeException;
import org.apache.drill.common.exceptions.ExecutionSetupException;
-import org.apache.drill.common.expression.PathSegment;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.common.types.TypeProtos;
import org.apache.drill.common.types.TypeProtos.DataMode;
@@ -42,18 +41,13 @@ import org.apache.drill.exec.record.MaterializedField;
import org.apache.drill.exec.store.AbstractRecordReader;
import org.apache.drill.exec.store.parquet.ParquetReaderStats;
import org.apache.drill.exec.store.parquet.ParquetReaderUtility;
-import org.apache.drill.exec.store.parquet.ParquetRecordWriter;
import org.apache.drill.exec.vector.AllocationHelper;
import org.apache.drill.exec.vector.NullableIntVector;
import org.apache.drill.exec.vector.ValueVector;
import org.apache.drill.exec.vector.complex.RepeatedValueVector;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.parquet.SemanticVersion;
-import org.apache.parquet.VersionParser;
import org.apache.parquet.column.ColumnDescriptor;
-import org.apache.parquet.column.statistics.Statistics;
-import org.apache.parquet.format.ConvertedType;
import org.apache.parquet.format.FileMetaData;
import org.apache.parquet.format.SchemaElement;
import org.apache.parquet.format.converter.ParquetMetadataConverter;
@@ -235,8 +229,6 @@ public class ParquetRecordReader extends AbstractRecordReader {
* repetition level (indicating if it is an array type) is in the ColumnDescriptor and
* the length of a fixed width field is stored at the schema level.
*
- * @param column
- * @param se
* @return the length if fixed width, else -1
*/
private int getDataTypeLength(ColumnDescriptor column, SchemaElement se) {
http://git-wip-us.apache.org/repos/asf/drill/blob/8461d10b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetGroupConverter.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetGroupConverter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetGroupConverter.java
index 32295b9..48a0bfd 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetGroupConverter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetGroupConverter.java
@@ -65,7 +65,7 @@ import org.apache.drill.exec.vector.complex.writer.TimeStampWriter;
import org.apache.drill.exec.vector.complex.writer.TimeWriter;
import org.apache.drill.exec.vector.complex.writer.VarBinaryWriter;
import org.apache.drill.exec.vector.complex.writer.VarCharWriter;
-import org.joda.time.DateTimeUtils;
+import org.joda.time.DateTimeConstants;
import org.apache.parquet.io.api.Binary;
import org.apache.parquet.io.api.Converter;
@@ -357,9 +357,9 @@ public class DrillParquetGroupConverter extends GroupConverter {
@Override
public void addInt(int value) {
if (value > ParquetReaderUtility.DATE_CORRUPTION_THRESHOLD) {
- holder.value = DateTimeUtils.fromJulianDay(value + ParquetReaderUtility.CORRECT_CORRUPT_DATE_SHIFT);
+ holder.value = (value - ParquetReaderUtility.CORRECT_CORRUPT_DATE_SHIFT) * DateTimeConstants.MILLIS_PER_DAY;
} else {
- holder.value = DateTimeUtils.fromJulianDay(value + ParquetReaderUtility.SHIFT_PARQUET_DAY_COUNT_TO_JULIAN_DAY);
+ holder.value = value * (long) DateTimeConstants.MILLIS_PER_DAY;
}
writer.write(holder);
}
@@ -375,7 +375,7 @@ public class DrillParquetGroupConverter extends GroupConverter {
@Override
public void addInt(int value) {
- holder.value = DateTimeUtils.fromJulianDay(value + ParquetReaderUtility.CORRECT_CORRUPT_DATE_SHIFT);
+ holder.value = (value - ParquetReaderUtility.CORRECT_CORRUPT_DATE_SHIFT) * DateTimeConstants.MILLIS_PER_DAY;
writer.write(holder);
}
}
@@ -390,7 +390,7 @@ public class DrillParquetGroupConverter extends GroupConverter {
@Override
public void addInt(int value) {
- holder.value = DateTimeUtils.fromJulianDay(value + ParquetReaderUtility.SHIFT_PARQUET_DAY_COUNT_TO_JULIAN_DAY);
+ holder.value = value * (long) DateTimeConstants.MILLIS_PER_DAY;
writer.write(holder);
}
}
http://git-wip-us.apache.org/repos/asf/drill/blob/8461d10b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestCorruptParquetDateCorrection.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestCorruptParquetDateCorrection.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestCorruptParquetDateCorrection.java
index f9b0220..27cc093 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestCorruptParquetDateCorrection.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestCorruptParquetDateCorrection.java
@@ -23,7 +23,6 @@ import org.apache.drill.common.util.TestTools;
import org.apache.drill.exec.ExecConstants;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.joda.time.DateTime;
@@ -68,9 +67,8 @@ import java.util.regex.Pattern;
public class TestCorruptParquetDateCorrection extends PlanTestBase {
// 4 files are in the directory:
- // - one created with the fixed version of the reader, right before 1.9
- // - the code was changed to write the version number 1.9 (without snapshot) into the file
- // - for compatibility all 1.9-SNAPSHOT files are read to correct the corrupt dates
+ // - one created with the fixed version of Drill
+ //   - its footer has the extra metadata field is.date.correct = true
// - one from and old version of Drill, before we put in proper created by in metadata
// - this is read properly by looking at a Max value in the file statistics, to see that
// it is way off of a typical date value
@@ -97,7 +95,9 @@ public class TestCorruptParquetDateCorrection extends PlanTestBase {
private static final String VARCHAR_PARTITIONED =
"[WORKING_PATH]/src/test/resources/parquet/4203_corrupt_dates/fewtypes_varcharpartition";
private static final String DATE_PARTITIONED =
- "[WORKING_PATH]/src/test/resources/parquet/4203_corrupt_dates/fewtypes_varcharpartition";
+ "[WORKING_PATH]/src/test/resources/parquet/4203_corrupt_dates/fewtypes_datepartition";
+ private static final String EXCEPTION_WHILE_PARSING_CREATED_BY_META =
+ "[WORKING_PATH]/src/test/resources/parquet/4203_corrupt_dates/hive1dot2_fewtypes_null";
private static FileSystem fs;
private static Path path;
@@ -123,9 +123,9 @@ public class TestCorruptParquetDateCorrection extends PlanTestBase {
/**
* Test reading a directory full of partitioned parquet files with dates, these files have a drill version
- * number of 1.9.0 in their footers, so we can be certain they do not have corruption. The option to disable the
- * correction is passed, but it will not change the result in the case where we are certain correction
- * is NOT needed. For more info see DRILL-4203.
+ * number of 1.9.0-SNAPSHOT and the is.date.correct = true label in their footers, so we can be certain
+ * they do not have corruption. The option to disable the correction is passed, but it will not change the result
+ * in the case where we are certain that correction is NOT needed. For more info see DRILL-4203.
*/
@Test
public void testReadPartitionedOnCorrectedDates() throws Exception {
@@ -186,6 +186,21 @@ public class TestCorruptParquetDateCorrection extends PlanTestBase {
testPlanMatchingPatterns(sql, new String[]{"numFiles=6"}, null);
}
+  /**
+   * Reads the hive1dot2_fewtypes_null file, whose created_by footer metadata is expected to fail
+   * parsing (see EXCEPTION_WHILE_PARSING_CREATED_BY_META), and checks that its dates are still
+   * returned uncorrupted.
+   */
+  @Test
+  public void testCorrectDatesAndExceptionWhileParsingCreatedBy() throws Exception {
+    // Joda-Time pattern letters: 'MM' is month-of-year, 'mm' would be minute-of-hour. With 'mm' the
+    // month is never parsed, every date collapses into January, and 1997-03-01 wrongly passes the filter.
+    testBuilder()
+        .sqlQuery("select date_col from " +
+            "dfs.`" + EXCEPTION_WHILE_PARSING_CREATED_BY_META +
+            "` where to_date(date_col, 'yyyy-MM-dd') < '1997-01-02'")
+        .baselineColumns("date_col")
+        .unOrdered()
+        .baselineValues(new DateTime(1996, 1, 29, 0, 0))
+        .baselineValues(new DateTime(1996, 3, 1, 0, 0))
+        .baselineValues(new DateTime(1996, 3, 2, 0, 0))
+        .go();
+  }
+
/**
* Test reading a directory full of partitioned parquet files with dates, these files have a drill version
* number of 1.4.0 in their footers, so we can be certain they are corrupt. The option to disable the
@@ -298,7 +313,8 @@ public class TestCorruptParquetDateCorrection extends PlanTestBase {
@Test
public void testReadCorruptDatesWithNullFilledColumns() throws Exception {
testBuilder()
- .sqlQuery("select null_dates_1, null_dates_2, non_existent_field, date_col from dfs.`" + PARQUET_DATE_FILE_WITH_NULL_FILLED_COLS + "`")
+ .sqlQuery("select null_dates_1, null_dates_2, non_existent_field, date_col from dfs.`" +
+ PARQUET_DATE_FILE_WITH_NULL_FILLED_COLS + "`")
.unOrdered()
.baselineColumns("null_dates_1", "null_dates_2", "non_existent_field", "date_col")
.baselineValues(null, null, null, new DateTime(1970, 1, 1, 0, 0))
@@ -347,90 +363,6 @@ public class TestCorruptParquetDateCorrection extends PlanTestBase {
}
}
- public void addDateBaselineVals(TestBuilder builder) {
- builder
- .baselineValues(new DateTime(1970, 1, 1, 0, 0))
- .baselineValues(new DateTime(1970, 1, 2, 0, 0))
- .baselineValues(new DateTime(1969, 12, 31, 0, 0))
- .baselineValues(new DateTime(1969, 12, 30, 0, 0))
- .baselineValues(new DateTime(1900, 1, 1, 0, 0))
- .baselineValues(new DateTime(2015, 1, 1, 0, 0));
- }
-
- /**
- * These are the same values added in the addDateBaselineVals, shifted as corrupt values
- */
- public void addCorruptedDateBaselineVals(TestBuilder builder) {
- builder
- .baselineValues(new DateTime(15334, 03, 17, 0, 0))
- .baselineValues(new DateTime(15334, 03, 18, 0, 0))
- .baselineValues(new DateTime(15334, 03, 15, 0, 0))
- .baselineValues(new DateTime(15334, 03, 16, 0, 0))
- .baselineValues(new DateTime(15264, 03, 16, 0, 0))
- .baselineValues(new DateTime(15379, 03, 17, 0, 0));
- }
-
- public void readFilesWithUserDisabledAutoCorrection() throws Exception {
- // ensure that selecting the date column explicitly or as part of a star still results
- // in checking the file metadata for date columns (when we need to check the statistics
- // for bad values) to set the flag that the values are corrupt
- for (String selection : new String[] {"*", "date_col"}) {
- TestBuilder builder = testBuilder()
- .sqlQuery("select " + selection + " from table(dfs.`" + MIXED_CORRUPTED_AND_CORRECTED_DATES_PATH + "`" +
- "(type => 'parquet', autoCorrectCorruptDates => false))")
- .unOrdered()
- .baselineColumns("date_col");
- addDateBaselineVals(builder);
- addDateBaselineVals(builder);
- addCorruptedDateBaselineVals(builder);
- addCorruptedDateBaselineVals(builder);
- builder.go();
- }
- }
-
- private static String replaceWorkingPathInString(String orig) {
- return orig.replaceAll(Pattern.quote("[WORKING_PATH]"), Matcher.quoteReplacement(TestTools.getWorkingPath()));
- }
-
- private static void copyDirectoryIntoTempSpace(String resourcesDir) throws IOException {
- copyDirectoryIntoTempSpace(resourcesDir, null);
- }
-
- private static void copyDirectoryIntoTempSpace(String resourcesDir, String destinationSubDir) throws IOException {
- Path destination = path;
- if (destinationSubDir != null) {
- destination = new Path(path, destinationSubDir);
- }
- fs.copyFromLocalFile(
- new Path(replaceWorkingPathInString(resourcesDir)),
- destination);
- }
-
- /**
- * Metadata cache files include full paths to the files that have been scanned.
- *
- * There is no way to generate a metadata cache file with absolute paths that
- * will be guarenteed to be available on an arbitrary test machine.
- *
- * To enable testing older metadata cache files, they were generated manually
- * using older drill versions, and the absolute path up to the folder where
- * the metadata cache file appeared was manually replaced with the string
- * REPLACED_IN_TEST. Here the file is re-written into the given temporary
- * location after the REPLACED_IN_TEST string has been replaced by the actual
- * location generated during this run of the tests.
- *
- * @param srcFileOnClassPath
- * @param destFolderInTmp
- * @throws IOException
- */
- private static void copyMetaDataCacheToTempReplacingInternalPaths(String srcFileOnClassPath, String destFolderInTmp) throws IOException {
- String metadataFileContents = getFile(srcFileOnClassPath);
- Path newMetaCache = new Path(new Path(path, destFolderInTmp), ".drill.parquet_metadata");
- FSDataOutputStream outSteam = fs.create(newMetaCache);
- outSteam.writeBytes(metadataFileContents.replace("REPLACED_IN_TEST", path.toString()));
- outSteam.close();
- }
-
@Test
public void testReadOldMetadataCacheFile() throws Exception {
// for sanity, try reading all partitions without a filter
@@ -488,8 +420,8 @@ public class TestCorruptParquetDateCorrection extends PlanTestBase {
@Test
public void testReadNewMetadataCacheFileOverOldAndNewFiles() throws Exception {
String table = "dfs.`" + new Path(path, MIXED_CORRUPTED_AND_CORRECTED_PARTITIONED_FOLDER) + "`";
- copyMetaDataCacheToTempReplacingInternalPaths("parquet/4203_corrupt_dates/mixed_version_partitioned_metadata.requires_replace.txt",
- MIXED_CORRUPTED_AND_CORRECTED_PARTITIONED_FOLDER);
+ copyMetaDataCacheToTempReplacingInternalPaths("parquet/4203_corrupt_dates/" +
+ "mixed_version_partitioned_metadata.requires_replace.txt", MIXED_CORRUPTED_AND_CORRECTED_PARTITIONED_FOLDER);
// for sanity, try reading all partitions without a filter
TestBuilder builder = testBuilder()
.sqlQuery("select date_col from " + table)
@@ -520,7 +452,7 @@ public class TestCorruptParquetDateCorrection extends PlanTestBase {
* Read a directory with parquet files where some have corrupted dates, see DRILL-4203.
* @throws Exception
*/
- public void readMixedCorruptedAndCorrectedDates() throws Exception {
+ private void readMixedCorruptedAndCorrectedDates() throws Exception {
// ensure that selecting the date column explicitly or as part of a star still results
// in checking the file metadata for date columns (when we need to check the statistics
// for bad values) to set the flag that the values are corrupt
@@ -536,4 +468,90 @@ public class TestCorruptParquetDateCorrection extends PlanTestBase {
}
}
+
+  /**
+   * Appends the six expected, correctly decoded date_col values to {@code builder}, one baseline
+   * row per date.
+   */
+  private void addDateBaselineVals(TestBuilder builder) {
+    DateTime[] expectedDates = {
+        new DateTime(1970, 1, 1, 0, 0),
+        new DateTime(1970, 1, 2, 0, 0),
+        new DateTime(1969, 12, 31, 0, 0),
+        new DateTime(1969, 12, 30, 0, 0),
+        new DateTime(1900, 1, 1, 0, 0),
+        new DateTime(2015, 1, 1, 0, 0)
+    };
+    // Follow the returned builder on each call, just like a fluent call chain would.
+    TestBuilder current = builder;
+    for (DateTime expected : expectedDates) {
+      current = current.baselineValues(expected);
+    }
+  }
+
+  /**
+   * Appends, in the same order, the six dates produced by {@code addDateBaselineVals}, but as they
+   * read when the corrupt-date shift is left uncorrected (e.g. 1970-01-01 reads as 15334-03-17).
+   */
+  private void addCorruptedDateBaselineVals(TestBuilder builder) {
+    DateTime[] shiftedDates = {
+        new DateTime(15334, 3, 17, 0, 0),
+        new DateTime(15334, 3, 18, 0, 0),
+        new DateTime(15334, 3, 15, 0, 0),
+        new DateTime(15334, 3, 16, 0, 0),
+        new DateTime(15264, 3, 16, 0, 0),
+        new DateTime(15379, 3, 17, 0, 0)
+    };
+    // Follow the returned builder on each call, just like a fluent call chain would.
+    TestBuilder current = builder;
+    for (DateTime shifted : shiftedDates) {
+      current = current.baselineValues(shifted);
+    }
+  }
+
+  /**
+   * Queries MIXED_CORRUPTED_AND_CORRECTED_DATES_PATH with {@code autoCorrectCorruptDates => false}.
+   * The expected rows are two copies of the correct dates plus two copies of the uncorrected
+   * (shifted) dates. Both a star query and an explicit date_col projection are run, because
+   * either form must still cause the file metadata for date columns to be inspected.
+   */
+  private void readFilesWithUserDisabledAutoCorrection() throws Exception {
+    String[] projections = {"*", "date_col"};
+    for (String projection : projections) {
+      String query = "select " + projection + " from table(dfs.`" + MIXED_CORRUPTED_AND_CORRECTED_DATES_PATH
+          + "`(type => 'parquet', autoCorrectCorruptDates => false))";
+      TestBuilder builder = testBuilder()
+          .sqlQuery(query)
+          .unOrdered()
+          .baselineColumns("date_col");
+      addDateBaselineVals(builder);
+      addDateBaselineVals(builder);
+      addCorruptedDateBaselineVals(builder);
+      addCorruptedDateBaselineVals(builder);
+      builder.go();
+    }
+  }
+
+  /**
+   * Replaces every literal occurrence of the "[WORKING_PATH]" placeholder in {@code orig} with
+   * {@code TestTools.getWorkingPath()}; neither string is interpreted as a regex.
+   */
+  private static String replaceWorkingPathInString(String orig) {
+    return orig.replace("[WORKING_PATH]", TestTools.getWorkingPath());
+  }
+
+  /**
+   * Copies a local resource directory straight into {@code path}, with no destination
+   * sub-directory.
+   */
+  private static void copyDirectoryIntoTempSpace(String resourcesDir) throws IOException {
+    String noSubDirectory = null;
+    copyDirectoryIntoTempSpace(resourcesDir, noSubDirectory);
+  }
+
+  /**
+   * Copies a local resource directory, after substituting its [WORKING_PATH] placeholder, into
+   * {@code path}, or into {@code path/destinationSubDir} when a sub-directory is given.
+   */
+  private static void copyDirectoryIntoTempSpace(String resourcesDir, String destinationSubDir)
+      throws IOException {
+    Path destination = (destinationSubDir == null) ? path : new Path(path, destinationSubDir);
+    Path source = new Path(replaceWorkingPathInString(resourcesDir));
+    fs.copyFromLocalFile(source, destination);
+  }
+
+  /**
+   * Metadata cache files include full paths to the files that have been scanned.
+   *
+   * There is no way to generate a metadata cache file with absolute paths that
+   * will be guaranteed to be available on an arbitrary test machine.
+   *
+   * To enable testing older metadata cache files, they were generated manually
+   * using older drill versions, and the absolute path up to the folder where
+   * the metadata cache file appeared was manually replaced with the string
+   * REPLACED_IN_TEST. Here the file is re-written into the given temporary
+   * location after the REPLACED_IN_TEST string has been replaced by the actual
+   * location generated during this run of the tests.
+   *
+   * @param srcFileOnClassPath class-path location of the template metadata cache file
+   * @param destFolderInTmp folder, relative to {@code path}, that receives the
+   *     {@code .drill.parquet_metadata} file
+   * @throws IOException if creating or writing the cache file fails
+   */
+  private static void copyMetaDataCacheToTempReplacingInternalPaths(String srcFileOnClassPath, String destFolderInTmp)
+      throws IOException {
+    String metadataFileContents = getFile(srcFileOnClassPath);
+    String rewrittenContents = metadataFileContents.replace("REPLACED_IN_TEST", path.toString());
+    Path newMetaCache = new Path(new Path(path, destFolderInTmp), ".drill.parquet_metadata");
+    // try-with-resources closes the stream even when the write fails. Encode explicitly as UTF-8:
+    // DataOutputStream.writeBytes keeps only the low byte of each char and corrupts non-ASCII paths.
+    try (FSDataOutputStream outStream = fs.create(newMetaCache)) {
+      outStream.write(rewrittenContents.getBytes(java.nio.charset.StandardCharsets.UTF_8));
+    }
+  }
+
}
http://git-wip-us.apache.org/repos/asf/drill/blob/8461d10b/exec/java-exec/src/test/resources/parquet/4203_corrupt_dates/1_9_0_partitioned_no_corruption/0_0_1.parquet
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/parquet/4203_corrupt_dates/1_9_0_partitioned_no_corruption/0_0_1.parquet b/exec/java-exec/src/test/resources/parquet/4203_corrupt_dates/1_9_0_partitioned_no_corruption/0_0_1.parquet
index 7aa8e61..0ec76c8 100644
Binary files a/exec/java-exec/src/test/resources/parquet/4203_corrupt_dates/1_9_0_partitioned_no_corruption/0_0_1.parquet and b/exec/java-exec/src/test/resources/parquet/4203_corrupt_dates/1_9_0_partitioned_no_corruption/0_0_1.parquet differ
http://git-wip-us.apache.org/repos/asf/drill/blob/8461d10b/exec/java-exec/src/test/resources/parquet/4203_corrupt_dates/1_9_0_partitioned_no_corruption/0_0_2.parquet
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/parquet/4203_corrupt_dates/1_9_0_partitioned_no_corruption/0_0_2.parquet b/exec/java-exec/src/test/resources/parquet/4203_corrupt_dates/1_9_0_partitioned_no_corruption/0_0_2.parquet
index 8c43cee..b43ab9d 100644
Binary files a/exec/java-exec/src/test/resources/parquet/4203_corrupt_dates/1_9_0_partitioned_no_corruption/0_0_2.parquet and b/exec/java-exec/src/test/resources/parquet/4203_corrupt_dates/1_9_0_partitioned_no_corruption/0_0_2.parquet differ
http://git-wip-us.apache.org/repos/asf/drill/blob/8461d10b/exec/java-exec/src/test/resources/parquet/4203_corrupt_dates/1_9_0_partitioned_no_corruption/0_0_3.parquet
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/parquet/4203_corrupt_dates/1_9_0_partitioned_no_corruption/0_0_3.parquet b/exec/java-exec/src/test/resources/parquet/4203_corrupt_dates/1_9_0_partitioned_no_corruption/0_0_3.parquet
index 2d2415a..95d46bb 100644
Binary files a/exec/java-exec/src/test/resources/parquet/4203_corrupt_dates/1_9_0_partitioned_no_corruption/0_0_3.parquet and b/exec/java-exec/src/test/resources/parquet/4203_corrupt_dates/1_9_0_partitioned_no_corruption/0_0_3.parquet differ
http://git-wip-us.apache.org/repos/asf/drill/blob/8461d10b/exec/java-exec/src/test/resources/parquet/4203_corrupt_dates/1_9_0_partitioned_no_corruption/0_0_4.parquet
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/parquet/4203_corrupt_dates/1_9_0_partitioned_no_corruption/0_0_4.parquet b/exec/java-exec/src/test/resources/parquet/4203_corrupt_dates/1_9_0_partitioned_no_corruption/0_0_4.parquet
index ff5ce24..9f7269e 100644
Binary files a/exec/java-exec/src/test/resources/parquet/4203_corrupt_dates/1_9_0_partitioned_no_corruption/0_0_4.parquet and b/exec/java-exec/src/test/resources/parquet/4203_corrupt_dates/1_9_0_partitioned_no_corruption/0_0_4.parquet differ
http://git-wip-us.apache.org/repos/asf/drill/blob/8461d10b/exec/java-exec/src/test/resources/parquet/4203_corrupt_dates/1_9_0_partitioned_no_corruption/0_0_5.parquet
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/parquet/4203_corrupt_dates/1_9_0_partitioned_no_corruption/0_0_5.parquet b/exec/java-exec/src/test/resources/parquet/4203_corrupt_dates/1_9_0_partitioned_no_corruption/0_0_5.parquet
index 4a5d4fb..b0d370a 100644
Binary files a/exec/java-exec/src/test/resources/parquet/4203_corrupt_dates/1_9_0_partitioned_no_corruption/0_0_5.parquet and b/exec/java-exec/src/test/resources/parquet/4203_corrupt_dates/1_9_0_partitioned_no_corruption/0_0_5.parquet differ
http://git-wip-us.apache.org/repos/asf/drill/blob/8461d10b/exec/java-exec/src/test/resources/parquet/4203_corrupt_dates/1_9_0_partitioned_no_corruption/0_0_6.parquet
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/parquet/4203_corrupt_dates/1_9_0_partitioned_no_corruption/0_0_6.parquet b/exec/java-exec/src/test/resources/parquet/4203_corrupt_dates/1_9_0_partitioned_no_corruption/0_0_6.parquet
index da7ad3f..a5c7b86 100644
Binary files a/exec/java-exec/src/test/resources/parquet/4203_corrupt_dates/1_9_0_partitioned_no_corruption/0_0_6.parquet and b/exec/java-exec/src/test/resources/parquet/4203_corrupt_dates/1_9_0_partitioned_no_corruption/0_0_6.parquet differ
http://git-wip-us.apache.org/repos/asf/drill/blob/8461d10b/exec/java-exec/src/test/resources/parquet/4203_corrupt_dates/hive1dot2_fewtypes_null/000000_0
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/parquet/4203_corrupt_dates/hive1dot2_fewtypes_null/000000_0 b/exec/java-exec/src/test/resources/parquet/4203_corrupt_dates/hive1dot2_fewtypes_null/000000_0
new file mode 100644
index 0000000..89528e3
Binary files /dev/null and b/exec/java-exec/src/test/resources/parquet/4203_corrupt_dates/hive1dot2_fewtypes_null/000000_0 differ
http://git-wip-us.apache.org/repos/asf/drill/blob/8461d10b/exec/java-exec/src/test/resources/parquet/4203_corrupt_dates/mixed_drill_versions/4203_corrected_dates.parquet
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/parquet/4203_corrupt_dates/mixed_drill_versions/4203_corrected_dates.parquet b/exec/java-exec/src/test/resources/parquet/4203_corrupt_dates/mixed_drill_versions/4203_corrected_dates.parquet
index cee02a2..3d46f56 100644
Binary files a/exec/java-exec/src/test/resources/parquet/4203_corrupt_dates/mixed_drill_versions/4203_corrected_dates.parquet and b/exec/java-exec/src/test/resources/parquet/4203_corrupt_dates/mixed_drill_versions/4203_corrected_dates.parquet differ
http://git-wip-us.apache.org/repos/asf/drill/blob/8461d10b/exec/java-exec/src/test/resources/parquet/4203_corrupt_dates/mixed_version_partitioned_metadata.requires_replace.txt
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/parquet/4203_corrupt_dates/mixed_version_partitioned_metadata.requires_replace.txt b/exec/java-exec/src/test/resources/parquet/4203_corrupt_dates/mixed_version_partitioned_metadata.requires_replace.txt
index 7fdb5b2..bb6e282 100644
--- a/exec/java-exec/src/test/resources/parquet/4203_corrupt_dates/mixed_version_partitioned_metadata.requires_replace.txt
+++ b/exec/java-exec/src/test/resources/parquet/4203_corrupt_dates/mixed_version_partitioned_metadata.requires_replace.txt
@@ -297,5 +297,5 @@
} ]
} ],
"directories" : [ "file:REPLACED_IN_TEST/mixed_partitioned/1_9_0_partitioned_no_corruption", "file:REPLACED_IN_TEST/mixed_partitioned/partitioned_with_corruption_4203" ],
- "drillVersion" : "1.9.0"
+ "isDateCorrect" : "true"
}