Posted to commits@drill.apache.org by ar...@apache.org on 2017/08/15 17:25:10 UTC

[2/2] drill git commit: DRILL-5660: Parquet metadata caching improvements

DRILL-5660: Parquet metadata caching improvements

1. Bumped up metadata file version to v3_1.
2. Introduced a comparable MetadataVersion class (see the sketch below).
3. Added support to ignore unknown metadata versions (for example, metadata generated by future versions of Drill).
4. Added support to ignore corrupted or missing metadata files.
5. Removed "%20" symbols from paths if present.

closes #877
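
A minimal sketch of how the new comparable class orders the old "v"-prefixed version ids against the new dotted ones; it relies only on the MetadataVersion class added below (the demo class name is illustrative):

    import org.apache.drill.exec.store.parquet.MetadataVersion;

    public class VersionOrderSketch {
      public static void main(String[] args) {
        // "v3" parses to major=3, minor=0; "3.1" parses to major=3, minor=1
        MetadataVersion v3 = new MetadataVersion("v3");
        MetadataVersion v31 = new MetadataVersion("3.1");
        System.out.println(v31.compareTo(v3) > 0);                // true: 3.1 sorts after v3
        System.out.println(v3.equals(new MetadataVersion(3, 0))); // true: "v3" and "3" denote the same version
      }
    }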


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/e9065b55
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/e9065b55
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/e9065b55

Branch: refs/heads/master
Commit: e9065b55ea560e7f737d6fcb4948f9e945b9b14f
Parents: 073ea68
Author: Vitalii Diravka <vi...@gmail.com>
Authored: Wed Jul 12 15:12:00 2017 -0700
Committer: Arina Ielchiieva <ar...@gmail.com>
Committed: Tue Aug 15 19:31:00 2017 +0300

----------------------------------------------------------------------
 .../drill/exec/store/dfs/FileSelection.java     |   2 +-
 .../drill/exec/store/dfs/MetadataContext.java   |  37 ++-
 .../drill/exec/store/parquet/Metadata.java      | 293 ++++++++++++-------
 .../exec/store/parquet/MetadataVersion.java     | 155 ++++++++++
 .../exec/store/parquet/ParquetFormatConfig.java |  14 +-
 .../exec/store/parquet/ParquetFormatPlugin.java |   6 +-
 .../exec/store/parquet/ParquetGroupScan.java    | 144 ++++++---
 .../store/parquet/ParquetReaderUtility.java     |  11 +-
 .../exec/store/parquet/ParquetRowGroupScan.java |   9 +-
 .../store/parquet/ParquetScanBatchCreator.java  |   2 +-
 .../java/org/apache/drill/BaseTestQuery.java    |  19 +-
 .../store/parquet/TestParquetMetadataCache.java | 249 ++++++++++++++--
 .../parquet/TestParquetMetadataVersion.java     | 121 ++++++++
 ...ies_with_absolute_paths.requires_replace.txt |   2 +-
 ...ble_with_absolute_paths.requires_replace.txt |   6 +-
 ..._with_absolute_paths_t1.requires_replace.txt |   2 +-
 ..._with_absolute_paths_t2.requires_replace.txt |   2 +-
 .../corrupted_metadata.requires_replace.txt     |  41 +++
 .../empty_metadata_file.requires_replace.txt    |   0
 ...ported_metadata_version.requires_replace.txt |  76 +++++
 20 files changed, 980 insertions(+), 211 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/e9065b55/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSelection.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSelection.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSelection.java
index 7682d69..7cabee0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSelection.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSelection.java
@@ -274,7 +274,7 @@ public class FileSelection {
     if (statuses == null) {
       return null;
     }
-    final FileSelection fileSel = create(Lists.newArrayList(statuses), null, combined.toUri().toString());
+    final FileSelection fileSel = create(Lists.newArrayList(statuses), null, combined.toUri().getPath());
     logger.debug("FileSelection.create() took {} ms ", timer.elapsed(TimeUnit.MILLISECONDS));
     if (fileSel == null) {
       return null;
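
This hunk implements the "%20" fix noted in the commit message: Path.toUri().toString() percent-encodes characters such as spaces, while toUri().getPath() returns the decoded path. A minimal sketch of the difference, assuming only the Hadoop Path API (demo class and sample path are illustrative):

    import org.apache.hadoop.fs.Path;

    public class PathEncodingSketch {
      public static void main(String[] args) {
        Path p = new Path("/tmp/parquet tables/nation");
        System.out.println(p.toUri().toString()); // /tmp/parquet%20tables/nation
        System.out.println(p.toUri().getPath());  // /tmp/parquet tables/nation
      }
    }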

http://git-wip-us.apache.org/repos/asf/drill/blob/e9065b55/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/MetadataContext.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/MetadataContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/MetadataContext.java
index aff8367..4ed6b4f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/MetadataContext.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/MetadataContext.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -27,22 +27,16 @@ import com.google.common.collect.Maps;
  */
 public class MetadataContext {
 
-  /** Map of directory path to the status of whether modification time was already checked.
+  /**
+   *  Map of directory path to the status of whether modification time was already checked.
    *  Note: the #directories is typically a small percentage of the #files, so the memory footprint
    *  is expected to be relatively small.
    */
   private Map<String, Boolean> dirModifCheckMap = Maps.newHashMap();
 
-  public enum PruneStatus {
-    NOT_STARTED,         // initial state
-    PRUNED,              // partitions were pruned
-    NOT_PRUNED           // partitions did not get pruned
-  }
-
   private PruneStatus pruneStatus = PruneStatus.NOT_STARTED;
 
-  public MetadataContext() {
-  }
+  private boolean metadataCacheCorrupted;
 
   public void setStatus(String dir) {
     dirModifCheckMap.put(dir,  true);
@@ -61,6 +55,7 @@ public class MetadataContext {
 
   public void clear() {
     dirModifCheckMap.clear();
+    metadataCacheCorrupted = false;
   }
 
   public void setPruneStatus(PruneStatus status) {
@@ -71,6 +66,28 @@ public class MetadataContext {
     return pruneStatus;
   }
 
+  /**
+   * @return true if parquet metadata cache files are missing or corrupted, false otherwise
+   */
+  public boolean isMetadataCacheCorrupted() {
+    return metadataCacheCorrupted;
+  }
+
+  /**
+   * Setting this to true avoids re-reading corrupted, unsupported or missing metadata files
+   *
+   * @param metadataCacheCorrupted metadata corruption status
+   */
+  public void setMetadataCacheCorrupted(boolean metadataCacheCorrupted) {
+    this.metadataCacheCorrupted = metadataCacheCorrupted;
+  }
+
+  public enum PruneStatus {
+    NOT_STARTED,         // initial state
+    PRUNED,              // partitions were pruned
+    NOT_PRUNED           // partitions did not get pruned
+  }
+
 }
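
The new flag turns MetadataContext into a per-query memo of cache health: the first failed read records the corruption, later readers consult the flag instead of re-opening the broken file, and clear() resets it together with the directory-modification map. A minimal usage sketch against the class as patched above (the demo class name is illustrative):

    import org.apache.drill.exec.store.dfs.MetadataContext;

    public class CacheGuardSketch {
      public static void main(String[] args) {
        MetadataContext ctx = new MetadataContext();
        ctx.setMetadataCacheCorrupted(true);      // first reader hit a broken cache file
        if (ctx.isMetadataCacheCorrupted()) {     // later readers skip the cache entirely
          System.out.println("skipping metadata cache, reading parquet footers instead");
        }
        ctx.clear();                              // e.g. after new cache files are created
        System.out.println(ctx.isMetadataCacheCorrupted()); // false again
      }
    }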
 
 

http://git-wip-us.apache.org/repos/asf/drill/blob/e9065b55/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/Metadata.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/Metadata.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/Metadata.java
index d9b99f5..a33f46a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/Metadata.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/Metadata.java
@@ -78,6 +78,13 @@ import com.google.common.base.Stopwatch;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 
+import javax.annotation.Nullable;
+
+import static org.apache.drill.exec.store.parquet.MetadataVersion.Constants.V1;
+import static org.apache.drill.exec.store.parquet.MetadataVersion.Constants.V2;
+import static org.apache.drill.exec.store.parquet.MetadataVersion.Constants.V3;
+import static org.apache.drill.exec.store.parquet.MetadataVersion.Constants.V3_1;
+
 public class Metadata {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(Metadata.class);
 
@@ -132,25 +139,60 @@ public class Metadata {
   }
 
   /**
-   * Get the parquet metadata for a directory by reading the metadata file
+   * Get the parquet metadata for the table by reading the metadata file
    *
-   * @param fs
+   * @param fs current file system
    * @param path The path to the metadata file, located in the directory that contains the parquet files
-   * @return
-   * @throws IOException
+   * @param metaContext metadata context
+   * @param formatConfig parquet format plugin configs
+   * @return parquet table metadata. Null if metadata cache is missing, unsupported or corrupted
    */
-  public static ParquetTableMetadataBase readBlockMeta(FileSystem fs, String path, MetadataContext metaContext, ParquetFormatConfig formatConfig) throws IOException {
+  public static @Nullable ParquetTableMetadataBase readBlockMeta(FileSystem fs, Path path, MetadataContext metaContext,
+      ParquetFormatConfig formatConfig) {
+    if (ignoreReadingMetadata(metaContext, path)) {
+      return null;
+    }
     Metadata metadata = new Metadata(fs, formatConfig);
     metadata.readBlockMeta(path, false, metaContext);
     return metadata.parquetTableMetadata;
   }
 
-  public static ParquetTableMetadataDirs readMetadataDirs(FileSystem fs, String path, MetadataContext metaContext, ParquetFormatConfig formatConfig) throws IOException {
+  /**
+   * Get the parquet metadata for all subdirectories by reading the metadata file
+   *
+   * @param fs current file system
+   * @param path The path to the metadata file, located in the directory that contains the parquet files
+   * @param metaContext metadata context
+   * @param formatConfig parquet format plugin configs
+   * @return parquet metadata for a directory. Null if metadata cache is missing, unsupported or corrupted
+   */
+  public static @Nullable ParquetTableMetadataDirs readMetadataDirs(FileSystem fs, Path path,
+      MetadataContext metaContext, ParquetFormatConfig formatConfig) {
+    if (ignoreReadingMetadata(metaContext, path)) {
+      return null;
+    }
     Metadata metadata = new Metadata(fs, formatConfig);
     metadata.readBlockMeta(path, true, metaContext);
     return metadata.parquetTableMetadataDirs;
   }
 
+  /**
+   * Checks whether reading of the metadata file should be ignored because metadata is missing, unsupported or corrupted
+   *
+   * @param metaContext Metadata context
+   * @param path The path to the metadata file, located in the directory that contains the parquet files
+   * @return true if parquet metadata is missing or corrupted, false otherwise
+   */
+  private static boolean ignoreReadingMetadata(MetadataContext metaContext, Path path) {
+    if (metaContext.isMetadataCacheCorrupted()) {
+      logger.warn("Ignoring of reading '{}' metadata file. Parquet metadata cache files are unsupported or corrupted. " +
+          "Query performance may be slow. Make sure the cache files are up-to-date by running the 'REFRESH TABLE " +
+          "METADATA' command", path);
+      return true;
+    }
+    return false;
+  }
+
   private Metadata(FileSystem fs, ParquetFormatConfig formatConfig) {
     this.fs = ImpersonationUtil.createFileSystem(ImpersonationUtil.getProcessUserName(), fs.getConf());
     this.formatConfig = formatConfig;
@@ -192,7 +234,7 @@ public class Metadata {
         childFiles.add(file);
       }
     }
-    ParquetTableMetadata_v3 parquetTableMetadata = new ParquetTableMetadata_v3(DrillVersionInfo.getVersion());
+    ParquetTableMetadata_v3 parquetTableMetadata = new ParquetTableMetadata_v3(V3_1, DrillVersionInfo.getVersion());
     if (childFiles.size() > 0) {
       List<ParquetFileMetadata_v3 > childFilesMetadata =
           getParquetFileMetadata_v3(parquetTableMetadata, childFiles);
@@ -203,7 +245,7 @@ public class Metadata {
 
     parquetTableMetadata.directories = directoryList;
     parquetTableMetadata.files = metaDataList;
-    //TODO: We need a merge method that merges two colums with the same name but different types
+    // TODO: We need a merge method that merges two columns with the same name but different types
     if (parquetTableMetadata.columnTypeInfo == null) {
       parquetTableMetadata.columnTypeInfo = new ConcurrentHashMap<>();
     }
@@ -260,13 +302,13 @@ public class Metadata {
   /**
    * Get the parquet metadata for a list of parquet files
    *
-   * @param fileStatuses
-   * @return
-   * @throws IOException
+   * @param fileStatuses List of file statuses
+   * @return parquet table metadata object
+   * @throws IOException if parquet file metadata can't be obtained
    */
   private ParquetTableMetadata_v3 getParquetTableMetadata(List<FileStatus> fileStatuses)
       throws IOException {
-    ParquetTableMetadata_v3 tableMetadata = new ParquetTableMetadata_v3();
+    ParquetTableMetadata_v3 tableMetadata = new ParquetTableMetadata_v3(V3_1, DrillVersionInfo.getVersion());
     List<ParquetFileMetadata_v3> fileMetadataList = getParquetFileMetadata_v3(tableMetadata, fileStatuses);
     tableMetadata.files = fileMetadataList;
     tableMetadata.directories = new ArrayList<String>();
@@ -280,7 +322,7 @@ public class Metadata {
    * @param fileStatuses list of the parquet files statuses
    *
    * @return list of the parquet file metadata with absolute paths
-   * @throws IOException
+   * @throws IOException if there are issues while executing the list of runnables
    */
   private List<ParquetFileMetadata_v3> getParquetFileMetadata_v3(
       ParquetTableMetadata_v3 parquetTableMetadata_v3, List<FileStatus> fileStatuses) throws IOException {
@@ -384,7 +426,7 @@ public class Metadata {
 
     ArrayList<SchemaPath> ALL_COLS = new ArrayList<>();
     ALL_COLS.add(AbstractRecordReader.STAR_COLUMN);
-    boolean autoCorrectCorruptDates = formatConfig.autoCorrectCorruptDates;
+    boolean autoCorrectCorruptDates = formatConfig.areCorruptDatesAutoCorrected();
     ParquetReaderUtility.DateCorruptionStatus containsCorruptDates = ParquetReaderUtility.detectCorruptDates(metadata, ALL_COLS, autoCorrectCorruptDates);
     if (logger.isDebugEnabled()) {
       logger.debug(containsCorruptDates.toString());
@@ -519,17 +561,16 @@ public class Metadata {
   /**
    * Read the parquet metadata from a file
    *
-   * @param path
-   * @return
-   * @throws IOException
+   * @param path path to the metadata file
+   * @param dirsOnly true when reading {@link Metadata#METADATA_DIRECTORIES_FILENAME},
+   *                 false when reading {@link Metadata#METADATA_FILENAME} files
+   * @param metaContext current metadata context, marked as corrupted
+   *                    instead of throwing when the metadata file can't be read
    */
-  private void readBlockMeta(String path,
-      boolean dirsOnly,
-      MetadataContext metaContext) throws IOException {
+  private void readBlockMeta(Path path, boolean dirsOnly, MetadataContext metaContext) {
     Stopwatch timer = Stopwatch.createStarted();
-    Path p = new Path(path);
-    Path parentDir = Path.getPathWithoutSchemeAndAuthority(p.getParent()); // parent directory of the metadata file
-    String parentDirString = parentDir.toUri().toString(); // string representation for parent directory of the metadata file
+    Path metadataParentDir = Path.getPathWithoutSchemeAndAuthority(path.getParent());
+    String metadataParentDirPath = metadataParentDir.toUri().getPath();
     ObjectMapper mapper = new ObjectMapper();
 
     final SimpleModule serialModule = new SimpleModule();
@@ -543,78 +584,72 @@ public class Metadata {
     mapper.registerModule(serialModule);
     mapper.registerModule(module);
     mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
-    FSDataInputStream is = fs.open(p);
-
-    boolean alreadyCheckedModification = false;
-    boolean newMetadata = false;
-
-    if (metaContext != null) {
-      alreadyCheckedModification = metaContext.getStatus(parentDirString);
-    }
-
-    if (dirsOnly) {
-      parquetTableMetadataDirs = mapper.readValue(is, ParquetTableMetadataDirs.class);
-      logger.info("Took {} ms to read directories from directory cache file", timer.elapsed(TimeUnit.MILLISECONDS));
-      timer.stop();
-      parquetTableMetadataDirs.updateRelativePaths(parentDirString);
-      if (!alreadyCheckedModification && tableModified(parquetTableMetadataDirs.getDirectories(), p, parentDir, metaContext)) {
-        parquetTableMetadataDirs =
-            (createMetaFilesRecursively(Path.getPathWithoutSchemeAndAuthority(p.getParent()).toString())).getRight();
-        newMetadata = true;
-      }
-    } else {
-      parquetTableMetadata = mapper.readValue(is, ParquetTableMetadataBase.class);
-      logger.info("Took {} ms to read metadata from cache file", timer.elapsed(TimeUnit.MILLISECONDS));
-      timer.stop();
-      if (parquetTableMetadata instanceof ParquetTableMetadata_v3) {
-        ((ParquetTableMetadata_v3) parquetTableMetadata).updateRelativePaths(parentDirString);
-      }
-      if (!alreadyCheckedModification && tableModified(parquetTableMetadata.getDirectories(), p, parentDir, metaContext)) {
-        parquetTableMetadata =
-            (createMetaFilesRecursively(Path.getPathWithoutSchemeAndAuthority(p.getParent()).toString())).getLeft();
-        newMetadata = true;
-      }
+    try (FSDataInputStream is = fs.open(path)) {
+      boolean alreadyCheckedModification = false;
+      boolean newMetadata = false;
+      alreadyCheckedModification = metaContext.getStatus(metadataParentDirPath);
+
+      if (dirsOnly) {
+        parquetTableMetadataDirs = mapper.readValue(is, ParquetTableMetadataDirs.class);
+        logger.info("Took {} ms to read directories from directory cache file", timer.elapsed(TimeUnit.MILLISECONDS));
+        timer.stop();
+        parquetTableMetadataDirs.updateRelativePaths(metadataParentDirPath);
+        if (!alreadyCheckedModification && tableModified(parquetTableMetadataDirs.getDirectories(), path, metadataParentDir, metaContext)) {
+          parquetTableMetadataDirs =
+              (createMetaFilesRecursively(Path.getPathWithoutSchemeAndAuthority(path.getParent()).toString())).getRight();
+          newMetadata = true;
+        }
+      } else {
+        parquetTableMetadata = mapper.readValue(is, ParquetTableMetadataBase.class);
+        logger.info("Took {} ms to read metadata from cache file", timer.elapsed(TimeUnit.MILLISECONDS));
+        timer.stop();
+        if (new MetadataVersion(parquetTableMetadata.getMetadataVersion()).compareTo(new MetadataVersion(3, 0)) >= 0) {
+          ((ParquetTableMetadata_v3) parquetTableMetadata).updateRelativePaths(metadataParentDirPath);
+        }
+        if (!alreadyCheckedModification && tableModified(parquetTableMetadata.getDirectories(), path, metadataParentDir, metaContext)) {
+          parquetTableMetadata =
+              (createMetaFilesRecursively(Path.getPathWithoutSchemeAndAuthority(path.getParent()).toString())).getLeft();
+          newMetadata = true;
+        }
 
-      // DRILL-5009: Remove the RowGroup if it is empty
-      List<? extends ParquetFileMetadata> files = parquetTableMetadata.getFiles();
-      for (ParquetFileMetadata file : files) {
-        List<? extends RowGroupMetadata> rowGroups = file.getRowGroups();
-        for (Iterator<? extends RowGroupMetadata> iter = rowGroups.iterator(); iter.hasNext(); ) {
-          RowGroupMetadata r = iter.next();
-          if (r.getRowCount() == 0) {
-            iter.remove();
+        // DRILL-5009: Remove the RowGroup if it is empty
+        List<? extends ParquetFileMetadata> files = parquetTableMetadata.getFiles();
+        for (ParquetFileMetadata file : files) {
+          List<? extends RowGroupMetadata> rowGroups = file.getRowGroups();
+          for (Iterator<? extends RowGroupMetadata> iter = rowGroups.iterator(); iter.hasNext(); ) {
+            RowGroupMetadata r = iter.next();
+            if (r.getRowCount() == 0) {
+              iter.remove();
+            }
           }
         }
-      }
-
-    }
 
-    if (newMetadata && metaContext != null) {
-      // if new metadata files were created, invalidate the existing metadata context
-      metaContext.clear();
+      }
+      if (newMetadata) {
+        // if new metadata files were created, invalidate the existing metadata context
+        metaContext.clear();
+      }
+    } catch (IOException e) {
+      logger.error("Failed to read '{}' metadata file", path, e);
+      metaContext.setMetadataCacheCorrupted(true);
     }
-
   }
 
   /**
    * Check if the parquet metadata needs to be updated by comparing the modification time of the directories with
    * the modification time of the metadata file
    *
-   * @param directories
-   * @param metaFilePath
-   * @return
-   * @throws IOException
+   * @param directories List of directories
+   * @param metaFilePath path of parquet metadata cache file
+   * @return true if metadata needs to be updated, false otherwise
+   * @throws IOException if some resources are not accessible
    */
-  private boolean tableModified(List<String> directories, Path metaFilePath,
-      Path parentDir,
-      MetadataContext metaContext)
+  private boolean tableModified(List<String> directories, Path metaFilePath, Path parentDir, MetadataContext metaContext)
       throws IOException {
 
     Stopwatch timer = Stopwatch.createStarted();
 
-    if (metaContext != null) {
-      metaContext.setStatus(parentDir.toString());
-    }
+    metaContext.setStatus(parentDir.toUri().getPath());
     long metaFileModifyTime = fs.getFileStatus(metaFilePath).getModificationTime();
     FileStatus directoryStatus = fs.getFileStatus(parentDir);
     int numDirs = 1;
@@ -627,9 +662,7 @@ public class Metadata {
     }
     for (String directory : directories) {
       numDirs++;
-      if (metaContext != null) {
-        metaContext.setStatus(directory);
-      }
+      metaContext.setStatus(directory);
       directoryStatus = fs.getFileStatus(new Path(directory));
       if (directoryStatus.getModificationTime() > metaFileModifyTime) {
         logger.info("Directory {} was modified. Took {} ms to check modification time of {} directories", directoryStatus.getPath().toString(),
@@ -644,11 +677,25 @@ public class Metadata {
     return false;
   }
 
-  @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY, property = "metadata_version")
+  /**
+   * Base class for parquet metadata. Its subclasses are JSON-serializable structures which contain
+   * the metadata for an entire parquet directory structure, one subclass per metadata version
+   * <p>
+   * If any new code changes affect the metadata file content, please update the metadata version as follows:
+   * Bump up the metadata major version if the metadata structure is changed.
+   * Bump up the metadata minor version if only the metadata content is changed, but the metadata structure is the same.
+   * <p>
+   * Note: keep metadata versions synchronized with {@link MetadataVersion.Constants}
+   */
+  @JsonTypeInfo(use = JsonTypeInfo.Id.NAME,
+                include = JsonTypeInfo.As.PROPERTY,
+                property = "metadata_version",
+                visible = true)
   @JsonSubTypes({
-      @JsonSubTypes.Type(value = ParquetTableMetadata_v1.class, name="v1"),
-      @JsonSubTypes.Type(value = ParquetTableMetadata_v2.class, name="v2"),
-      @JsonSubTypes.Type(value = ParquetTableMetadata_v3.class, name="v3")
+      @JsonSubTypes.Type(value = ParquetTableMetadata_v1.class, name = V1),
+      @JsonSubTypes.Type(value = ParquetTableMetadata_v2.class, name = V2),
+      @JsonSubTypes.Type(value = ParquetTableMetadata_v3.class, name = V3),
+      @JsonSubTypes.Type(value = ParquetTableMetadata_v3.class, name = V3_1)
       })
   public static abstract class ParquetTableMetadataBase {
 
@@ -673,6 +720,8 @@ public class Metadata {
     @JsonIgnore public abstract ParquetTableMetadataBase clone();
 
     @JsonIgnore public abstract String getDrillVersion();
+
+    @JsonIgnore public abstract String getMetadataVersion();
   }
 
   public static abstract class ParquetFileMetadata {
@@ -752,8 +801,9 @@ public class Metadata {
     }
   }
 
-  @JsonTypeName("v1")
+  @JsonTypeName(V1)
   public static class ParquetTableMetadata_v1 extends ParquetTableMetadataBase {
+    @JsonProperty(value = "metadata_version", access = JsonProperty.Access.WRITE_ONLY) private String metadataVersion;
     @JsonProperty List<ParquetFileMetadata_v1> files;
     @JsonProperty List<String> directories;
 
@@ -761,7 +811,8 @@ public class Metadata {
       super();
     }
 
-    public ParquetTableMetadata_v1(List<ParquetFileMetadata_v1> files, List<String> directories) {
+    public ParquetTableMetadata_v1(String metadataVersion, List<ParquetFileMetadata_v1> files, List<String> directories) {
+      this.metadataVersion = metadataVersion;
       this.files = files;
       this.directories = directories;
     }
@@ -806,13 +857,17 @@ public class Metadata {
     }
 
     @JsonIgnore @Override public ParquetTableMetadataBase clone() {
-      return new ParquetTableMetadata_v1(files, directories);
+      return new ParquetTableMetadata_v1(metadataVersion, files, directories);
     }
 
-    @Override
+    @JsonIgnore @Override
     public String getDrillVersion() {
       return null;
     }
+
+    @JsonIgnore @Override public String getMetadataVersion() {
+      return metadataVersion;
+    }
   }
 
 
@@ -1011,7 +1066,8 @@ public class Metadata {
   /**
    * Struct which contains the metadata for an entire parquet directory structure
    */
-  @JsonTypeName("v2") public static class ParquetTableMetadata_v2 extends ParquetTableMetadataBase {
+  @JsonTypeName(V2) public static class ParquetTableMetadata_v2 extends ParquetTableMetadataBase {
+    @JsonProperty(value = "metadata_version", access = JsonProperty.Access.WRITE_ONLY) private String metadataVersion;
     /*
      ColumnTypeInfo is schema information from all the files and row groups, merged into
      one. To get this info, we pass the ParquetTableMetadata object all the way down to the
@@ -1026,20 +1082,23 @@ public class Metadata {
       super();
     }
 
-    public ParquetTableMetadata_v2(String drillVersion) {
+    public ParquetTableMetadata_v2(String metadataVersion, String drillVersion) {
+      this.metadataVersion = metadataVersion;
       this.drillVersion = drillVersion;
     }
 
-    public ParquetTableMetadata_v2(ParquetTableMetadataBase parquetTable,
+    public ParquetTableMetadata_v2(String metadataVersion, ParquetTableMetadataBase parquetTable,
         List<ParquetFileMetadata_v2> files, List<String> directories, String drillVersion) {
+      this.metadataVersion = metadataVersion;
       this.files = files;
       this.directories = directories;
       this.columnTypeInfo = ((ParquetTableMetadata_v2) parquetTable).columnTypeInfo;
       this.drillVersion = drillVersion;
     }
 
-    public ParquetTableMetadata_v2(List<ParquetFileMetadata_v2> files, List<String> directories,
+    public ParquetTableMetadata_v2(String metadataVersion, List<ParquetFileMetadata_v2> files, List<String> directories,
         ConcurrentHashMap<ColumnTypeMetadata_v2.Key, ColumnTypeMetadata_v2> columnTypeInfo, String drillVersion) {
+      this.metadataVersion = metadataVersion;
       this.files = files;
       this.directories = directories;
       this.columnTypeInfo = columnTypeInfo;
@@ -1090,7 +1149,7 @@ public class Metadata {
     }
 
     @JsonIgnore @Override public ParquetTableMetadataBase clone() {
-      return new ParquetTableMetadata_v2(files, directories, columnTypeInfo, drillVersion);
+      return new ParquetTableMetadata_v2(metadataVersion, files, directories, columnTypeInfo, drillVersion);
     }
 
     @JsonIgnore @Override
@@ -1098,6 +1157,10 @@ public class Metadata {
       return drillVersion;
     }
 
+    @JsonIgnore @Override public String getMetadataVersion() {
+      return metadataVersion;
+    }
+
   }
 
 
@@ -1358,12 +1421,9 @@ public class Metadata {
 
   }
 
-  /**
-   * Struct which contains the metadata for an entire parquet directory structure
-   *
-   * Difference between v3 and v2 : min/max, type_length, precision, scale, repetitionLevel, definitionLevel
-   */
-  @JsonTypeName("v3") public static class ParquetTableMetadata_v3 extends ParquetTableMetadataBase {
+  @JsonTypeName(V3_1)
+  public static class ParquetTableMetadata_v3 extends ParquetTableMetadataBase {
+    @JsonProperty(value = "metadata_version", access = JsonProperty.Access.WRITE_ONLY) private String metadataVersion;
     /*
      ColumnTypeInfo is schema information from all the files and row groups, merged into
      one. To get this info, we pass the ParquetTableMetadata object all the way down to the
@@ -1376,31 +1436,34 @@ public class Metadata {
 
     /**
      * Default constructor needed for deserialization from Parquet Metadata Cache Files
-     * or for creating an empty instances of this class for the case when the Metadata Cache File is absent
      */
     public ParquetTableMetadata_v3() {
       super();
     }
 
     /**
-     * Used for creating the Parquet Metadata Cache File
-     * @param drillVersion  actual version of apache drill
+     * Used for creating the Parquet Metadata cache file
+     * @param metadataVersion metadata version
+     * @param drillVersion Apache Drill version
      */
-    public ParquetTableMetadata_v3(String drillVersion) {
+    public ParquetTableMetadata_v3(String metadataVersion, String drillVersion) {
+      this.metadataVersion = metadataVersion;
       this.drillVersion = drillVersion;
     }
 
-    public ParquetTableMetadata_v3(ParquetTableMetadataBase parquetTable,
+    public ParquetTableMetadata_v3(String metadataVersion, ParquetTableMetadataBase parquetTable,
         List<ParquetFileMetadata_v3> files, List<String> directories, String drillVersion) {
+      this.metadataVersion = metadataVersion;
       this.files = files;
       this.directories = directories;
       this.columnTypeInfo = ((ParquetTableMetadata_v3) parquetTable).columnTypeInfo;
       this.drillVersion = drillVersion;
     }
 
-    public ParquetTableMetadata_v3(List<ParquetFileMetadata_v3> files, List<String> directories,
+    public ParquetTableMetadata_v3(String metadataVersion, List<ParquetFileMetadata_v3> files, List<String> directories,
         ConcurrentHashMap<ColumnTypeMetadata_v3.Key, ColumnTypeMetadata_v3> columnTypeInfo,
         String drillVersion) {
+      this.metadataVersion = metadataVersion;
       this.files = files;
       this.directories = directories;
       this.columnTypeInfo = columnTypeInfo;
@@ -1415,6 +1478,10 @@ public class Metadata {
       return directories;
     }
 
+    @JsonIgnore @Override public String getMetadataVersion() {
+      return metadataVersion;
+    }
+
     /**
      * If directories list and file metadata list contain relative paths, update it to absolute ones
      * @param baseDir base parent directory
@@ -1463,7 +1530,7 @@ public class Metadata {
     }
 
     @JsonIgnore @Override public ParquetTableMetadataBase clone() {
-      return new ParquetTableMetadata_v3(files, directories, columnTypeInfo, drillVersion);
+      return new ParquetTableMetadata_v3(metadataVersion, files, directories, columnTypeInfo, drillVersion);
     }
 
     @JsonIgnore @Override
@@ -1767,7 +1834,7 @@ public class Metadata {
         List<String> absolutePaths = Lists.newArrayList();
         for (String relativePath : paths) {
           String absolutePath = (new Path(relativePath).isAbsolute()) ? relativePath
-              : new Path(baseDir, relativePath).toUri().toString();
+              : new Path(baseDir, relativePath).toUri().getPath();
           absolutePaths.add(absolutePath);
         }
         return absolutePaths;
@@ -1790,7 +1857,7 @@ public class Metadata {
           Path relativePath = new Path(file.getPath());
           // create a new file if old one contains a relative path, otherwise use an old file
           ParquetFileMetadata_v3 fileWithAbsolutePath = (relativePath.isAbsolute()) ? file
-              : new ParquetFileMetadata_v3(new Path(baseDir, relativePath).toUri().toString(), file.length, file.rowGroups);
+              : new ParquetFileMetadata_v3(new Path(baseDir, relativePath).toUri().getPath(), file.length, file.rowGroups);
           filesWithAbsolutePaths.add(fileWithAbsolutePath);
         }
         return filesWithAbsolutePaths;
@@ -1817,7 +1884,7 @@ public class Metadata {
         filesWithRelativePaths.add(new ParquetFileMetadata_v3(
             relativize(baseDir, file.getPath()), file.length, file.rowGroups));
       }
-      return new ParquetTableMetadata_v3(tableMetadataWithAbsolutePaths, filesWithRelativePaths,
+      return new ParquetTableMetadata_v3(V3_1, tableMetadataWithAbsolutePaths, filesWithRelativePaths,
           directoriesWithRelativePaths, DrillVersionInfo.getVersion());
     }
 
@@ -1837,9 +1904,9 @@ public class Metadata {
           .relativize(fullPathWithoutSchemeAndAuthority.toUri()));
       if (relativeFilePath.isAbsolute()) {
         throw new IllegalStateException(String.format("Path %s is not a subpath of %s.",
-            basePathWithoutSchemeAndAuthority.toUri().toString(), fullPathWithoutSchemeAndAuthority.toUri().toString()));
+            basePathWithoutSchemeAndAuthority.toUri().getPath(), fullPathWithoutSchemeAndAuthority.toUri().getPath()));
       }
-      return relativeFilePath.toUri().toString();
+      return relativeFilePath.toUri().getPath();
     }
   }
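
The @JsonTypeInfo change above does double duty: visible = true lets the "metadata_version" type id also bind to the new WRITE_ONLY fields, and an unrecognized id makes deserialization fail with an IOException, which the new try/catch in readBlockMeta converts into the corrupted-cache flag. A self-contained Jackson sketch of the same pattern (class names here are illustrative, not Drill's):

    import com.fasterxml.jackson.annotation.JsonProperty;
    import com.fasterxml.jackson.annotation.JsonSubTypes;
    import com.fasterxml.jackson.annotation.JsonTypeInfo;
    import com.fasterxml.jackson.databind.ObjectMapper;

    import java.io.IOException;

    public class TypeIdSketch {
      @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY,
                    property = "metadata_version", visible = true)
      @JsonSubTypes({
          @JsonSubTypes.Type(value = V3.class, name = "v3"),
          @JsonSubTypes.Type(value = V3.class, name = "3.1")
      })
      public static abstract class Base {}

      public static class V3 extends Base {
        // visible = true lets the type id bind here too; WRITE_ONLY keeps Jackson
        // from emitting the property a second time on serialization
        @JsonProperty(value = "metadata_version", access = JsonProperty.Access.WRITE_ONLY)
        public String metadataVersion;
      }

      public static void main(String[] args) throws IOException {
        ObjectMapper mapper = new ObjectMapper();
        V3 meta = (V3) mapper.readValue("{\"metadata_version\":\"3.1\"}", Base.class);
        System.out.println(meta.metadataVersion); // 3.1
        try {
          mapper.readValue("{\"metadata_version\":\"v4\"}", Base.class);
        } catch (IOException e) {
          System.out.println("unknown version id rejected; the cache would be ignored");
        }
      }
    }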
 

http://git-wip-us.apache.org/repos/asf/drill/blob/e9065b55/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/MetadataVersion.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/MetadataVersion.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/MetadataVersion.java
new file mode 100644
index 0000000..bc6fd70
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/MetadataVersion.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.parquet;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ComparisonChain;
+import com.google.common.collect.ImmutableSortedSet;
+import org.apache.drill.common.exceptions.DrillRuntimeException;
+
+import java.util.SortedSet;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+
+public class MetadataVersion implements Comparable<MetadataVersion> {
+
+  private static final String FORMAT = "v?((?!0)\\d+)(\\.(\\d+))?";
+  private static final Pattern PATTERN = Pattern.compile(FORMAT);
+
+  private final int major;
+  private final int minor;
+
+  public MetadataVersion(int major, int minor) {
+    this.major = major;
+    this.minor = minor;
+  }
+
+  public MetadataVersion(String metadataVersion) {
+    Matcher matcher = PATTERN.matcher(metadataVersion);
+    if (!matcher.matches()) {
+      DrillRuntimeException.format("Could not parse metadata version '%s' using format '%s'", metadataVersion, FORMAT);
+    }
+    this.major = Integer.parseInt(matcher.group(1));
+    this.minor = matcher.group(3) != null ? Integer.parseInt(matcher.group(3)) : 0;
+  }
+
+  public int getMajor() {
+    return major;
+  }
+
+  public int getMinor() {
+    return minor;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (!(o instanceof MetadataVersion)) {
+      return false;
+    }
+    MetadataVersion that = (MetadataVersion) o;
+    return this.major == that.major
+        && this.minor == that.minor;
+  }
+
+  @Override
+  public int hashCode() {
+    int result = major;
+    result = 31 * result + minor;
+    return result;
+  }
+
+  /**
+   * @return string representation of the metadata file version, for example: "1", "10", "4.13"
+   * <p>
+   * String metadata version consists of the following characters:<p>
+   * major metadata version (any number of digits, except a single zero digit),<p>
+   * optional "." delimiter (used if minor metadata version is specified),<p>
+   * minor metadata version (not specified for "0" minor version)
+   */
+  @Override
+  public String toString() {
+    StringBuilder builder = new StringBuilder();
+    builder.append(major);
+    if (minor != 0) {
+      builder.append(".").append(minor);
+    }
+    return builder.toString();
+  }
+
+  @Override
+  public int compareTo(MetadataVersion o) {
+    Preconditions.checkNotNull(o);
+    return ComparisonChain.start()
+        .compare(this.major, o.major)
+        .compare(this.minor, o.minor)
+        .result();
+  }
+
+  /**
+   * Supported metadata versions.
+   * <p>
+   * Note: keep them synchronized with {@link Metadata.ParquetTableMetadataBase} versions
+   */
+  public static class Constants {
+    /**
+     * Version 1: Introduces parquet file metadata caching.<br>
+     * See DRILL-2743
+     */
+    public static final String V1 = "v1";
+    /**
+     * Version 2: Metadata cache file size is reduced.<br>
+     * See DRILL-4053
+     */
+    public static final String V2 = "v2";
+    /**
+     * Version 3: Difference between v3 and v2: min/max, type_length, precision, scale, repetitionLevel, definitionLevel.<br>
+     * Filter pushdown for Parquet is implemented. <br>
+     * See DRILL-1950
+     */
+    public static final String V3 = "v3";
+    /**
+     * Version 3.1: Absolute paths of files and directories are replaced with relative ones. The metadata version
+     * value no longer contains the `v` prefix.<br>
+     * See DRILL-3867, DRILL-5660
+     */
+    public static final String V3_1 = "3.1";
+
+    /**
+     * All historical versions of the Drill metadata cache files. When introducing a new parquet metadata version,
+     * please follow the {@link MetadataVersion#FORMAT}.
+     */
+    public static final SortedSet<MetadataVersion> SUPPORTED_VERSIONS = ImmutableSortedSet.of(
+        new MetadataVersion(V1),
+        new MetadataVersion(V2),
+        new MetadataVersion(V3),
+        new MetadataVersion(V3_1)
+    );
+
+    /**
+     * @param metadataVersion string representation of the parquet metadata version
+     * @return true if metadata version is supported, false otherwise
+     */
+    public static boolean isVersionSupported(String metadataVersion) {
+      return SUPPORTED_VERSIONS.contains(new MetadataVersion(metadataVersion));
+    }
+  }
+}
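
A short sketch of the boundary behavior (demo class name is illustrative): a parseable but unknown version such as "4" is merely unsupported, which is what allows a cache written by a future Drill to be ignored rather than fail the query, while a string that does not match FORMAT raises a DrillRuntimeException from the constructor:

    import org.apache.drill.exec.store.parquet.MetadataVersion;

    public class SupportedVersionSketch {
      public static void main(String[] args) {
        System.out.println(MetadataVersion.Constants.isVersionSupported("v2"));  // true
        System.out.println(MetadataVersion.Constants.isVersionSupported("3.1")); // true
        // parses fine but is absent from SUPPORTED_VERSIONS (e.g. a future Drill wrote it):
        System.out.println(MetadataVersion.Constants.isVersionSupported("4"));   // false
        // new MetadataVersion("v0") does not match FORMAT and raises a DrillRuntimeException
      }
    }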

http://git-wip-us.apache.org/repos/asf/drill/blob/e9065b55/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatConfig.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatConfig.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatConfig.java
index b33186e..9d4c453 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatConfig.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatConfig.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -17,17 +17,25 @@
  */
 package org.apache.drill.exec.store.parquet;
 
+import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.JsonInclude;
-import com.fasterxml.jackson.annotation.JsonProperty;
 import org.apache.drill.common.logical.FormatPluginConfig;
 
 import com.fasterxml.jackson.annotation.JsonTypeName;
 
 @JsonTypeName("parquet") @JsonInclude(JsonInclude.Include.NON_DEFAULT)
-public class ParquetFormatConfig implements FormatPluginConfig{
+public class ParquetFormatConfig implements FormatPluginConfig {
 
   public boolean autoCorrectCorruptDates = true;
 
+  /**
+   * @return true if auto correction of corrupt dates is enabled, false otherwise
+   */
+  @JsonIgnore
+  public boolean areCorruptDatesAutoCorrected() {
+    return autoCorrectCorruptDates;
+  }
+
   @Override
   public boolean equals(Object o) {
     if (this == o) {

http://git-wip-us.apache.org/repos/asf/drill/blob/e9065b55/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java
index 3f331b1..e457e18 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java
@@ -167,7 +167,7 @@ public class ParquetFormatPlugin implements FormatPlugin{
   @Override
   public ParquetGroupScan getGroupScan(String userName, FileSelection selection, List<SchemaPath> columns)
       throws IOException {
-    return new ParquetGroupScan(userName, selection, this, selection.selectionRoot, selection.cacheFileRoot, columns);
+    return new ParquetGroupScan(userName, selection, this, columns);
   }
 
   @Override
@@ -222,8 +222,8 @@ public class ParquetFormatPlugin implements FormatPlugin{
           // create a metadata context that will be used for the duration of the query for this table
           MetadataContext metaContext = new MetadataContext();
 
-          ParquetTableMetadataDirs mDirs = Metadata.readMetadataDirs(fs, dirMetaPath.toString(), metaContext, formatConfig);
-          if (mDirs.getDirectories().size() > 0) {
+          ParquetTableMetadataDirs mDirs = Metadata.readMetadataDirs(fs, dirMetaPath, metaContext, formatConfig);
+          if (mDirs != null && mDirs.getDirectories().size() > 0) {
             FileSelection dirSelection = FileSelection.createFromDirectories(mDirs.getDirectories(), selection,
                 selection.getSelectionRoot() /* cacheFileRoot initially points to selectionRoot */);
             dirSelection.setExpandedPartial();

http://git-wip-us.apache.org/repos/asf/drill/blob/e9065b55/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
index c333a3e..2989819 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
@@ -119,6 +119,7 @@ public class ParquetGroupScan extends AbstractFileGroupScan {
   private final ParquetFormatPlugin formatPlugin;
   private final ParquetFormatConfig formatConfig;
   private final DrillFileSystem fs;
+  private final MetadataContext metaContext;
   private String selectionRoot;
 
   private boolean usedMetadataCache = false;
@@ -172,26 +173,23 @@ public class ParquetGroupScan extends AbstractFileGroupScan {
     this.selectionRoot = selectionRoot;
     this.cacheFileRoot = cacheFileRoot;
     this.filter = filter;
+    this.metaContext = new MetadataContext();
 
-    init(null);
+    init();
   }
 
   public ParquetGroupScan( //
       String userName,
       FileSelection selection, //
       ParquetFormatPlugin formatPlugin, //
-      String selectionRoot,
-      String cacheFileRoot,
-      List<SchemaPath> columns) throws IOException{
-    this(userName, selection, formatPlugin, selectionRoot, cacheFileRoot, columns, ValueExpressions.BooleanExpression.TRUE);
+      List<SchemaPath> columns) throws IOException {
+    this(userName, selection, formatPlugin, columns, ValueExpressions.BooleanExpression.TRUE);
   }
 
   public ParquetGroupScan( //
       String userName,
       FileSelection selection, //
       ParquetFormatPlugin formatPlugin, //
-      String selectionRoot,
-      String cacheFileRoot,
       List<SchemaPath> columns,
       LogicalExpression filter) //
       throws IOException {
@@ -201,19 +199,18 @@ public class ParquetGroupScan extends AbstractFileGroupScan {
     this.formatConfig = formatPlugin.getConfig();
     this.fs = ImpersonationUtil.createFileSystem(userName, formatPlugin.getFsConf());
 
-    this.selectionRoot = selectionRoot;
-    this.cacheFileRoot = cacheFileRoot;
+    this.selectionRoot = selection.getSelectionRoot();
+    this.cacheFileRoot = selection.getCacheFileRoot();
+
+    MetadataContext metadataContext = selection.getMetaContext();
+    this.metaContext = metadataContext != null ? metadataContext : new MetadataContext();
 
     final FileSelection fileSelection = expandIfNecessary(selection);
 
     this.entries = Lists.newArrayList();
-    if (fileSelection.getMetaContext() != null &&
-        (fileSelection.getMetaContext().getPruneStatus() == PruneStatus.NOT_STARTED ||
-          fileSelection.getMetaContext().getPruneStatus() == PruneStatus.NOT_PRUNED)) {
-      // if pruning was not applicable or was attempted and nothing was pruned, initialize the
-      // entries with just the selection root instead of the fully expanded list to reduce overhead.
-      // The fully expanded list is already stored as part of the fileSet.
-      // TODO: at some point we should examine whether the list of entries is absolutely needed.
+
+    if (checkForInitializingEntriesWithSelectionRoot()) {
+      // The fully expanded list is already stored as part of the fileSet
       entries.add(new ReadEntryWithPath(fileSelection.getSelectionRoot()));
     } else {
       for (String fileName : fileSelection.getFiles()) {
@@ -223,13 +220,20 @@ public class ParquetGroupScan extends AbstractFileGroupScan {
 
     this.filter = filter;
 
-    init(fileSelection.getMetaContext());
+    init();
   }
 
   /*
    * This is used to clone another copy of the group scan.
    */
   private ParquetGroupScan(ParquetGroupScan that) {
+    this(that, null);
+  }
+
+  /*
+   * This is used to clone the group scan, but keep the metadata caching info from the file selection
+   */
+  private ParquetGroupScan(ParquetGroupScan that, FileSelection selection) {
     super(that);
     this.columns = that.columns == null ? null : Lists.newArrayList(that.columns);
     this.endpointAffinities = that.endpointAffinities == null ? null : Lists.newArrayList(that.endpointAffinities);
@@ -248,7 +252,14 @@ public class ParquetGroupScan extends AbstractFileGroupScan {
     this.usedMetadataCache = that.usedMetadataCache;
     this.parquetTableMetadata = that.parquetTableMetadata;
     this.filter = that.filter;
-    this.cacheFileRoot = that.cacheFileRoot;
+    if (selection != null) {
+      this.cacheFileRoot = selection.getCacheFileRoot();
+      MetadataContext metaContext = selection.getMetaContext();
+      this.metaContext = metaContext != null ? metaContext : that.metaContext;
+    } else {
+      this.cacheFileRoot = that.cacheFileRoot;
+      this.metaContext = that.metaContext;
+    }
   }
 
   /**
@@ -268,11 +279,31 @@ public class ParquetGroupScan extends AbstractFileGroupScan {
     // use the cacheFileRoot if provided (e.g after partition pruning)
     Path metaFilePath = new Path(cacheFileRoot != null ? cacheFileRoot : selectionRoot, Metadata.METADATA_FILENAME);
     if (!fs.exists(metaFilePath)) { // no metadata cache
+      if (selection.isExpandedPartial()) {
+        logger.error("'{}' metadata file does not exist, but metadata directories cache file is present", metaFilePath);
+        metaContext.setMetadataCacheCorrupted(true);
+      }
+
       return selection;
     }
 
-    FileSelection expandedSelection = initFromMetadataCache(selection, metaFilePath);
-    return expandedSelection;
+    return expandSelectionFromMetadataCache(selection, metaFilePath);
+  }
+
+  /**
+   * In two cases the entries should be initialized with just the selection root instead of the fully expanded list:
+   * <ul>
+   *   <li> When metadata caching is corrupted (to use the correct file selection)
+   *   <li> When metadata caching is intact and used, but pruning was not applicable or was attempted and nothing was
+   *        pruned (to reduce overhead in the parquet group scan).
+   * </ul>
+   *
+   * @return true if entries should be initialized with selection root, false otherwise
+   */
+  private boolean checkForInitializingEntriesWithSelectionRoot() {
+    // TODO: at some point we should examine whether the list of entries is absolutely needed.
+    return metaContext.isMetadataCacheCorrupted() || (parquetTableMetadata != null &&
+        (metaContext.getPruneStatus() == PruneStatus.NOT_STARTED || metaContext.getPruneStatus() == PruneStatus.NOT_PRUNED));
   }
 
   public List<ReadEntryWithPath> getEntries() {
@@ -633,14 +664,17 @@ public class ParquetGroupScan extends AbstractFileGroupScan {
    * @throws UserException when the updated selection is empty, this happens if the user selects an empty folder.
    */
   private FileSelection
-  initFromMetadataCache(FileSelection selection, Path metaFilePath) throws IOException {
+  expandSelectionFromMetadataCache(FileSelection selection, Path metaFilePath) throws IOException {
     // get the metadata for the root directory by reading the metadata file
     // parquetTableMetadata contains the metadata for all files in the selection root folder, but we need to make sure
     // we only select the files that are part of selection (by setting fileSet appropriately)
 
     // get (and set internal field) the metadata for the directory by reading the metadata file
-    this.parquetTableMetadata = Metadata.readBlockMeta(fs, metaFilePath.toString(), selection.getMetaContext(), formatConfig);
-    if (formatConfig.autoCorrectCorruptDates) {
+    parquetTableMetadata = Metadata.readBlockMeta(fs, metaFilePath, metaContext, formatConfig);
+    if (ignoreExpandingSelection(parquetTableMetadata)) {
+      return selection;
+    }
+    if (formatConfig.areCorruptDatesAutoCorrected()) {
       ParquetReaderUtility.correctDatesInMetadataCache(this.parquetTableMetadata);
     }
     List<FileStatus> fileStatuses = selection.getStatuses(fs);
@@ -656,8 +690,7 @@ public class ParquetGroupScan extends AbstractFileGroupScan {
         fileSet.add(file.getPath());
       }
 
-    } else if (selection.isExpandedPartial() && !selection.hadWildcard() &&
-        cacheFileRoot != null) {
+    } else if (selection.isExpandedPartial() && !selection.hadWildcard() && cacheFileRoot != null) {
       if (selection.wasAllPartitionsPruned()) {
         // if all partitions were previously pruned, we only need to read 1 file (for the schema)
         fileSet.add(this.parquetTableMetadata.getFiles().get(0).getPath());
@@ -677,7 +710,10 @@ public class ParquetGroupScan extends AbstractFileGroupScan {
         if (status.isDirectory()) {
           //TODO [DRILL-4496] read the metadata cache files in parallel
           final Path metaPath = new Path(cacheFileRoot, Metadata.METADATA_FILENAME);
-          final Metadata.ParquetTableMetadataBase metadata = Metadata.readBlockMeta(fs, metaPath.toString(), selection.getMetaContext(), formatConfig);
+          final Metadata.ParquetTableMetadataBase metadata = Metadata.readBlockMeta(fs, metaPath, metaContext, formatConfig);
+          if (ignoreExpandingSelection(metadata)) {
+            return selection;
+          }
           for (Metadata.ParquetFileMetadata file : metadata.getFiles()) {
             fileSet.add(file.getPath());
           }
@@ -710,11 +746,11 @@ public class ParquetGroupScan extends AbstractFileGroupScan {
         cacheFileRoot, selection.wasAllPartitionsPruned());
 
     newSelection.setExpandedFully();
-    newSelection.setMetaContext(selection.getMetaContext());
+    newSelection.setMetaContext(metaContext);
     return newSelection;
   }
 
-  private void init(MetadataContext metaContext) throws IOException {
+  private void init() throws IOException {
     Path metaPath = null;
     if (entries.size() == 1 && parquetTableMetadata == null) {
       Path p = Path.getPathWithoutSchemeAndAuthority(new Path(entries.get(0).getPath()));
@@ -723,24 +759,31 @@ public class ParquetGroupScan extends AbstractFileGroupScan {
         // if querying a single file we can look up the metadata directly from the file
         metaPath = new Path(p, Metadata.METADATA_FILENAME);
       }
-      if (metaPath != null && fs.exists(metaPath)) {
-        usedMetadataCache = true;
-        parquetTableMetadata = Metadata.readBlockMeta(fs, metaPath.toString(), metaContext, formatConfig);
-      } else {
+      if (!metaContext.isMetadataCacheCorrupted() && metaPath != null && fs.exists(metaPath)) {
+        parquetTableMetadata = Metadata.readBlockMeta(fs, metaPath, metaContext, formatConfig);
+        if (parquetTableMetadata != null) {
+          usedMetadataCache = true;
+        }
+      }
+      if (!usedMetadataCache) {
         parquetTableMetadata = Metadata.getParquetTableMetadata(fs, p.toString(), formatConfig);
       }
     } else {
       Path p = Path.getPathWithoutSchemeAndAuthority(new Path(selectionRoot));
       metaPath = new Path(p, Metadata.METADATA_FILENAME);
-      if (fs.isDirectory(new Path(selectionRoot)) && fs.exists(metaPath)) {
-        usedMetadataCache = true;
+      if (!metaContext.isMetadataCacheCorrupted() && fs.isDirectory(new Path(selectionRoot))
+          && fs.exists(metaPath)) {
         if (parquetTableMetadata == null) {
-          parquetTableMetadata = Metadata.readBlockMeta(fs, metaPath.toString(), metaContext, formatConfig);
+          parquetTableMetadata = Metadata.readBlockMeta(fs, metaPath, metaContext, formatConfig);
         }
-        if (fileSet != null) {
-          parquetTableMetadata = removeUnneededRowGroups(parquetTableMetadata);
+        if (parquetTableMetadata != null) {
+          usedMetadataCache = true;
+          if (fileSet != null) {
+            parquetTableMetadata = removeUnneededRowGroups(parquetTableMetadata);
+          }
         }
-      } else {
+      }
+      if (!usedMetadataCache) {
         final List<FileStatus> fileStatuses = Lists.newArrayList();
         for (ReadEntryWithPath entry : entries) {
           fileStatuses.addAll(DrillFileSystemUtil.listFiles(fs, Path.getPathWithoutSchemeAndAuthority(new Path(entry.getPath())), true));
@@ -913,10 +956,6 @@ public class ParquetGroupScan extends AbstractFileGroupScan {
     return toString();
   }
 
-  public void setCacheFileRoot(String cacheFileRoot) {
-    this.cacheFileRoot = cacheFileRoot;
-  }
-
   @Override
   public String toString() {
     String cacheFileString = "";
@@ -972,10 +1011,9 @@ public class ParquetGroupScan extends AbstractFileGroupScan {
 
   @Override
   public ParquetGroupScan clone(FileSelection selection) throws IOException {
-    ParquetGroupScan newScan = new ParquetGroupScan(this);
+    ParquetGroupScan newScan = new ParquetGroupScan(this, selection);
     newScan.modifyFileSelection(selection);
-    newScan.setCacheFileRoot(selection.cacheFileRoot);
-    newScan.init(selection.getMetaContext());
+    newScan.init();
     return newScan;
   }
 
@@ -1123,4 +1161,20 @@ public class ParquetGroupScan extends AbstractFileGroupScan {
     }
   }
 
+  /**
+   * If metadata is corrupted, skip expanding the selection and reset the parquetTableMetadata and fileSet fields
+   *
+   * @param metadata parquet table metadata
+   * @return true if parquet metadata is corrupted, false otherwise
+   */
+  private boolean ignoreExpandingSelection(ParquetTableMetadataBase metadata) {
+    if (metadata == null || metaContext.isMetadataCacheCorrupted()) {
+      logger.debug("Selection can't be expanded since metadata file is corrupted or metadata version is not supported");
+      this.parquetTableMetadata = null;
+      this.fileSet = null;
+      return true;
+    }
+    return false;
+  }
+
 }
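
Condensed, the order init() now follows for each table is: consult the cache only while the context has not flagged it corrupted, accept that readBlockMeta may return null, and rebuild the metadata from the parquet footers whenever the cache could not be used. A sketch of that flow, with the wrapper method and its parameters invented for illustration:

    import org.apache.drill.exec.store.dfs.MetadataContext;
    import org.apache.drill.exec.store.parquet.Metadata;
    import org.apache.drill.exec.store.parquet.Metadata.ParquetTableMetadataBase;
    import org.apache.drill.exec.store.parquet.ParquetFormatConfig;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    import java.io.IOException;

    public class CacheFallbackSketch {
      static ParquetTableMetadataBase readTableMetadata(FileSystem fs, Path table, Path metaPath,
          MetadataContext ctx, ParquetFormatConfig config) throws IOException {
        ParquetTableMetadataBase metadata = null;
        if (!ctx.isMetadataCacheCorrupted() && fs.exists(metaPath)) {
          // returns null (and flags the context) when the cache is corrupted or unsupported
          metadata = Metadata.readBlockMeta(fs, metaPath, ctx, config);
        }
        if (metadata == null) {
          // cache missing or unusable: compute the metadata from the parquet files themselves
          metadata = Metadata.getParquetTableMetadata(fs, table.toString(), config);
        }
        return metadata;
      }
    }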

http://git-wip-us.apache.org/repos/asf/drill/blob/e9065b55/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetReaderUtility.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetReaderUtility.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetReaderUtility.java
index 7d7c13b..78e9655 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetReaderUtility.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetReaderUtility.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -139,12 +139,13 @@ public class ParquetReaderUtility {
   }
 
   public static void correctDatesInMetadataCache(Metadata.ParquetTableMetadataBase parquetTableMetadata) {
-    DateCorruptionStatus cacheFileCanContainsCorruptDates = parquetTableMetadata instanceof Metadata.ParquetTableMetadata_v3 ?
+    DateCorruptionStatus cacheFileCanContainsCorruptDates =
+        new MetadataVersion(parquetTableMetadata.getMetadataVersion()).compareTo(new MetadataVersion(3, 0)) >= 0 ?
         DateCorruptionStatus.META_SHOWS_NO_CORRUPTION : DateCorruptionStatus.META_UNCLEAR_TEST_VALUES;
     if (cacheFileCanContainsCorruptDates == DateCorruptionStatus.META_UNCLEAR_TEST_VALUES) {
       // Looking for the DATE data type of column names in the metadata cache file ("metadata_version" : "v2")
       String[] names = new String[0];
-      if (parquetTableMetadata instanceof Metadata.ParquetTableMetadata_v2) {
+      if (new MetadataVersion(2, 0).equals(new MetadataVersion(parquetTableMetadata.getMetadataVersion()))) {
         for (Metadata.ColumnTypeMetadata_v2 columnTypeMetadata :
             ((Metadata.ParquetTableMetadata_v2) parquetTableMetadata).columnTypeInfo.values()) {
           if (OriginalType.DATE.equals(columnTypeMetadata.originalType)) {
@@ -158,7 +159,7 @@ public class ParquetReaderUtility {
         Metadata.RowGroupMetadata rowGroupMetadata = file.getRowGroups().get(0);
         for (Metadata.ColumnMetadata columnMetadata : rowGroupMetadata.getColumns()) {
           // Setting Min/Max values for ParquetTableMetadata_v1
-          if (parquetTableMetadata instanceof Metadata.ParquetTableMetadata_v1) {
+          if (new MetadataVersion(1, 0).equals(new MetadataVersion(parquetTableMetadata.getMetadataVersion()))) {
             OriginalType originalType = columnMetadata.getOriginalType();
             if (OriginalType.DATE.equals(originalType) && columnMetadata.hasSingleValue() &&
                 (Integer) columnMetadata.getMaxValue() > ParquetReaderUtility.DATE_CORRUPTION_THRESHOLD) {
@@ -168,7 +169,7 @@ public class ParquetReaderUtility {
             }
           }
           // Setting Max values for ParquetTableMetadata_v2
-          else if (parquetTableMetadata instanceof Metadata.ParquetTableMetadata_v2 &&
+          else if (new MetadataVersion(2, 0).equals(new MetadataVersion(parquetTableMetadata.getMetadataVersion())) &&
               columnMetadata.getName() != null && Arrays.equals(columnMetadata.getName(), names) &&
               columnMetadata.hasSingleValue() && (Integer) columnMetadata.getMaxValue() >
               ParquetReaderUtility.DATE_CORRUPTION_THRESHOLD) {
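
Replacing the instanceof checks with MetadataVersion comparisons makes the
date-corruption heuristics independent of the concrete metadata POJO class: any cache
written at v3.0 or later is trusted outright. Assuming the Comparable ordering implied
by the code above (major version compared first, then minor), the semantics are roughly:

    // Assumed ordering semantics of MetadataVersion, inferred from the usage above.
    boolean mayContainCorruptDates = new MetadataVersion("v2").compareTo(new MetadataVersion(3, 0)) < 0;  // true
    boolean noCorruption = new MetadataVersion("3.1").compareTo(new MetadataVersion(3, 0)) >= 0;          // true
    boolean sameVersion = new MetadataVersion("v3").equals(new MetadataVersion(3, 0));                    // true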

http://git-wip-us.apache.org/repos/asf/drill/blob/e9065b55/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRowGroupScan.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRowGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRowGroupScan.java
index f62efb5..5da6aef 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRowGroupScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRowGroupScan.java
@@ -47,7 +47,7 @@ import com.google.common.collect.Iterators;
 public class ParquetRowGroupScan extends AbstractBase implements SubScan {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ParquetRowGroupScan.class);
 
-  public final ParquetFormatConfig formatConfig;
+  private final ParquetFormatConfig formatConfig;
   private final ParquetFormatPlugin formatPlugin;
   private final List<RowGroupReadEntry> rowGroupReadEntries;
   private final List<SchemaPath> columns;
@@ -140,4 +140,11 @@ public class ParquetRowGroupScan extends AbstractBase implements SubScan {
     return CoreOperatorType.PARQUET_ROW_GROUP_SCAN_VALUE;
   }
 
+  /**
+   * @return Parquet plugin format config
+   */
+  public ParquetFormatConfig getFormatConfig() {
+    return formatConfig;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/e9065b55/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
index 490a5a0..21fc4ef 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
@@ -113,7 +113,7 @@ public class ParquetScanBatchCreator implements BatchCreator<ParquetRowGroupScan
           logger.trace("ParquetTrace,Read Footer,{},{},{},{},{},{},{}", "", e.getPath(), "", 0, 0, 0, timeToRead);
           footers.put(e.getPath(), footer );
         }
-        boolean autoCorrectCorruptDates = rowGroupScan.formatConfig.autoCorrectCorruptDates;
+        boolean autoCorrectCorruptDates = rowGroupScan.getFormatConfig().areCorruptDatesAutoCorrected();
         ParquetReaderUtility.DateCorruptionStatus containsCorruptDates = ParquetReaderUtility.detectCorruptDates(footers.get(e.getPath()), rowGroupScan.getColumns(),
                 autoCorrectCorruptDates);
         if (logger.isDebugEnabled()) {

http://git-wip-us.apache.org/repos/asf/drill/blob/e9065b55/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java b/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java
index e3ce9fc..7e8bdab 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java
@@ -623,6 +623,11 @@ public class BaseTestQuery extends ExecTest {
         destination);
   }
 
+  protected static void copyMetaDataCacheToTempReplacingInternalPaths(String srcFileOnClassPath, String destFolderInTmp,
+      String metaFileName) throws IOException {
+    copyMetaDataCacheToTempWithReplacements(srcFileOnClassPath, destFolderInTmp, metaFileName, null);
+  }
+
   /**
    * Old metadata cache files include full paths to the files that have been scanned.
    * <p>
@@ -639,16 +644,20 @@ public class BaseTestQuery extends ExecTest {
    * @param srcFileOnClassPath the source path of metadata cache file, which should be replaced
    * @param destFolderInTmp  the parent folder name of the metadata cache file
    * @param metaFileName the name of metadata cache file depending on the type of the metadata
+   * @param customStringReplacement custom string that replaces the "CUSTOM_STRING_REPLACEMENT" target string in the metadata file
    * @throws IOException if a create or write errors occur
    */
-  protected static void copyMetaDataCacheToTempReplacingInternalPaths(String srcFileOnClassPath, String destFolderInTmp,
-      String metaFileName) throws IOException {
+  protected static void copyMetaDataCacheToTempWithReplacements(String srcFileOnClassPath,
+      String destFolderInTmp, String metaFileName, String customStringReplacement) throws IOException {
     String metadataFileContents = getFile(srcFileOnClassPath);
     Path rootMeta = new Path(dfsTestTmpSchemaLocation, destFolderInTmp);
     Path newMetaCache = new Path(rootMeta, metaFileName);
-    FSDataOutputStream outSteam = fs.create(newMetaCache);
-    outSteam.writeBytes(metadataFileContents.replace("REPLACED_IN_TEST", dfsTestTmpSchemaLocation));
-    outSteam.close();
+    try (FSDataOutputStream outStream = fs.create(newMetaCache)) {
+      if (customStringReplacement != null) {
+        metadataFileContents = metadataFileContents.replace("CUSTOM_STRING_REPLACEMENT", customStringReplacement);
+      }
+      outStream.writeBytes(metadataFileContents.replace("REPLACED_IN_TEST", dfsTestTmpSchemaLocation));
+    }
   }
 
  }
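
As a usage note, the new overload lets a test plant a cache file whose
CUSTOM_STRING_REPLACEMENT token is swapped for a concrete value. Mirroring the call in
TestParquetMetadataCache below, with the literal "v4" standing in for a computed future
version string:

    copyMetaDataCacheToTempWithReplacements(
        "parquet/unsupported_metadata/unsupported_metadata_version.requires_replace.txt",
        "unsupported_metadata_version", Metadata.METADATA_FILENAME, "v4");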

http://git-wip-us.apache.org/repos/asf/drill/blob/e9065b55/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetMetadataCache.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetMetadataCache.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetMetadataCache.java
index edb3bd8..b6f1408 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetMetadataCache.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetMetadataCache.java
@@ -17,45 +17,59 @@
  */
 package org.apache.drill.exec.store.parquet;
 
-import com.google.common.base.Joiner;
+import mockit.Mock;
+import mockit.MockUp;
+import mockit.integration.junit4.JMockit;
 import org.apache.drill.PlanTestBase;
 import org.apache.drill.common.util.TestTools;
 import org.apache.commons.io.FileUtils;
+import org.apache.drill.exec.store.dfs.MetadataContext;
 import org.apache.hadoop.fs.Path;
+import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Test;
+import org.junit.runner.RunWith;
 
 import java.io.File;
 import java.nio.file.Files;
+import java.util.List;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
+@RunWith(JMockit.class)
 public class TestParquetMetadataCache extends PlanTestBase {
   private static final String WORKING_PATH = TestTools.getWorkingPath();
   private static final String TEST_RES_PATH = WORKING_PATH + "/src/test/resources";
   private static final String tableName1 = "parquetTable1";
   private static final String tableName2 = "parquetTable2";
-  private static final String RELATIVE_PATHS_METADATA = "relative_paths_metadata";
+  private static File dataDir1;
+  private static File dataDir2;
 
 
   @BeforeClass
   public static void copyData() throws Exception {
     // copy the data into the temporary location
     String tmpLocation = getDfsTestTmpSchemaLocation();
-    File dataDir1 = new File(tmpLocation + Path.SEPARATOR + tableName1);
+    dataDir1 = new File(tmpLocation, tableName1);
     dataDir1.mkdir();
     FileUtils.copyDirectory(new File(String.format(String.format("%s/multilevel/parquet", TEST_RES_PATH))),
         dataDir1);
 
-    File dataDir2 = new File(tmpLocation + Path.SEPARATOR + tableName2);
+    dataDir2 = new File(tmpLocation, tableName2);
     dataDir2.mkdir();
     FileUtils.copyDirectory(new File(String.format(String.format("%s/multilevel/parquet2", TEST_RES_PATH))),
         dataDir2);
   }
 
+  @AfterClass
+  public static void cleanupTestData() throws Exception {
+    FileUtils.deleteQuietly(dataDir1);
+    FileUtils.deleteQuietly(dataDir2);
+  }
+
   @Test
   public void testPartitionPruningWithMetadataCache_1() throws Exception {
     test(String.format("refresh table metadata dfs_test.`%s/%s`", getDfsTestTmpSchemaLocation(), tableName1));
@@ -452,33 +466,232 @@ public class TestParquetMetadataCache extends PlanTestBase {
 
   @Test
   public void testMetadataCacheAbsolutePaths() throws Exception {
+    final String absolutePathsMetadata = "absolute_paths_metadata";
     try {
       test("use dfs_test.tmp");
-      final String relative_path_metadata_t1 = RELATIVE_PATHS_METADATA + "/t1";
-      final String relative_path_metadata_t2 = RELATIVE_PATHS_METADATA + "/t2";
-      test("create table `%s` as select * from cp.`tpch/nation.parquet`", relative_path_metadata_t1);
-      test("create table `%s` as select * from cp.`tpch/nation.parquet`", relative_path_metadata_t2);
+      // creating two inner directories so that the METADATA_DIRECTORIES_FILENAME metadata file is exercised as well
+      final String absolutePathsMetadataT1 = absolutePathsMetadata + "/t1";
+      final String absolutePathsMetadataT2 = absolutePathsMetadata + "/t2";
+      test("create table `%s` as select * from cp.`tpch/nation.parquet`", absolutePathsMetadataT1);
+      test("create table `%s` as select * from cp.`tpch/nation.parquet`", absolutePathsMetadataT2);
       copyMetaDataCacheToTempReplacingInternalPaths("parquet/metadata_with_absolute_path/" +
-          "metadata_directories_with_absolute_paths.requires_replace.txt", RELATIVE_PATHS_METADATA, Metadata.METADATA_DIRECTORIES_FILENAME);
+          "metadata_directories_with_absolute_paths.requires_replace.txt", absolutePathsMetadata, Metadata.METADATA_DIRECTORIES_FILENAME);
       copyMetaDataCacheToTempReplacingInternalPaths("parquet/metadata_with_absolute_path/" +
-          "metadata_table_with_absolute_paths.requires_replace.txt", RELATIVE_PATHS_METADATA, Metadata.METADATA_FILENAME);
+          "metadata_table_with_absolute_paths.requires_replace.txt", absolutePathsMetadata, Metadata.METADATA_FILENAME);
       copyMetaDataCacheToTempReplacingInternalPaths("parquet/metadata_with_absolute_path/" +
-          "metadata_table_with_absolute_paths_t1.requires_replace.txt", relative_path_metadata_t1, Metadata.METADATA_FILENAME);
+          "metadata_table_with_absolute_paths_t1.requires_replace.txt", absolutePathsMetadataT1, Metadata.METADATA_FILENAME);
       copyMetaDataCacheToTempReplacingInternalPaths("parquet/metadata_with_absolute_path/" +
-          "metadata_table_with_absolute_paths_t2.requires_replace.txt", relative_path_metadata_t2, Metadata.METADATA_FILENAME);
+          "metadata_table_with_absolute_paths_t2.requires_replace.txt", absolutePathsMetadataT2, Metadata.METADATA_FILENAME);
+      String query = String.format("select * from %s", absolutePathsMetadata);
+      int expectedRowCount = 50;
+      int expectedNumFiles = 1; // point to selectionRoot since no pruning is done in this query
+      int actualRowCount = testSql(query);
+      assertEquals("An incorrect result was obtained while querying a table with metadata cache files",
+          expectedRowCount, actualRowCount);
+      String numFilesPattern = "numFiles=" + expectedNumFiles;
+      String usedMetaPattern = "usedMetadataFile=true";
+      String cacheFileRootPattern = String.format("cacheFileRoot=%s/%s", getDfsTestTmpSchemaLocation(), absolutePathsMetadata);
+      PlanTestBase.testPlanMatchingPatterns(query, new String[]{numFilesPattern, usedMetaPattern, cacheFileRootPattern},
+          new String[] {"Filter"});
+    } finally {
+      test("drop table if exists %s", absolutePathsMetadata);
+    }
+  }
 
-      int rowCount = testSql(String.format("select * from %s", RELATIVE_PATHS_METADATA));
-      assertEquals("An incorrect result was obtained while querying a table with metadata cache files", 50, rowCount);
+  @Test
+  public void testSpacesInMetadataCachePath() throws Exception {
+    final String pathWithSpaces = "path with spaces";
+    try {
+      test("use dfs_test.tmp");
+      // creating a multilevel table so that the path with spaces appears in both metadata files (METADATA and METADATA_DIRECTORIES)
+      test("create table `%s` as select * from cp.`tpch/nation.parquet`", pathWithSpaces);
+      test("create table `%1$s/%1$s` as select * from cp.`tpch/nation.parquet`", pathWithSpaces);
+      test("refresh table metadata `%s`", pathWithSpaces);
+      checkForMetadataFile(pathWithSpaces);
+      String query = String.format("select * from `%s`", pathWithSpaces);
+      int expectedRowCount = 50;
+      int expectedNumFiles = 1; // point to selectionRoot since no pruning is done in this query
+      int actualRowCount = testSql(query);
+      assertEquals("An incorrect result was obtained while querying a table with metadata cache files",
+          expectedRowCount, actualRowCount);
+      String numFilesPattern = "numFiles=" + expectedNumFiles;
+      String usedMetaPattern = "usedMetadataFile=true";
+      String cacheFileRootPattern = String.format("cacheFileRoot=%s/%s", getDfsTestTmpSchemaLocation(), pathWithSpaces);
+      PlanTestBase.testPlanMatchingPatterns(query, new String[]{numFilesPattern, usedMetaPattern, cacheFileRootPattern},
+          new String[] {"Filter"});
+    } finally {
+      test("drop table if exists `%s`", pathWithSpaces);
+    }
+  }
+
+  @Test
+  public void testFutureUnsupportedMetadataVersion() throws Exception {
+    final String unsupportedMetadataVersion = "unsupported_metadata_version";
+    try {
+      test("use dfs_test.tmp");
+      test("create table `%s` as select * from cp.`tpch/nation.parquet`", unsupportedMetadataVersion);
+      MetadataVersion lastVersion = MetadataVersion.Constants.SUPPORTED_VERSIONS.last();
+      // Get a future version, which is absent from the MetadataVersion.Constants.SUPPORTED_VERSIONS set
+      String futureVersion = new MetadataVersion(lastVersion.getMajor() + 1, 0).toString();
+      copyMetaDataCacheToTempWithReplacements("parquet/unsupported_metadata/unsupported_metadata_version.requires_replace.txt",
+          unsupportedMetadataVersion, Metadata.METADATA_FILENAME, futureVersion);
+      String query = String.format("select * from %s", unsupportedMetadataVersion);
+      int expectedRowCount = 25;
+      int expectedNumFiles = 1;
+      int actualRowCount = testSql(query);
+      assertEquals("An incorrect result was obtained while querying a table with metadata cache files",
+          expectedRowCount, actualRowCount);
+      String numFilesPattern = "numFiles=" + expectedNumFiles;
+      String usedMetaPattern = "usedMetadataFile=false"; // ignoring metadata cache file
+      PlanTestBase.testPlanMatchingPatterns(query, new String[]{numFilesPattern, usedMetaPattern},
+          new String[] {"Filter"});
+    } finally {
+      test("drop table if exists %s", unsupportedMetadataVersion);
+    }
+  }
+
+  @Test
+  public void testCorruptedMetadataFile() throws Exception {
+    final String corruptedMetadata = "corrupted_metadata";
+    try {
+      test("use dfs_test.tmp");
+      test("create table `%s` as select * from cp.`tpch/nation.parquet`", corruptedMetadata);
+      copyMetaDataCacheToTempReplacingInternalPaths("parquet/unsupported_metadata/" +
+          "corrupted_metadata.requires_replace.txt", corruptedMetadata, Metadata.METADATA_FILENAME);
+      String query = String.format("select * from %s", corruptedMetadata);
+      int expectedRowCount = 25;
+      int expectedNumFiles = 1;
+      int actualRowCount = testSql(query);
+      assertEquals("An incorrect result was obtained while querying a table with metadata cache files",
+          expectedRowCount, actualRowCount);
+      String numFilesPattern = "numFiles=" + expectedNumFiles;
+      String usedMetaPattern = "usedMetadataFile=false"; // ignoring metadata cache file
+      PlanTestBase.testPlanMatchingPatterns(query, new String[]{numFilesPattern, usedMetaPattern},
+          new String[] {"Filter"});
+    } finally {
+      test("drop table if exists %s", corruptedMetadata);
+    }
+  }
+
+  @Test
+  public void testEmptyMetadataFile() throws Exception {
+    final String emptyMetadataFile = "empty_metadata_file";
+    try {
+      test("use dfs_test.tmp");
+      test("create table `%s` as select * from cp.`tpch/nation.parquet`", emptyMetadataFile);
+      copyMetaDataCacheToTempReplacingInternalPaths("parquet/unsupported_metadata/" +
+          "empty_metadata_file.requires_replace.txt", emptyMetadataFile, Metadata.METADATA_FILENAME);
+      String query = String.format("select * from %s", emptyMetadataFile);
+      int expectedRowCount = 25;
+      int expectedNumFiles = 1;
+      int actualRowCount = testSql(query);
+      assertEquals("An incorrect result was obtained while querying a table with metadata cache files",
+          expectedRowCount, actualRowCount);
+      String numFilesPattern = "numFiles=" + expectedNumFiles;
+      String usedMetaPattern = "usedMetadataFile=false"; // ignoring metadata cache file
+      PlanTestBase.testPlanMatchingPatterns(query, new String[]{numFilesPattern, usedMetaPattern},
+          new String[] {"Filter"});
+    } finally {
+      test("drop table if exists %s", emptyMetadataFile);
+    }
+  }
+
+  @Test
+  public void testRootMetadataFileIsAbsent() throws Exception {
+    final String tmpDir = getDfsTestTmpSchemaLocation();
+    final String rootMetaCorruptedTable = "root_meta_corrupted_table";
+    File dataDir = new File(tmpDir, rootMetaCorruptedTable);
+    try {
+      // copy the data into the temporary location and delete the root metadata file
+      dataDir.mkdir();
+      FileUtils.copyDirectory(new File(String.format("%s/multilevel/parquet", TEST_RES_PATH)), dataDir);
+
+      test("use dfs_test.tmp");
+      test("refresh table metadata `%s`", rootMetaCorruptedTable);
+      checkForMetadataFile(rootMetaCorruptedTable);
+      File rootMetadataFile = FileUtils.getFile(tmpDir, rootMetaCorruptedTable, Metadata.METADATA_FILENAME);
+      assertTrue(String.format("Metadata cache file '%s' isn't deleted", rootMetadataFile.getPath()),
+          rootMetadataFile.delete());
+
+      // mock the Metadata.tableModified() method so that the metadata cache files are not regenerated during the query
+      new MockUp<Metadata>() {
+        @Mock
+        boolean tableModified(List<String> directories, Path metaFilePath, Path parentDir, MetadataContext metaContext) {
+          return false;
+        }
+      };
+
+      String query = String.format("select dir0, dir1, o_custkey, o_orderdate from `%s` " +
+          " where dir0=1994 or dir1='Q3'", rootMetaCorruptedTable);
+      int expectedRowCount = 60;
+      int expectedNumFiles = 6;
+      int actualRowCount = testSql(query);
+      assertEquals("An incorrect result was obtained while querying a table with metadata cache files",
+          expectedRowCount, actualRowCount);
+      String numFilesPattern = "numFiles=" + expectedNumFiles;
+      String usedMetaPattern = "usedMetadataFile=false";
+      PlanTestBase.testPlanMatchingPatterns(query, new String[]{numFilesPattern, usedMetaPattern},
+          new String[] {"cacheFileRoot", "Filter"});
+    } finally {
+      FileUtils.deleteQuietly(dataDir);
+    }
+  }
+
+  @Test
+  public void testInnerMetadataFilesAreAbsent() throws Exception {
+    final String tmpDir = getDfsTestTmpSchemaLocation();
+    final String innerMetaCorruptedTable = "inner_meta_corrupted_table";
+    File dataDir = new File(tmpDir, innerMetaCorruptedTable);
+    try {
+      // copy the data into the temporary location and delete a few inner metadata files
+      dataDir.mkdir();
+      FileUtils.copyDirectory(new File(String.format("%s/multilevel/parquet", TEST_RES_PATH)), dataDir);
+
+      test("use dfs_test.tmp");
+      test("refresh table metadata `%s`", innerMetaCorruptedTable);
+      checkForMetadataFile(innerMetaCorruptedTable);
+      File firstInnerMetadataFile = FileUtils.getFile(tmpDir, innerMetaCorruptedTable, "1994", Metadata.METADATA_FILENAME);
+      File secondInnerMetadataFile = FileUtils.getFile(tmpDir, innerMetaCorruptedTable, "1994", "Q3", Metadata.METADATA_FILENAME);
+      assertTrue(String.format("Metadata cache file '%s' isn't deleted", firstInnerMetadataFile.getPath()),
+          firstInnerMetadataFile.delete());
+      assertTrue(String.format("Metadata cache file '%s' isn't deleted", secondInnerMetadataFile.getPath()),
+          secondInnerMetadataFile.delete());
+
+      // mock the Metadata.tableModified() method so that the metadata cache files are not regenerated during the query
+      new MockUp<Metadata>() {
+        @Mock
+        boolean tableModified(List<String> directories, Path metaFilePath, Path parentDir, MetadataContext metaContext) {
+          return false;
+        }
+      };
+
+      String query = String.format("select dir0, dir1, o_custkey, o_orderdate from `%s` " +
+          " where dir0=1994 or dir1='Q3'", innerMetaCorruptedTable);
+      int expectedRowCount = 60;
+      int expectedNumFiles = 6;
+      int actualRowCount = testSql(query);
+      assertEquals("An incorrect result was obtained while querying a table with metadata cache files",
+          expectedRowCount, actualRowCount);
+      String numFilesPattern = "numFiles=" + expectedNumFiles;
+      String usedMetaPattern = "usedMetadataFile=false";
+      PlanTestBase.testPlanMatchingPatterns(query, new String[]{numFilesPattern, usedMetaPattern},
+          new String[] {"cacheFileRoot", "Filter"});
     } finally {
-      test("drop table if exists %s", RELATIVE_PATHS_METADATA);
+      FileUtils.deleteQuietly(dataDir);
     }
   }
 
-  private void checkForMetadataFile(String table) throws Exception {
+  /**
+   * Helper method that checks whether the metadata cache file exists for the given table
+   *
+   * @param table table name or table path
+   */
+  private void checkForMetadataFile(String table) {
     String tmpDir = getDfsTestTmpSchemaLocation();
-    String metaFile = Joiner.on("/").join(tmpDir, table, Metadata.METADATA_FILENAME);
+    File metaFile = table.startsWith(tmpDir) ? FileUtils.getFile(table, Metadata.METADATA_FILENAME)
+        : FileUtils.getFile(tmpDir, table, Metadata.METADATA_FILENAME);
     assertTrue(String.format("There is no metadata cache file for the %s table", table),
-        Files.exists(new File(metaFile).toPath()));
+        Files.exists(metaFile.toPath()));
   }
 
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/e9065b55/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetMetadataVersion.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetMetadataVersion.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetMetadataVersion.java
new file mode 100644
index 0000000..4640728
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetMetadataVersion.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.parquet;
+
+import org.apache.drill.common.exceptions.DrillRuntimeException;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+
+public class TestParquetMetadataVersion {
+
+  @Test
+  public void testFirstLetter() throws Exception {
+    MetadataVersion versionWithFirstLetter = new MetadataVersion("v4");
+    MetadataVersion expectedVersion = new MetadataVersion(4, 0);
+    assertEquals("Parquet metadata version is parsed incorrectly", expectedVersion, versionWithFirstLetter);
+    MetadataVersion versionWithoutFirstLetter = new MetadataVersion("4");
+    assertEquals("Parquet metadata version is parsed incorrectly", expectedVersion, versionWithoutFirstLetter);
+  }
+
+  @Test(expected = DrillRuntimeException.class)
+  public void testWrongFirstLetter() throws Exception {
+    String versionWithFirstLetterInUpperCase = "V2";
+    try {
+      new MetadataVersion(versionWithFirstLetterInUpperCase);
+    } catch (DrillRuntimeException e) {
+      assertTrue("Not expected exception is obtained while parsing parquet metadata version",
+          e.getMessage().contains(String.format("Could not parse metadata version '%s'", versionWithFirstLetterInUpperCase)));
+      throw e;
+    }
+  }
+
+  @Test
+  public void testTwoDigitsMajorVersion() throws Exception {
+    MetadataVersion twoDigitsMetadataVersion = new MetadataVersion("10.2");
+    MetadataVersion expectedVersion = new MetadataVersion(10, 2);
+    assertEquals("Parquet metadata version is parsed incorrectly", expectedVersion, twoDigitsMetadataVersion);
+  }
+
+  @Test
+  public void testMinorVersion() throws Exception {
+    MetadataVersion withMinorVersion = new MetadataVersion("3.1");
+    MetadataVersion expectedVersionWithMinorVersion = new MetadataVersion(3, 1);
+    assertEquals("Parquet metadata version is parsed incorrectly", expectedVersionWithMinorVersion, withMinorVersion);
+  }
+
+  @Test
+  public void testTwoDigitsMinorVersion() throws Exception {
+    MetadataVersion twoDigitsMinorVersion = new MetadataVersion("3.13");
+    MetadataVersion expectedVersionWithTwoDigitsMinorVersion = new MetadataVersion(3, 13);
+    assertEquals("Parquet metadata version is parsed incorrectly", expectedVersionWithTwoDigitsMinorVersion, twoDigitsMinorVersion);
+  }
+
+  @Test
+  public void testWithoutMinorVersion() throws Exception {
+    MetadataVersion withoutMinorVersion = new MetadataVersion("v3");
+    MetadataVersion expectedVersionWithoutMinorVersion = new MetadataVersion(3, 0);
+    assertEquals("Parquet metadata version is parsed incorrectly", expectedVersionWithoutMinorVersion, withoutMinorVersion);
+  }
+
+  @Test
+  public void testZeroMinorVersion() throws Exception {
+    MetadataVersion zeroMinorVersion = new MetadataVersion("4.0");
+    MetadataVersion expectedVersionZeroMinorVersion = new MetadataVersion(4, 0);
+    assertEquals("Parquet metadata version is parsed incorrectly", expectedVersionZeroMinorVersion, zeroMinorVersion);
+  }
+
+  @Test(expected = DrillRuntimeException.class)
+  public void testWrongDelimiter() throws Exception {
+    String versionWithWrongDelimiter = "v3_1";
+    try {
+      new MetadataVersion(versionWithWrongDelimiter);
+    } catch (DrillRuntimeException e) {
+      assertTrue("Not expected exception is obtained while parsing parquet metadata version",
+          e.getMessage().contains(String.format("Could not parse metadata version '%s'", versionWithWrongDelimiter)));
+      throw e;
+    }
+  }
+
+  @Test(expected = DrillRuntimeException.class)
+  public void testZeroMajorVersion() throws Exception {
+    String zeroMajorVersion = "v0.2";
+    try {
+      new MetadataVersion(zeroMajorVersion);
+    } catch (DrillRuntimeException e) {
+      assertTrue("Not expected exception is obtained while parsing parquet metadata version",
+          e.getMessage().contains(String.format("Could not parse metadata version '%s'", zeroMajorVersion)));
+      throw e;
+    }
+  }
+
+  @Test(expected = DrillRuntimeException.class)
+  public void testVersionWithLetterInsteadOfNumber() throws Exception {
+    String versionWithLetterInsteadOfNumber = "v3.O"; // "O" is a letter, not a zero
+    try {
+      new MetadataVersion(versionWithLetterInsteadOfNumber);
+    } catch (DrillRuntimeException e) {
+      assertTrue("Not expected exception is obtained while parsing parquet metadata version",
+          e.getMessage().contains(String.format("Could not parse metadata version '%s'", versionWithLetterInsteadOfNumber)));
+      throw e;
+    }
+  }
+}
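
Taken together, these tests pin down the accepted version grammar: an optional
lower-case "v", a non-zero major number, and an optional "." followed by a minor
number. A parser consistent with every case above, offered only as a sketch (the
committed MetadataVersion implementation may differ):

    import java.util.regex.Matcher;
    import java.util.regex.Pattern;

    class VersionParserSketch {
      // optional lower-case "v", non-zero major version, optional ".minor" suffix
      private static final Pattern FORMAT = Pattern.compile("^v?([1-9]\\d*)(\\.(\\d+))?$");

      static int[] parse(String version) {
        Matcher m = FORMAT.matcher(version);
        if (!m.matches()) {
          throw new IllegalArgumentException(
              String.format("Could not parse metadata version '%s'", version));
        }
        int major = Integer.parseInt(m.group(1));
        int minor = m.group(3) == null ? 0 : Integer.parseInt(m.group(3));
        return new int[] {major, minor};
      }
    }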

http://git-wip-us.apache.org/repos/asf/drill/blob/e9065b55/exec/java-exec/src/test/resources/parquet/metadata_with_absolute_path/metadata_directories_with_absolute_paths.requires_replace.txt
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/parquet/metadata_with_absolute_path/metadata_directories_with_absolute_paths.requires_replace.txt b/exec/java-exec/src/test/resources/parquet/metadata_with_absolute_path/metadata_directories_with_absolute_paths.requires_replace.txt
index 887988d..8a9989d 100644
--- a/exec/java-exec/src/test/resources/parquet/metadata_with_absolute_path/metadata_directories_with_absolute_paths.requires_replace.txt
+++ b/exec/java-exec/src/test/resources/parquet/metadata_with_absolute_path/metadata_directories_with_absolute_paths.requires_replace.txt
@@ -1,3 +1,3 @@
 {
-  "directories" : [ "file:REPLACED_IN_TEST/relative_paths_metadata/t1", "file:REPLACED_IN_TEST/relative_paths_metadata/t2" ]
+  "directories" : [ "file:REPLACED_IN_TEST/absolute_paths_metadata/t1", "file:REPLACED_IN_TEST/absolute_paths_metadata/t2" ]
 }
\ No newline at end of file