You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by pa...@apache.org on 2016/10/18 23:45:23 UTC

[03/11] drill git commit: DRILL-4203: Parquet File. Date is stored wrongly - Added new extra field in the parquet meta info "is.date.correct = true"; - Removed unnecessary double conversion of value with Julian day; - Added ability to correct corrupted dates for parquet files with the second version old metadata cache file as well.

DRILL-4203: Parquet File. Date is stored wrongly - Added new extra field in the parquet meta info "is.date.correct = true"; - Removed unnecessary double conversion of value with Julian day; - Added ability to correct corrupted dates for parquet files with the second version old metadata cache file as well.

This closes #595


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/8461d10b
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/8461d10b
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/8461d10b

Branch: refs/heads/master
Commit: 8461d10b4fd6ce56361d1d826bb3a38b6dc8473c
Parents: ae34d5c
Author: Vitalii Diravka <vi...@gmail.com>
Authored: Mon Sep 19 12:59:27 2016 +0000
Committer: Parth Chandra <pa...@apache.org>
Committed: Fri Oct 14 11:08:07 2016 -0700

----------------------------------------------------------------------
 .../hive/HiveDrillNativeScanBatchCreator.java   |   1 +
 .../templates/ParquetOutputRecordWriter.java    |   8 +-
 .../drill/exec/store/parquet/Metadata.java      |  34 ++-
 .../exec/store/parquet/ParquetFormatConfig.java |   5 +-
 .../exec/store/parquet/ParquetGroupScan.java    |  19 +-
 .../store/parquet/ParquetReaderUtility.java     | 167 ++++++++-------
 .../exec/store/parquet/ParquetRecordWriter.java |   2 +
 .../store/parquet/ParquetScanBatchCreator.java  |   1 +
 .../columnreaders/FixedByteAlignedReader.java   |  10 +-
 .../NullableFixedByteAlignedReaders.java        |  10 +-
 .../columnreaders/ParquetRecordReader.java      |   8 -
 .../parquet2/DrillParquetGroupConverter.java    |  10 +-
 .../TestCorruptParquetDateCorrection.java       | 210 ++++++++++---------
 .../0_0_1.parquet                               | Bin 257 -> 290 bytes
 .../0_0_2.parquet                               | Bin 257 -> 290 bytes
 .../0_0_3.parquet                               | Bin 257 -> 290 bytes
 .../0_0_4.parquet                               | Bin 257 -> 290 bytes
 .../0_0_5.parquet                               | Bin 257 -> 290 bytes
 .../0_0_6.parquet                               | Bin 257 -> 290 bytes
 .../hive1dot2_fewtypes_null/000000_0            | Bin 0 -> 2569 bytes
 .../4203_corrected_dates.parquet                | Bin 278 -> 311 bytes
 ...on_partitioned_metadata.requires_replace.txt |   2 +-
 22 files changed, 261 insertions(+), 226 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/8461d10b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeScanBatchCreator.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeScanBatchCreator.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeScanBatchCreator.java
index 1ded153..d78c620 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeScanBatchCreator.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeScanBatchCreator.java
@@ -123,6 +123,7 @@ public class HiveDrillNativeScanBatchCreator implements BatchCreator<HiveDrillNa
           // in the first row group
           ParquetReaderUtility.DateCorruptionStatus containsCorruptDates =
               ParquetReaderUtility.detectCorruptDates(parquetMetadata, config.getColumns(), true);
+          logger.info(containsCorruptDates.toString());
           readers.add(new ParquetRecordReader(
                   context,
                   Path.getPathWithoutSchemeAndAuthority(finalPath).toString(),

http://git-wip-us.apache.org/repos/asf/drill/blob/8461d10b/exec/java-exec/src/main/codegen/templates/ParquetOutputRecordWriter.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/codegen/templates/ParquetOutputRecordWriter.java b/exec/java-exec/src/main/codegen/templates/ParquetOutputRecordWriter.java
index aac0f0c..0af3527 100644
--- a/exec/java-exec/src/main/codegen/templates/ParquetOutputRecordWriter.java
+++ b/exec/java-exec/src/main/codegen/templates/ParquetOutputRecordWriter.java
@@ -16,7 +16,6 @@
  * limitations under the License.
  */
 
-import org.joda.time.DateTimeUtils;
 import org.apache.parquet.io.api.Binary;
 
 import java.lang.Override;
@@ -49,7 +48,7 @@ import org.apache.drill.exec.record.MaterializedField;
 
 import org.apache.drill.common.types.TypeProtos;
 
-import org.joda.time.DateTimeUtils;
+import org.joda.time.DateTimeConstants;
 
 import java.io.IOException;
 import java.lang.UnsupportedOperationException;
@@ -71,7 +70,6 @@ public abstract class ParquetOutputRecordWriter extends AbstractRecordWriter imp
 
   private RecordConsumer consumer;
   private MessageType schema;
-  public static final long JULIAN_DAY_EPOC = DateTimeUtils.toJulianDayNumber(0);
 
   public void setUp(MessageType schema, RecordConsumer consumer) {
     this.schema = schema;
@@ -156,12 +154,12 @@ public abstract class ParquetOutputRecordWriter extends AbstractRecordWriter imp
   <#elseif minor.class == "Date">
     <#if mode.prefix == "Repeated" >
       reader.read(i, holder);
-      consumer.addInteger((int) (DateTimeUtils.toJulianDayNumber(holder.value) - JULIAN_DAY_EPOC));
+      consumer.addInteger((int) (holder.value / DateTimeConstants.MILLIS_PER_DAY));
     <#else>
       consumer.startField(fieldName, fieldId);
       reader.read(holder);
       // convert from internal Drill date format to Julian Day centered around Unix Epoc
-      consumer.addInteger((int) (DateTimeUtils.toJulianDayNumber(holder.value) - JULIAN_DAY_EPOC));
+      consumer.addInteger((int) (holder.value / DateTimeConstants.MILLIS_PER_DAY));
       consumer.endField(fieldName, fieldId);
     </#if>
   <#elseif

http://git-wip-us.apache.org/repos/asf/drill/blob/8461d10b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/Metadata.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/Metadata.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/Metadata.java
index d6a739d..f5554c9 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/Metadata.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/Metadata.java
@@ -49,13 +49,13 @@ import org.apache.parquet.schema.MessageType;
 import org.apache.parquet.schema.OriginalType;
 import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
 import org.apache.parquet.schema.Type;
-import org.codehaus.jackson.annotate.JsonIgnore;
 import org.apache.commons.lang3.tuple.Pair;
 
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.annotation.JsonSubTypes;
 import com.fasterxml.jackson.annotation.JsonTypeInfo;
 import com.fasterxml.jackson.annotation.JsonTypeName;
+import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.core.JsonFactory;
 import com.fasterxml.jackson.core.JsonGenerator;
 import com.fasterxml.jackson.core.JsonGenerator.Feature;
@@ -184,7 +184,7 @@ public class Metadata {
         childFiles.add(file);
       }
     }
-    ParquetTableMetadata_v2 parquetTableMetadata = new ParquetTableMetadata_v2();
+    ParquetTableMetadata_v2 parquetTableMetadata = new ParquetTableMetadata_v2(true);
     if (childFiles.size() > 0) {
       List<ParquetFileMetadata_v2> childFilesMetadata =
           getParquetFileMetadata_v2(parquetTableMetadata, childFiles);
@@ -353,6 +353,7 @@ public class Metadata {
     ALL_COLS.add(AbstractRecordReader.STAR_COLUMN);
     boolean autoCorrectCorruptDates = formatConfig.autoCorrectCorruptDates;
     ParquetReaderUtility.DateCorruptionStatus containsCorruptDates = ParquetReaderUtility.detectCorruptDates(metadata, ALL_COLS, autoCorrectCorruptDates);
+    logger.info(containsCorruptDates.toString());
     for (BlockMetaData rowGroup : metadata.getBlocks()) {
       List<ColumnMetadata_v2> columnMetadataList = Lists.newArrayList();
       long length = 0;
@@ -596,7 +597,10 @@ public class Metadata {
     @JsonIgnore public abstract OriginalType getOriginalType(String[] columnName);
 
     @JsonIgnore public abstract ParquetTableMetadataBase clone();
+
     @JsonIgnore public abstract String getDrillVersion();
+
+    @JsonIgnore public abstract boolean isDateCorrect();
   }
 
   public static abstract class ParquetFileMetadata {
@@ -635,16 +639,14 @@ public class Metadata {
      *
      * This object would just be immutable, but due to Drill-4203 we need to correct
      * date values that had been corrupted by earlier versions of Drill.
-     * @return
      */
     public abstract void setMax(Object newMax);
 
     /**
-     * Set the max value recorded in the parquet metadata statistics.
+     * Set the min value recorded in the parquet metadata statistics.
      *
      * This object would just be immutable, but due to Drill-4203 we need to correct
      * date values that had been corrupted by earlier versions of Drill.
-     * @return
      */
     public abstract void setMin(Object newMax);
 
@@ -711,10 +713,16 @@ public class Metadata {
     @JsonIgnore @Override public ParquetTableMetadataBase clone() {
       return new ParquetTableMetadata_v1(files, directories);
     }
+
     @Override
     public String getDrillVersion() {
       return null;
     }
+
+    @JsonIgnore @Override
+    public boolean isDateCorrect() {
+      return false;
+    }
   }
 
 
@@ -919,9 +927,15 @@ public class Metadata {
     @JsonProperty List<ParquetFileMetadata_v2> files;
     @JsonProperty List<String> directories;
     @JsonProperty String drillVersion;
+    @JsonProperty boolean isDateCorrect;
 
     public ParquetTableMetadata_v2() {
+      super();
+    }
+
+    public ParquetTableMetadata_v2(boolean isDateCorrect) {
       this.drillVersion = DrillVersionInfo.getVersion();
+      this.isDateCorrect = isDateCorrect;
     }
 
     public ParquetTableMetadata_v2(ParquetTableMetadataBase parquetTable,
@@ -930,6 +944,7 @@ public class Metadata {
       this.directories = directories;
       this.columnTypeInfo = ((ParquetTableMetadata_v2) parquetTable).columnTypeInfo;
       this.drillVersion = DrillVersionInfo.getVersion();
+      this.isDateCorrect = true;
     }
 
     public ParquetTableMetadata_v2(List<ParquetFileMetadata_v2> files, List<String> directories,
@@ -970,11 +985,16 @@ public class Metadata {
     @JsonIgnore @Override public ParquetTableMetadataBase clone() {
       return new ParquetTableMetadata_v2(files, directories, columnTypeInfo);
     }
-    @Override
+
+    @JsonIgnore @Override
     public String getDrillVersion() {
       return drillVersion;
     }
 
+    @JsonIgnore @Override public boolean isDateCorrect() {
+      return isDateCorrect;
+    }
+
   }
 
 
@@ -1187,7 +1207,7 @@ public class Metadata {
     }
 
     @Override public PrimitiveTypeName getPrimitiveType() {
-      return null;
+      return primitiveType;
     }
 
     @Override public OriginalType getOriginalType() {

http://git-wip-us.apache.org/repos/asf/drill/blob/8461d10b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatConfig.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatConfig.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatConfig.java
index 9ba03df..b33186e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatConfig.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatConfig.java
@@ -17,12 +17,13 @@
  */
 package org.apache.drill.exec.store.parquet;
 
+import com.fasterxml.jackson.annotation.JsonInclude;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import org.apache.drill.common.logical.FormatPluginConfig;
 
 import com.fasterxml.jackson.annotation.JsonTypeName;
 
-@JsonTypeName("parquet")
+@JsonTypeName("parquet") @JsonInclude(JsonInclude.Include.NON_DEFAULT)
 public class ParquetFormatConfig implements FormatPluginConfig{
 
   public boolean autoCorrectCorruptDates = true;
@@ -44,6 +45,6 @@ public class ParquetFormatConfig implements FormatPluginConfig{
 
   @Override
   public int hashCode() {
-    return (autoCorrectCorruptDates ? 1 : 0);
+    return (autoCorrectCorruptDates ? 1231 : 1237);
   }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/8461d10b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
index 649282b..b9f0ac0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
@@ -80,7 +80,7 @@ import org.apache.drill.exec.vector.NullableVarCharVector;
 import org.apache.drill.exec.vector.ValueVector;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
-import org.joda.time.DateTimeUtils;
+import org.joda.time.DateTimeConstants;
 import org.apache.parquet.io.api.Binary;
 import org.apache.parquet.schema.OriginalType;
 import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
@@ -479,8 +479,7 @@ public class ParquetGroupScan extends AbstractFileGroupScan {
       case DATE: {
         NullableDateVector dateVector = (NullableDateVector) v;
         Integer value = (Integer) partitionValueMap.get(f).get(column);
-        dateVector.getMutator().setSafe(index, DateTimeUtils.fromJulianDay(
-            value + ParquetReaderUtility.SHIFT_PARQUET_DAY_COUNT_TO_JULIAN_DAY));
+        dateVector.getMutator().setSafe(index, value * (long) DateTimeConstants.MILLIS_PER_DAY);
         return;
       }
       case TIME: {
@@ -668,9 +667,7 @@ public class ParquetGroupScan extends AbstractFileGroupScan {
       }
       if (metaPath != null && fs.exists(metaPath)) {
         usedMetadataCache = true;
-        if (parquetTableMetadata == null) {
-          parquetTableMetadata = Metadata.readBlockMeta(fs, metaPath.toString(), metaContext, formatConfig);
-        }
+        parquetTableMetadata = Metadata.readBlockMeta(fs, metaPath.toString(), metaContext, formatConfig);
       } else {
         parquetTableMetadata = Metadata.getParquetTableMetadata(fs, p.toString(), formatConfig);
       }
@@ -683,15 +680,7 @@ public class ParquetGroupScan extends AbstractFileGroupScan {
           parquetTableMetadata = Metadata.readBlockMeta(fs, metaPath.toString(), metaContext, formatConfig);
         }
         if (fileSet != null) {
-          if (parquetTableMetadata == null) {
-            parquetTableMetadata = removeUnneededRowGroups(Metadata.readBlockMeta(fs, metaPath.toString(), metaContext, formatConfig));
-          } else {
-            parquetTableMetadata = removeUnneededRowGroups(parquetTableMetadata);
-          }
-        } else {
-          if (parquetTableMetadata == null) {
-            parquetTableMetadata = Metadata.readBlockMeta(fs, metaPath.toString(), metaContext, formatConfig);
-          }
+          parquetTableMetadata = removeUnneededRowGroups(parquetTableMetadata);
         }
       } else {
         final List<FileStatus> fileStatuses = Lists.newArrayList();

http://git-wip-us.apache.org/repos/asf/drill/blob/8461d10b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetReaderUtility.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetReaderUtility.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetReaderUtility.java
index 9d0886f..1f6dc1e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetReaderUtility.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetReaderUtility.java
@@ -23,7 +23,6 @@ import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.exec.planner.physical.PlannerSettings;
 import org.apache.drill.exec.server.options.OptionManager;
 import org.apache.drill.exec.store.AbstractRecordReader;
-import org.apache.drill.exec.store.ParquetOutputRecordWriter;
 import org.apache.drill.exec.work.ExecErrorConstants;
 import org.apache.parquet.SemanticVersion;
 import org.apache.parquet.VersionParser;
@@ -39,29 +38,37 @@ import org.apache.parquet.hadoop.metadata.ColumnPath;
 import org.apache.parquet.hadoop.metadata.ParquetMetadata;
 import org.apache.parquet.schema.OriginalType;
 import org.joda.time.Chronology;
-import org.joda.time.DateTimeUtils;
+import org.joda.time.DateTimeConstants;
 
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
-/*
+/**
  * Utility class where we can capture common logic between the two parquet readers
  */
 public class ParquetReaderUtility {
   private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ParquetReaderUtility.class);
 
-  // Note the negation symbol in the beginning
-  public static final double CORRECT_CORRUPT_DATE_SHIFT = -ParquetOutputRecordWriter.JULIAN_DAY_EPOC - 0.5;
-  public static final double SHIFT_PARQUET_DAY_COUNT_TO_JULIAN_DAY = ParquetOutputRecordWriter.JULIAN_DAY_EPOC - 0.5;
-  // The year 5000 is the threshold for auto-detecting date corruption.
+  /**
+   * Number of days between Julian day epoch (January 1, 4713 BC) and Unix day epoch (January 1, 1970).
+   * The value of this constant is {@value}.
+   */
+  public static final long JULIAN_DAY_NUMBER_FOR_UNIX_EPOCH = 2440588;
+  /**
+   * All old parquet files (those lacking the "is.date.correct=true" property in their metadata) have
+   * dates corrupted by a shift of {@value} days, i.e. 2 * {@value #JULIAN_DAY_NUMBER_FOR_UNIX_EPOCH}
+   */
+  public static final long CORRECT_CORRUPT_DATE_SHIFT = 2 * JULIAN_DAY_NUMBER_FOR_UNIX_EPOCH;
+  // The year 5000 (or 1106685 days since the Unix epoch) is chosen as the threshold for auto-detecting date corruption.
   // This balances two possible cases of bad auto-correction. External tools writing dates in the future will not
   // be shifted unless they are past this threshold (and we cannot identify them as external files based on the metadata).
   // On the other hand, historical dates written with Drill wouldn't risk being incorrectly shifted unless they were
   // something like 10,000 years in the past.
   private static final Chronology UTC = org.joda.time.chrono.ISOChronology.getInstanceUTC();
   public static final int DATE_CORRUPTION_THRESHOLD =
-      (int) (DateTimeUtils.toJulianDayNumber(UTC.getDateTimeMillis(5000, 1, 1, 0)) - ParquetOutputRecordWriter.JULIAN_DAY_EPOC);
+      (int) (UTC.getDateTimeMillis(5000, 1, 1, 0) / DateTimeConstants.MILLIS_PER_DAY);
 
   /**
    * For most recently created parquet files, we can determine if we have corrupted dates (see DRILL-4203)
@@ -69,9 +76,24 @@ public class ParquetReaderUtility {
    * in the data pages themselves to see if they are likely corrupt.
    */
   public enum DateCorruptionStatus {
-    META_SHOWS_CORRUPTION, // metadata can determine if the values are definitely CORRUPT
-    META_SHOWS_NO_CORRUPTION, // metadata can determine if the values are definitely CORRECT
-    META_UNCLEAR_TEST_VALUES // not enough info in metadata, parquet reader must test individual values
+    META_SHOWS_CORRUPTION{
+      @Override
+      public String toString(){
+        return "It is determined from metadata that the date values are definitely CORRUPT";
+      }
+    },
+    META_SHOWS_NO_CORRUPTION {
+      @Override
+      public String toString(){
+        return "It is determined from metadata that the date values are definitely CORRECT";
+      }
+    },
+    META_UNCLEAR_TEST_VALUES {
+      @Override
+      public String toString(){
+        return "Not enough info in metadata, parquet reader will test individual date values";
+      }
+    }
   }
 
   public static void checkDecimalTypeEnabled(OptionManager options) {
@@ -102,34 +124,46 @@ public class ParquetReaderUtility {
   }
 
   public static int autoCorrectCorruptedDate(int corruptedDate) {
-    return (int) (corruptedDate - 2 * ParquetOutputRecordWriter.JULIAN_DAY_EPOC);
+    return (int) (corruptedDate - CORRECT_CORRUPT_DATE_SHIFT);
   }
 
   public static void correctDatesInMetadataCache(Metadata.ParquetTableMetadataBase parquetTableMetadata) {
-    DateCorruptionStatus cacheFileContainsCorruptDates;
-    String drillVersionStr = parquetTableMetadata.getDrillVersion();
-    if (drillVersionStr != null) {
-      try {
-        cacheFileContainsCorruptDates = ParquetReaderUtility.drillVersionHasCorruptedDates(drillVersionStr);
-      } catch (VersionParser.VersionParseException e) {
-        cacheFileContainsCorruptDates = DateCorruptionStatus.META_SHOWS_CORRUPTION;
-      }
-    } else {
-      cacheFileContainsCorruptDates = DateCorruptionStatus.META_SHOWS_CORRUPTION;
-    }
+    boolean isDateCorrect = parquetTableMetadata.isDateCorrect();
+    DateCorruptionStatus cacheFileContainsCorruptDates = isDateCorrect ?
+        DateCorruptionStatus.META_SHOWS_NO_CORRUPTION : DateCorruptionStatus.META_SHOWS_CORRUPTION;
     if (cacheFileContainsCorruptDates == DateCorruptionStatus.META_SHOWS_CORRUPTION) {
+      // Looking for the DATE data type of column names in the metadata cache file ("metadata_version" : "v2")
+      String[] names = new String[0];
+      if (parquetTableMetadata instanceof Metadata.ParquetTableMetadata_v2) {
+        for (Metadata.ColumnTypeMetadata_v2 columnTypeMetadata :
+            ((Metadata.ParquetTableMetadata_v2) parquetTableMetadata).columnTypeInfo.values()) {
+          if (OriginalType.DATE.equals(columnTypeMetadata.originalType)) {
+            names = columnTypeMetadata.name;
+          }
+        }
+      }
       for (Metadata.ParquetFileMetadata file : parquetTableMetadata.getFiles()) {
         // Drill has only ever written a single row group per file, only need to correct the statistics
         // on the first row group
         Metadata.RowGroupMetadata rowGroupMetadata = file.getRowGroups().get(0);
         for (Metadata.ColumnMetadata columnMetadata : rowGroupMetadata.getColumns()) {
-          OriginalType originalType = columnMetadata.getOriginalType();
-          if (originalType != null && originalType.equals(OriginalType.DATE) &&
-              columnMetadata.hasSingleValue() &&
-              (Integer) columnMetadata.getMaxValue() > ParquetReaderUtility.DATE_CORRUPTION_THRESHOLD) {
-            int newMinMax = ParquetReaderUtility.autoCorrectCorruptedDate((Integer)columnMetadata.getMaxValue());
-            columnMetadata.setMax(newMinMax);
-            columnMetadata.setMin(newMinMax);
+          // Setting Min/Max values for ParquetTableMetadata_v1
+          if (parquetTableMetadata instanceof Metadata.ParquetTableMetadata_v1) {
+            OriginalType originalType = columnMetadata.getOriginalType();
+            if (OriginalType.DATE.equals(originalType) && columnMetadata.hasSingleValue() &&
+                (Integer) columnMetadata.getMaxValue() > ParquetReaderUtility.DATE_CORRUPTION_THRESHOLD) {
+              int newMinMax = ParquetReaderUtility.autoCorrectCorruptedDate((Integer)columnMetadata.getMaxValue());
+              columnMetadata.setMax(newMinMax);
+              columnMetadata.setMin(newMinMax);
+            }
+          }
+          // Setting Max values for ParquetTableMetadata_v2
+          else if (parquetTableMetadata instanceof Metadata.ParquetTableMetadata_v2 &&
+              columnMetadata.getName() != null && Arrays.equals(columnMetadata.getName(), names) &&
+              columnMetadata.hasSingleValue() && (Integer) columnMetadata.getMaxValue() >
+              ParquetReaderUtility.DATE_CORRUPTION_THRESHOLD) {
+            int newMax = ParquetReaderUtility.autoCorrectCorruptedDate((Integer) columnMetadata.getMaxValue());
+            columnMetadata.setMax(newMax);
           }
         }
       }
@@ -138,9 +172,6 @@ public class ParquetReaderUtility {
 
   /**
    * Check for corrupted dates in a parquet file. See Drill-4203
-   * @param footer
-   * @param columns
-   * @return
    */
   public static DateCorruptionStatus detectCorruptDates(ParquetMetadata footer,
                                            List<SchemaPath> columns,
@@ -152,60 +183,43 @@ public class ParquetReaderUtility {
 
     // migrated parquet files have 1.8.1 parquet-mr version with drill-r0 in the part of the name usually containing "SNAPSHOT"
 
-    // new parquet files 1.4+ have drill version number
-    //  - below 1.9.0 dates are corrupt
-    //  - this includes 1.9.0-SNAPSHOT
+    // new parquet files generated with the "is.date.correct" property have no corrupt dates
 
-    String drillVersion = footer.getFileMetaData().getKeyValueMetaData().get(ParquetRecordWriter.DRILL_VERSION_PROPERTY);
     String createdBy = footer.getFileMetaData().getCreatedBy();
-    try {
-      if (drillVersion == null) {
-        // Possibly an old, un-migrated Drill file, check the column statistics to see if min/max values look corrupt
-        // only applies if there is a date column selected
-        if (createdBy.equals("parquet-mr")) {
-          // loop through parquet column metadata to find date columns, check for corrupt valuues
-          return checkForCorruptDateValuesInStatistics(footer, columns, autoCorrectCorruptDates);
-        } else {
-          // check the created by to see if it is a migrated Drill file
+    String drillVersion = footer.getFileMetaData().getKeyValueMetaData().get(ParquetRecordWriter.DRILL_VERSION_PROPERTY);
+    String isDateCorrect = footer.getFileMetaData().getKeyValueMetaData().get(ParquetRecordWriter.IS_DATE_CORRECT_PROPERTY);
+    if (drillVersion != null) {
+      return Boolean.valueOf(isDateCorrect) ? DateCorruptionStatus.META_SHOWS_NO_CORRUPTION
+          : DateCorruptionStatus.META_SHOWS_CORRUPTION;
+    } else {
+      // Possibly an old, un-migrated Drill file, check the column statistics to see if min/max values look corrupt
+      // only applies if there is a date column selected
+      if (createdBy.equals("parquet-mr")) {
+        // loop through parquet column metadata to find date columns, check for corrupt values
+        return checkForCorruptDateValuesInStatistics(footer, columns, autoCorrectCorruptDates);
+      } else {
+        // check the created by to see if it is a migrated Drill file
+        try {
           VersionParser.ParsedVersion parsedCreatedByVersion = VersionParser.parse(createdBy);
           // check if this is a migrated Drill file, lacking a Drill version number, but with
           // "drill" in the parquet created-by string
-          SemanticVersion semVer = parsedCreatedByVersion.getSemanticVersion();
-          String pre = semVer.pre + "";
-          if (semVer != null && semVer.major == 1 && semVer.minor == 8 && semVer.patch == 1 && pre.contains("drill")) {
-            return DateCorruptionStatus.META_SHOWS_CORRUPTION;
-          } else {
-            // written by a tool that wasn't Drill, the dates are not corrupted
-            return DateCorruptionStatus.META_SHOWS_NO_CORRUPTION;
+          if (parsedCreatedByVersion.hasSemanticVersion()) {
+            SemanticVersion semVer = parsedCreatedByVersion.getSemanticVersion();
+            String pre = semVer.pre + "";
+            if (semVer.major == 1 && semVer.minor == 8 && semVer.patch == 1 && pre.contains("drill")) {
+              return DateCorruptionStatus.META_SHOWS_CORRUPTION;
+            }
           }
+          // written by a tool that wasn't Drill, the dates are not corrupted
+          return DateCorruptionStatus.META_SHOWS_NO_CORRUPTION;
+        } catch (VersionParser.VersionParseException e) {
+          // If we couldn't parse "created by" field, check column metadata of date columns
+          return checkForCorruptDateValuesInStatistics(footer, columns, autoCorrectCorruptDates);
         }
-      } else {
-        // this parser expects an application name before the semantic version, just prepending Drill
-        // we know from the property name "drill.version" that we wrote this
-        return drillVersionHasCorruptedDates(drillVersion);
       }
-    } catch (VersionParser.VersionParseException e) {
-      // Default value of "false" if we cannot parse the version is fine, we are covering all
-      // of the metadata values produced by historical versions of Drill
-      // If Drill didn't write it the dates should be fine
-      return DateCorruptionStatus.META_SHOWS_CORRUPTION;
     }
   }
 
-  public static DateCorruptionStatus drillVersionHasCorruptedDates(String drillVersion) throws VersionParser.VersionParseException {
-    VersionParser.ParsedVersion parsedDrillVersion = parseDrillVersion(drillVersion);
-    SemanticVersion semVer = parsedDrillVersion.getSemanticVersion();
-    if (semVer == null || semVer.compareTo(new SemanticVersion(1, 9, 0)) < 0) {
-      return DateCorruptionStatus.META_SHOWS_CORRUPTION;
-    } else {
-      return DateCorruptionStatus.META_SHOWS_NO_CORRUPTION;
-    }
-
-  }
-
-  public static VersionParser.ParsedVersion parseDrillVersion(String drillVersion) throws VersionParser.VersionParseException {
-    return VersionParser.parse("drill version " + drillVersion + " (build 1234)");
-  }
 
   /**
    * Detect corrupt date values by looking at the min/max values in the metadata.
@@ -224,7 +238,6 @@ public class ParquetReaderUtility {
    *                                of corrupt dates. There are some rare cases (storing dates thousands
    *                                of years into the future, with tools other than Drill writing files)
    *                                that would result in the date values being "corrected" into bad values.
-   * @return
    */
   public static DateCorruptionStatus checkForCorruptDateValuesInStatistics(ParquetMetadata footer,
                                                               List<SchemaPath> columns,

http://git-wip-us.apache.org/repos/asf/drill/blob/8461d10b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java
index 3f2defd..bc6bb65 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java
@@ -78,6 +78,7 @@ public class ParquetRecordWriter extends ParquetOutputRecordWriter {
   private static final int MAXIMUM_RECORD_COUNT_FOR_CHECK = 10000;
 
   public static final String DRILL_VERSION_PROPERTY = "drill.version";
+  public static final String IS_DATE_CORRECT_PROPERTY = "is.date.correct";
 
   private ParquetFileWriter parquetFileWriter;
   private MessageType schema;
@@ -115,6 +116,7 @@ public class ParquetRecordWriter extends ParquetOutputRecordWriter {
     this.partitionColumns = writer.getPartitionColumns();
     this.hasPartitions = partitionColumns != null && partitionColumns.size() > 0;
     this.extraMetaData.put(DRILL_VERSION_PROPERTY, DrillVersionInfo.getVersion());
+    this.extraMetaData.put(IS_DATE_CORRECT_PROPERTY, "true");
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/drill/blob/8461d10b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
index 8f7ace1..bf13977 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
@@ -107,6 +107,7 @@ public class ParquetScanBatchCreator implements BatchCreator<ParquetRowGroupScan
         boolean autoCorrectCorruptDates = rowGroupScan.formatConfig.autoCorrectCorruptDates;
         ParquetReaderUtility.DateCorruptionStatus containsCorruptDates = ParquetReaderUtility.detectCorruptDates(footers.get(e.getPath()), rowGroupScan.getColumns(),
                 autoCorrectCorruptDates);
+        logger.info(containsCorruptDates.toString());
         if (!context.getOptions().getOption(ExecConstants.PARQUET_NEW_RECORD_READER).bool_val && !isComplex(footers.get(e.getPath()))) {
           readers.add(
               new ParquetRecordReader(

http://git-wip-us.apache.org/repos/asf/drill/blob/8461d10b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/FixedByteAlignedReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/FixedByteAlignedReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/FixedByteAlignedReader.java
index cccb06f..f806ee4 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/FixedByteAlignedReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/FixedByteAlignedReader.java
@@ -33,7 +33,7 @@ import org.apache.drill.exec.vector.VariableWidthVector;
 import org.apache.parquet.column.ColumnDescriptor;
 import org.apache.parquet.format.SchemaElement;
 import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
-import org.joda.time.DateTimeUtils;
+import org.joda.time.DateTimeConstants;
 
 import io.netty.buffer.DrillBuf;
 
@@ -134,7 +134,7 @@ class FixedByteAlignedReader<V extends ValueVector> extends ColumnReader<V> {
         intValue = readIntLittleEndian(bytebuf, start);
       }
 
-      mutator.set(index, DateTimeUtils.fromJulianDay(intValue + ParquetReaderUtility.SHIFT_PARQUET_DAY_COUNT_TO_JULIAN_DAY));
+      mutator.set(index, intValue * (long) DateTimeConstants.MILLIS_PER_DAY);
     }
   }
 
@@ -160,7 +160,7 @@ class FixedByteAlignedReader<V extends ValueVector> extends ColumnReader<V> {
         intValue = readIntLittleEndian(bytebuf, start);
       }
 
-      mutator.set(index, DateTimeUtils.fromJulianDay(intValue + ParquetReaderUtility.CORRECT_CORRUPT_DATE_SHIFT));
+      mutator.set(index, (intValue - ParquetReaderUtility.CORRECT_CORRUPT_DATE_SHIFT) * DateTimeConstants.MILLIS_PER_DAY);
     }
 
   }
@@ -191,9 +191,9 @@ class FixedByteAlignedReader<V extends ValueVector> extends ColumnReader<V> {
       }
 
       if (intValue > ParquetReaderUtility.DATE_CORRUPTION_THRESHOLD) {
-        mutator.set(index, DateTimeUtils.fromJulianDay(intValue + ParquetReaderUtility.CORRECT_CORRUPT_DATE_SHIFT));
+        mutator.set(index, (intValue - ParquetReaderUtility.CORRECT_CORRUPT_DATE_SHIFT) * DateTimeConstants.MILLIS_PER_DAY);
       } else {
-        mutator.set(index, DateTimeUtils.fromJulianDay(intValue + ParquetReaderUtility.SHIFT_PARQUET_DAY_COUNT_TO_JULIAN_DAY));
+        mutator.set(index, intValue * (long) DateTimeConstants.MILLIS_PER_DAY);
       }
     }
 

http://git-wip-us.apache.org/repos/asf/drill/blob/8461d10b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableFixedByteAlignedReaders.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableFixedByteAlignedReaders.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableFixedByteAlignedReaders.java
index 10e0c72..df4c1ba 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableFixedByteAlignedReaders.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableFixedByteAlignedReaders.java
@@ -43,7 +43,7 @@ import org.apache.parquet.column.ColumnDescriptor;
 import org.apache.parquet.format.SchemaElement;
 import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
 import org.apache.parquet.io.api.Binary;
-import org.joda.time.DateTimeUtils;
+import org.joda.time.DateTimeConstants;
 
 import io.netty.buffer.DrillBuf;
 
@@ -327,7 +327,7 @@ public class NullableFixedByteAlignedReaders {
         intValue = readIntLittleEndian(bytebuf, start);
       }
 
-      valueVec.getMutator().set(index, DateTimeUtils.fromJulianDay(intValue + ParquetReaderUtility.SHIFT_PARQUET_DAY_COUNT_TO_JULIAN_DAY));
+      valueVec.getMutator().set(index, intValue * (long) DateTimeConstants.MILLIS_PER_DAY);
     }
 
   }
@@ -351,7 +351,7 @@ public class NullableFixedByteAlignedReaders {
         intValue = readIntLittleEndian(bytebuf, start);
       }
 
-      valueVec.getMutator().set(index, DateTimeUtils.fromJulianDay(intValue + ParquetReaderUtility.CORRECT_CORRUPT_DATE_SHIFT));
+      valueVec.getMutator().set(index, (intValue - ParquetReaderUtility.CORRECT_CORRUPT_DATE_SHIFT) * DateTimeConstants.MILLIS_PER_DAY);
     }
 
   }
@@ -384,9 +384,9 @@ public class NullableFixedByteAlignedReaders {
       }
 
       if (intValue > ParquetReaderUtility.DATE_CORRUPTION_THRESHOLD) {
-        dateVector.getMutator().set(index, DateTimeUtils.fromJulianDay(intValue + ParquetReaderUtility.CORRECT_CORRUPT_DATE_SHIFT));
+        dateVector.getMutator().set(index, (intValue - ParquetReaderUtility.CORRECT_CORRUPT_DATE_SHIFT) * DateTimeConstants.MILLIS_PER_DAY);
       } else {
-        dateVector.getMutator().set(index, DateTimeUtils.fromJulianDay(intValue + ParquetReaderUtility.SHIFT_PARQUET_DAY_COUNT_TO_JULIAN_DAY));
+        dateVector.getMutator().set(index, intValue * (long) DateTimeConstants.MILLIS_PER_DAY);
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/drill/blob/8461d10b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java
index 99cf0f5..50bb7dc 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java
@@ -27,7 +27,6 @@ import java.util.Map;
 import com.google.common.collect.ImmutableList;
 import org.apache.drill.common.exceptions.DrillRuntimeException;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
-import org.apache.drill.common.expression.PathSegment;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.common.types.TypeProtos;
 import org.apache.drill.common.types.TypeProtos.DataMode;
@@ -42,18 +41,13 @@ import org.apache.drill.exec.record.MaterializedField;
 import org.apache.drill.exec.store.AbstractRecordReader;
 import org.apache.drill.exec.store.parquet.ParquetReaderStats;
 import org.apache.drill.exec.store.parquet.ParquetReaderUtility;
-import org.apache.drill.exec.store.parquet.ParquetRecordWriter;
 import org.apache.drill.exec.vector.AllocationHelper;
 import org.apache.drill.exec.vector.NullableIntVector;
 import org.apache.drill.exec.vector.ValueVector;
 import org.apache.drill.exec.vector.complex.RepeatedValueVector;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.parquet.SemanticVersion;
-import org.apache.parquet.VersionParser;
 import org.apache.parquet.column.ColumnDescriptor;
-import org.apache.parquet.column.statistics.Statistics;
-import org.apache.parquet.format.ConvertedType;
 import org.apache.parquet.format.FileMetaData;
 import org.apache.parquet.format.SchemaElement;
 import org.apache.parquet.format.converter.ParquetMetadataConverter;
@@ -235,8 +229,6 @@ public class ParquetRecordReader extends AbstractRecordReader {
    * repetition level (indicating if it is an array type) is in the ColumnDescriptor and
    * the length of a fixed width field is stored at the schema level.
    *
-   * @param column
-   * @param se
    * @return the length if fixed width, else -1
    */
   private int getDataTypeLength(ColumnDescriptor column, SchemaElement se) {

http://git-wip-us.apache.org/repos/asf/drill/blob/8461d10b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetGroupConverter.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetGroupConverter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetGroupConverter.java
index 32295b9..48a0bfd 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetGroupConverter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetGroupConverter.java
@@ -65,7 +65,7 @@ import org.apache.drill.exec.vector.complex.writer.TimeStampWriter;
 import org.apache.drill.exec.vector.complex.writer.TimeWriter;
 import org.apache.drill.exec.vector.complex.writer.VarBinaryWriter;
 import org.apache.drill.exec.vector.complex.writer.VarCharWriter;
-import org.joda.time.DateTimeUtils;
+import org.joda.time.DateTimeConstants;
 
 import org.apache.parquet.io.api.Binary;
 import org.apache.parquet.io.api.Converter;
@@ -357,9 +357,9 @@ public class DrillParquetGroupConverter extends GroupConverter {
     @Override
     public void addInt(int value) {
       if (value > ParquetReaderUtility.DATE_CORRUPTION_THRESHOLD) {
-        holder.value = DateTimeUtils.fromJulianDay(value + ParquetReaderUtility.CORRECT_CORRUPT_DATE_SHIFT);
+        holder.value = (value - ParquetReaderUtility.CORRECT_CORRUPT_DATE_SHIFT) * DateTimeConstants.MILLIS_PER_DAY;
       } else {
-        holder.value = DateTimeUtils.fromJulianDay(value + ParquetReaderUtility.SHIFT_PARQUET_DAY_COUNT_TO_JULIAN_DAY);
+        holder.value = value * (long) DateTimeConstants.MILLIS_PER_DAY;
       }
       writer.write(holder);
     }
@@ -375,7 +375,7 @@ public class DrillParquetGroupConverter extends GroupConverter {
 
     @Override
     public void addInt(int value) {
-      holder.value = DateTimeUtils.fromJulianDay(value + ParquetReaderUtility.CORRECT_CORRUPT_DATE_SHIFT);
+      holder.value = (value - ParquetReaderUtility.CORRECT_CORRUPT_DATE_SHIFT) * DateTimeConstants.MILLIS_PER_DAY;
       writer.write(holder);
     }
   }
@@ -390,7 +390,7 @@ public class DrillParquetGroupConverter extends GroupConverter {
 
     @Override
     public void addInt(int value) {
-      holder.value = DateTimeUtils.fromJulianDay(value + ParquetReaderUtility.SHIFT_PARQUET_DAY_COUNT_TO_JULIAN_DAY);
+      holder.value = value * (long) DateTimeConstants.MILLIS_PER_DAY;
       writer.write(holder);
     }
   }

http://git-wip-us.apache.org/repos/asf/drill/blob/8461d10b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestCorruptParquetDateCorrection.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestCorruptParquetDateCorrection.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestCorruptParquetDateCorrection.java
index f9b0220..27cc093 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestCorruptParquetDateCorrection.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestCorruptParquetDateCorrection.java
@@ -23,7 +23,6 @@ import org.apache.drill.common.util.TestTools;
 import org.apache.drill.exec.ExecConstants;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.joda.time.DateTime;
@@ -68,9 +67,8 @@ import java.util.regex.Pattern;
 public class TestCorruptParquetDateCorrection extends PlanTestBase {
 
   // 4 files are in the directory:
-  //    - one created with the fixed version of the reader, right before 1.9
-  //        - the code was changed to write the version number 1.9 (without snapshot) into the file
-  //        - for compatibility all 1.9-SNAPSHOT files are read to correct the corrupt dates
+  //    - one created with the fixed version of the reader
+  //        - files have extra meta field: is.date.correct = true
   //    - one from and old version of Drill, before we put in proper created by in metadata
   //        - this is read properly by looking at a Max value in the file statistics, to see that
   //          it is way off of a typical date value
@@ -97,7 +95,9 @@ public class TestCorruptParquetDateCorrection extends PlanTestBase {
   private static final String VARCHAR_PARTITIONED =
       "[WORKING_PATH]/src/test/resources/parquet/4203_corrupt_dates/fewtypes_varcharpartition";
   private static final String DATE_PARTITIONED =
-      "[WORKING_PATH]/src/test/resources/parquet/4203_corrupt_dates/fewtypes_varcharpartition";
+      "[WORKING_PATH]/src/test/resources/parquet/4203_corrupt_dates/fewtypes_datepartition";
+  private static final String EXCEPTION_WHILE_PARSING_CREATED_BY_META =
+      "[WORKING_PATH]/src/test/resources/parquet/4203_corrupt_dates/hive1dot2_fewtypes_null";
 
   private static FileSystem fs;
   private static Path path;
@@ -123,9 +123,9 @@ public class TestCorruptParquetDateCorrection extends PlanTestBase {
 
   /**
    * Test reading a directory full of partitioned parquet files with dates, these files have a drill version
-   * number of 1.9.0 in their footers, so we can be certain they do not have corruption. The option to disable the
-   * correction is passed, but it will not change the result in the case where we are certain correction
-   * is NOT needed. For more info see DRILL-4203.
+   * number of 1.9.0-SNAPSHOT and the label is.date.correct = true in their footers, so we can be certain
+   * they do not have corruption. The option to disable the correction is passed, but it will not change the result
+   * in the case where we are certain correction is NOT needed. For more info see DRILL-4203.
    */
   @Test
   public void testReadPartitionedOnCorrectedDates() throws Exception {
@@ -186,6 +186,21 @@ public class TestCorruptParquetDateCorrection extends PlanTestBase {
     testPlanMatchingPatterns(sql, new String[]{"numFiles=6"}, null);
   }
 
+  @Test
+  public void testCorrectDatesAndExceptionWhileParsingCreatedBy() throws Exception {
+    testBuilder()
+        .sqlQuery("select date_col from " +
+            "dfs.`" + EXCEPTION_WHILE_PARSING_CREATED_BY_META +
+            "` where to_date(date_col, 'yyyy-mm-dd') < '1997-01-02'")
+        .baselineColumns("date_col")
+        .unOrdered()
+        .baselineValues(new DateTime(1996, 1, 29, 0, 0))
+        .baselineValues(new DateTime(1996, 3, 1, 0, 0))
+        .baselineValues(new DateTime(1996, 3, 2, 0, 0))
+        .baselineValues(new DateTime(1997, 3, 1, 0, 0))
+        .go();
+  }
+
   /**
    * Test reading a directory full of partitioned parquet files with dates, these files have a drill version
    * number of 1.4.0 in their footers, so we can be certain they are corrupt. The option to disable the
@@ -298,7 +313,8 @@ public class TestCorruptParquetDateCorrection extends PlanTestBase {
   @Test
   public void testReadCorruptDatesWithNullFilledColumns() throws Exception {
     testBuilder()
-        .sqlQuery("select null_dates_1, null_dates_2, non_existent_field, date_col from dfs.`" + PARQUET_DATE_FILE_WITH_NULL_FILLED_COLS + "`")
+        .sqlQuery("select null_dates_1, null_dates_2, non_existent_field, date_col from dfs.`" +
+            PARQUET_DATE_FILE_WITH_NULL_FILLED_COLS + "`")
         .unOrdered()
         .baselineColumns("null_dates_1", "null_dates_2", "non_existent_field", "date_col")
         .baselineValues(null, null, null, new DateTime(1970, 1, 1, 0, 0))
@@ -347,90 +363,6 @@ public class TestCorruptParquetDateCorrection extends PlanTestBase {
     }
   }
 
-  public void addDateBaselineVals(TestBuilder builder) {
-    builder
-        .baselineValues(new DateTime(1970, 1, 1, 0, 0))
-        .baselineValues(new DateTime(1970, 1, 2, 0, 0))
-        .baselineValues(new DateTime(1969, 12, 31, 0, 0))
-        .baselineValues(new DateTime(1969, 12, 30, 0, 0))
-        .baselineValues(new DateTime(1900, 1, 1, 0, 0))
-        .baselineValues(new DateTime(2015, 1, 1, 0, 0));
-  }
-
-  /**
-   * These are the same values added in the addDateBaselineVals, shifted as corrupt values
-   */
-  public void addCorruptedDateBaselineVals(TestBuilder builder) {
-    builder
-        .baselineValues(new DateTime(15334, 03, 17, 0, 0))
-        .baselineValues(new DateTime(15334, 03, 18, 0, 0))
-        .baselineValues(new DateTime(15334, 03, 15, 0, 0))
-        .baselineValues(new DateTime(15334, 03, 16, 0, 0))
-        .baselineValues(new DateTime(15264, 03, 16, 0, 0))
-        .baselineValues(new DateTime(15379, 03, 17, 0, 0));
-  }
-
-  public void readFilesWithUserDisabledAutoCorrection() throws Exception {
-    // ensure that selecting the date column explicitly or as part of a star still results
-    // in checking the file metadata for date columns (when we need to check the statistics
-    // for bad values) to set the flag that the values are corrupt
-    for (String selection : new String[] {"*", "date_col"}) {
-      TestBuilder builder = testBuilder()
-          .sqlQuery("select " + selection + " from table(dfs.`" + MIXED_CORRUPTED_AND_CORRECTED_DATES_PATH + "`" +
-              "(type => 'parquet', autoCorrectCorruptDates => false))")
-          .unOrdered()
-          .baselineColumns("date_col");
-      addDateBaselineVals(builder);
-      addDateBaselineVals(builder);
-      addCorruptedDateBaselineVals(builder);
-      addCorruptedDateBaselineVals(builder);
-      builder.go();
-    }
-  }
-
-  private static String replaceWorkingPathInString(String orig) {
-    return orig.replaceAll(Pattern.quote("[WORKING_PATH]"), Matcher.quoteReplacement(TestTools.getWorkingPath()));
-  }
-
-  private static void copyDirectoryIntoTempSpace(String resourcesDir) throws IOException {
-    copyDirectoryIntoTempSpace(resourcesDir, null);
-  }
-
-  private static void copyDirectoryIntoTempSpace(String resourcesDir, String destinationSubDir) throws IOException {
-    Path destination = path;
-    if (destinationSubDir != null) {
-      destination = new Path(path, destinationSubDir);
-    }
-    fs.copyFromLocalFile(
-        new Path(replaceWorkingPathInString(resourcesDir)),
-        destination);
-  }
-
-  /**
-   * Metadata cache files include full paths to the files that have been scanned.
-   *
-   * There is no way to generate a metadata cache file with absolute paths that
-   * will be guarenteed to be available on an arbitrary test machine.
-   *
-   * To enable testing older metadata cache files, they were generated manually
-   * using older drill versions, and the absolute path up to the folder where
-   * the metadata cache file appeared was manually replaced with the string
-   * REPLACED_IN_TEST. Here the file is re-written into the given temporary
-   * location after the REPLACED_IN_TEST string has been replaced by the actual
-   * location generated during this run of the tests.
-   *
-   * @param srcFileOnClassPath
-   * @param destFolderInTmp
-   * @throws IOException
-   */
-  private static void copyMetaDataCacheToTempReplacingInternalPaths(String srcFileOnClassPath, String destFolderInTmp) throws IOException {
-    String metadataFileContents = getFile(srcFileOnClassPath);
-    Path newMetaCache = new Path(new Path(path, destFolderInTmp), ".drill.parquet_metadata");
-    FSDataOutputStream outSteam = fs.create(newMetaCache);
-    outSteam.writeBytes(metadataFileContents.replace("REPLACED_IN_TEST", path.toString()));
-    outSteam.close();
-  }
-
   @Test
   public void testReadOldMetadataCacheFile() throws Exception {
     // for sanity, try reading all partitions without a filter
@@ -488,8 +420,8 @@ public class TestCorruptParquetDateCorrection extends PlanTestBase {
   @Test
   public void testReadNewMetadataCacheFileOverOldAndNewFiles() throws Exception {
     String table = "dfs.`" + new Path(path, MIXED_CORRUPTED_AND_CORRECTED_PARTITIONED_FOLDER) + "`";
-    copyMetaDataCacheToTempReplacingInternalPaths("parquet/4203_corrupt_dates/mixed_version_partitioned_metadata.requires_replace.txt",
-        MIXED_CORRUPTED_AND_CORRECTED_PARTITIONED_FOLDER);
+    copyMetaDataCacheToTempReplacingInternalPaths("parquet/4203_corrupt_dates/" +
+        "mixed_version_partitioned_metadata.requires_replace.txt", MIXED_CORRUPTED_AND_CORRECTED_PARTITIONED_FOLDER);
     // for sanity, try reading all partitions without a filter
     TestBuilder builder = testBuilder()
         .sqlQuery("select date_col from " + table)
@@ -520,7 +452,7 @@ public class TestCorruptParquetDateCorrection extends PlanTestBase {
    * Read a directory with parquet files where some have corrupted dates, see DRILL-4203.
    * @throws Exception
    */
-  public void readMixedCorruptedAndCorrectedDates() throws Exception {
+  private void readMixedCorruptedAndCorrectedDates() throws Exception {
     // ensure that selecting the date column explicitly or as part of a star still results
     // in checking the file metadata for date columns (when we need to check the statistics
     // for bad values) to set the flag that the values are corrupt
@@ -536,4 +468,90 @@ public class TestCorruptParquetDateCorrection extends PlanTestBase {
     }
   }
 
+
+  private void addDateBaselineVals(TestBuilder builder) {
+    builder
+        .baselineValues(new DateTime(1970, 1, 1, 0, 0))
+        .baselineValues(new DateTime(1970, 1, 2, 0, 0))
+        .baselineValues(new DateTime(1969, 12, 31, 0, 0))
+        .baselineValues(new DateTime(1969, 12, 30, 0, 0))
+        .baselineValues(new DateTime(1900, 1, 1, 0, 0))
+        .baselineValues(new DateTime(2015, 1, 1, 0, 0));
+  }
+
+  /**
+   * These are the same values added in the addDateBaselineVals, shifted as corrupt values
+   */
+  private void addCorruptedDateBaselineVals(TestBuilder builder) {
+    builder
+        .baselineValues(new DateTime(15334, 03, 17, 0, 0))
+        .baselineValues(new DateTime(15334, 03, 18, 0, 0))
+        .baselineValues(new DateTime(15334, 03, 15, 0, 0))
+        .baselineValues(new DateTime(15334, 03, 16, 0, 0))
+        .baselineValues(new DateTime(15264, 03, 16, 0, 0))
+        .baselineValues(new DateTime(15379, 03, 17, 0, 0));
+  }
+
+  private void readFilesWithUserDisabledAutoCorrection() throws Exception {
+    // ensure that selecting the date column explicitly or as part of a star still results
+    // in checking the file metadata for date columns (when we need to check the statistics
+    // for bad values) to set the flag that the values are corrupt
+    for (String selection : new String[] {"*", "date_col"}) {
+      TestBuilder builder = testBuilder()
+          .sqlQuery("select " + selection + " from table(dfs.`" + MIXED_CORRUPTED_AND_CORRECTED_DATES_PATH + "`" +
+              "(type => 'parquet', autoCorrectCorruptDates => false))")
+          .unOrdered()
+          .baselineColumns("date_col");
+      addDateBaselineVals(builder);
+      addDateBaselineVals(builder);
+      addCorruptedDateBaselineVals(builder);
+      addCorruptedDateBaselineVals(builder);
+      builder.go();
+    }
+  }
+
+  private static String replaceWorkingPathInString(String orig) {
+    return orig.replaceAll(Pattern.quote("[WORKING_PATH]"), Matcher.quoteReplacement(TestTools.getWorkingPath()));
+  }
+
+  private static void copyDirectoryIntoTempSpace(String resourcesDir) throws IOException {
+    copyDirectoryIntoTempSpace(resourcesDir, null);
+  }
+
+  private static void copyDirectoryIntoTempSpace(String resourcesDir, String destinationSubDir) throws IOException {
+    Path destination = path;
+    if (destinationSubDir != null) {
+      destination = new Path(path, destinationSubDir);
+    }
+    fs.copyFromLocalFile(
+        new Path(replaceWorkingPathInString(resourcesDir)),
+        destination);
+  }
+
+  /**
+   * Metadata cache files include full paths to the files that have been scanned.
+   *
+   * There is no way to generate a metadata cache file with absolute paths that
+   * will be guaranteed to be available on an arbitrary test machine.
+   *
+   * To enable testing older metadata cache files, they were generated manually
+   * using older drill versions, and the absolute path up to the folder where
+   * the metadata cache file appeared was manually replaced with the string
+   * REPLACED_IN_TEST. Here the file is re-written into the given temporary
+   * location after the REPLACED_IN_TEST string has been replaced by the actual
+   * location generated during this run of the tests.
+   *
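+   * @param srcFileOnClassPath location of the source metadata cache file, as passed to getFile
+   * @param destFolderInTmp folder under the temporary test path that receives the rewritten cache file
+   * @throws IOException if the rewritten cache file cannot be created or written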
+   */
+  private static void copyMetaDataCacheToTempReplacingInternalPaths(String srcFileOnClassPath, String destFolderInTmp)
+      throws IOException {
+    String metadataFileContents = getFile(srcFileOnClassPath);
+    Path newMetaCache = new Path(new Path(path, destFolderInTmp), ".drill.parquet_metadata");
+    FSDataOutputStream outSteam = fs.create(newMetaCache);
+    outSteam.writeBytes(metadataFileContents.replace("REPLACED_IN_TEST", path.toString()));
+    outSteam.close();
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/8461d10b/exec/java-exec/src/test/resources/parquet/4203_corrupt_dates/1_9_0_partitioned_no_corruption/0_0_1.parquet
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/parquet/4203_corrupt_dates/1_9_0_partitioned_no_corruption/0_0_1.parquet b/exec/java-exec/src/test/resources/parquet/4203_corrupt_dates/1_9_0_partitioned_no_corruption/0_0_1.parquet
index 7aa8e61..0ec76c8 100644
Binary files a/exec/java-exec/src/test/resources/parquet/4203_corrupt_dates/1_9_0_partitioned_no_corruption/0_0_1.parquet and b/exec/java-exec/src/test/resources/parquet/4203_corrupt_dates/1_9_0_partitioned_no_corruption/0_0_1.parquet differ

http://git-wip-us.apache.org/repos/asf/drill/blob/8461d10b/exec/java-exec/src/test/resources/parquet/4203_corrupt_dates/1_9_0_partitioned_no_corruption/0_0_2.parquet
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/parquet/4203_corrupt_dates/1_9_0_partitioned_no_corruption/0_0_2.parquet b/exec/java-exec/src/test/resources/parquet/4203_corrupt_dates/1_9_0_partitioned_no_corruption/0_0_2.parquet
index 8c43cee..b43ab9d 100644
Binary files a/exec/java-exec/src/test/resources/parquet/4203_corrupt_dates/1_9_0_partitioned_no_corruption/0_0_2.parquet and b/exec/java-exec/src/test/resources/parquet/4203_corrupt_dates/1_9_0_partitioned_no_corruption/0_0_2.parquet differ

http://git-wip-us.apache.org/repos/asf/drill/blob/8461d10b/exec/java-exec/src/test/resources/parquet/4203_corrupt_dates/1_9_0_partitioned_no_corruption/0_0_3.parquet
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/parquet/4203_corrupt_dates/1_9_0_partitioned_no_corruption/0_0_3.parquet b/exec/java-exec/src/test/resources/parquet/4203_corrupt_dates/1_9_0_partitioned_no_corruption/0_0_3.parquet
index 2d2415a..95d46bb 100644
Binary files a/exec/java-exec/src/test/resources/parquet/4203_corrupt_dates/1_9_0_partitioned_no_corruption/0_0_3.parquet and b/exec/java-exec/src/test/resources/parquet/4203_corrupt_dates/1_9_0_partitioned_no_corruption/0_0_3.parquet differ

http://git-wip-us.apache.org/repos/asf/drill/blob/8461d10b/exec/java-exec/src/test/resources/parquet/4203_corrupt_dates/1_9_0_partitioned_no_corruption/0_0_4.parquet
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/parquet/4203_corrupt_dates/1_9_0_partitioned_no_corruption/0_0_4.parquet b/exec/java-exec/src/test/resources/parquet/4203_corrupt_dates/1_9_0_partitioned_no_corruption/0_0_4.parquet
index ff5ce24..9f7269e 100644
Binary files a/exec/java-exec/src/test/resources/parquet/4203_corrupt_dates/1_9_0_partitioned_no_corruption/0_0_4.parquet and b/exec/java-exec/src/test/resources/parquet/4203_corrupt_dates/1_9_0_partitioned_no_corruption/0_0_4.parquet differ

http://git-wip-us.apache.org/repos/asf/drill/blob/8461d10b/exec/java-exec/src/test/resources/parquet/4203_corrupt_dates/1_9_0_partitioned_no_corruption/0_0_5.parquet
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/parquet/4203_corrupt_dates/1_9_0_partitioned_no_corruption/0_0_5.parquet b/exec/java-exec/src/test/resources/parquet/4203_corrupt_dates/1_9_0_partitioned_no_corruption/0_0_5.parquet
index 4a5d4fb..b0d370a 100644
Binary files a/exec/java-exec/src/test/resources/parquet/4203_corrupt_dates/1_9_0_partitioned_no_corruption/0_0_5.parquet and b/exec/java-exec/src/test/resources/parquet/4203_corrupt_dates/1_9_0_partitioned_no_corruption/0_0_5.parquet differ

http://git-wip-us.apache.org/repos/asf/drill/blob/8461d10b/exec/java-exec/src/test/resources/parquet/4203_corrupt_dates/1_9_0_partitioned_no_corruption/0_0_6.parquet
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/parquet/4203_corrupt_dates/1_9_0_partitioned_no_corruption/0_0_6.parquet b/exec/java-exec/src/test/resources/parquet/4203_corrupt_dates/1_9_0_partitioned_no_corruption/0_0_6.parquet
index da7ad3f..a5c7b86 100644
Binary files a/exec/java-exec/src/test/resources/parquet/4203_corrupt_dates/1_9_0_partitioned_no_corruption/0_0_6.parquet and b/exec/java-exec/src/test/resources/parquet/4203_corrupt_dates/1_9_0_partitioned_no_corruption/0_0_6.parquet differ

http://git-wip-us.apache.org/repos/asf/drill/blob/8461d10b/exec/java-exec/src/test/resources/parquet/4203_corrupt_dates/hive1dot2_fewtypes_null/000000_0
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/parquet/4203_corrupt_dates/hive1dot2_fewtypes_null/000000_0 b/exec/java-exec/src/test/resources/parquet/4203_corrupt_dates/hive1dot2_fewtypes_null/000000_0
new file mode 100644
index 0000000..89528e3
Binary files /dev/null and b/exec/java-exec/src/test/resources/parquet/4203_corrupt_dates/hive1dot2_fewtypes_null/000000_0 differ

http://git-wip-us.apache.org/repos/asf/drill/blob/8461d10b/exec/java-exec/src/test/resources/parquet/4203_corrupt_dates/mixed_drill_versions/4203_corrected_dates.parquet
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/parquet/4203_corrupt_dates/mixed_drill_versions/4203_corrected_dates.parquet b/exec/java-exec/src/test/resources/parquet/4203_corrupt_dates/mixed_drill_versions/4203_corrected_dates.parquet
index cee02a2..3d46f56 100644
Binary files a/exec/java-exec/src/test/resources/parquet/4203_corrupt_dates/mixed_drill_versions/4203_corrected_dates.parquet and b/exec/java-exec/src/test/resources/parquet/4203_corrupt_dates/mixed_drill_versions/4203_corrected_dates.parquet differ

http://git-wip-us.apache.org/repos/asf/drill/blob/8461d10b/exec/java-exec/src/test/resources/parquet/4203_corrupt_dates/mixed_version_partitioned_metadata.requires_replace.txt
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/parquet/4203_corrupt_dates/mixed_version_partitioned_metadata.requires_replace.txt b/exec/java-exec/src/test/resources/parquet/4203_corrupt_dates/mixed_version_partitioned_metadata.requires_replace.txt
index 7fdb5b2..bb6e282 100644
--- a/exec/java-exec/src/test/resources/parquet/4203_corrupt_dates/mixed_version_partitioned_metadata.requires_replace.txt
+++ b/exec/java-exec/src/test/resources/parquet/4203_corrupt_dates/mixed_version_partitioned_metadata.requires_replace.txt
@@ -297,5 +297,5 @@
     } ]
   } ],
   "directories" : [ "file:REPLACED_IN_TEST/mixed_partitioned/1_9_0_partitioned_no_corruption", "file:REPLACED_IN_TEST/mixed_partitioned/partitioned_with_corruption_4203" ],
-  "drillVersion" : "1.9.0"
+  "isDateCorrect" : "true"
 }