Posted to commits@drill.apache.org by vi...@apache.org on 2019/03/05 15:38:29 UTC

[drill] 02/02: DRILL-5603: Replace String file paths with Hadoop Path - replaced all String path representations with org.apache.hadoop.fs.Path - added PathSerDe.Se JSON serializer - refactored DFSPartitionLocation code by leveraging the existing listPartitionValues() functionality

This is an automated email from the ASF dual-hosted git repository.

vitalii pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git

commit 3d29faf81da593035f6bd38dd56d48e719afe7d4
Author: Vitalii Diravka <vi...@gmail.com>
AuthorDate: Mon Feb 18 22:30:36 2019 +0200

    DRILL-5603: Replace String file paths with Hadoop Path
     - replaced all String path representations with org.apache.hadoop.fs.Path
     - added PathSerDe.Se JSON serializer
     - refactored DFSPartitionLocation code by leveraging the existing listPartitionValues() functionality
    
    closes #1657
---
 .../exec/store/mapr/db/MapRDBFormatPlugin.java     |   4 +-
 .../store/mapr/streams/StreamsFormatPlugin.java    |  12 +-
 .../exec/store/syslog/SyslogRecordReader.java      |   3 +-
 .../exec/planner/sql/HivePartitionDescriptor.java  |   9 +-
 .../exec/planner/sql/HivePartitionLocation.java    |   8 +-
 .../hive/HiveDrillNativeParquetRowGroupScan.java   |   2 +-
 .../store/hive/HiveDrillNativeParquetScan.java     |   6 +-
 .../HiveDrillNativeParquetScanBatchCreator.java    |   5 +-
 .../drill/exec/store/hive/HivePartitionHolder.java |  11 +-
 .../apache/drill/exec/physical/PhysicalPlan.java   |   3 +-
 .../exec/physical/base/AbstractGroupScan.java      |   3 +-
 .../apache/drill/exec/physical/base/GroupScan.java |   5 +-
 .../drill/exec/physical/base/SchemalessScan.java   |   9 +-
 .../impl/scan/file/BaseFileScanFramework.java      |   2 +-
 .../exec/physical/impl/scan/file/FileMetadata.java |   2 +-
 .../exec/planner/AbstractPartitionDescriptor.java  |   3 +-
 .../exec/planner/DFSDirPartitionLocation.java      |  18 ++-
 .../exec/planner/DFSFilePartitionLocation.java     |  25 +---
 .../planner/FileSystemPartitionDescriptor.java     | 102 ++++++-------
 .../exec/planner/ParquetPartitionDescriptor.java   |  18 +--
 .../exec/planner/ParquetPartitionLocation.java     |   8 +-
 .../drill/exec/planner/PartitionDescriptor.java    |  44 ++++--
 .../drill/exec/planner/PartitionLocation.java      |  14 +-
 .../drill/exec/planner/PhysicalPlanReader.java     |  53 +++----
 .../exec/planner/SimplePartitionLocation.java      |   5 +-
 .../planner/logical/partition/PruneScanRule.java   |  17 ++-
 .../planner/sql/handlers/AnalyzeTableHandler.java  |  28 ++--
 .../sql/handlers/RefreshMetadataHandler.java       |   4 +-
 .../PathSerDe.java}                                |  33 ++---
 .../apache/drill/exec/store/ColumnExplorer.java    |  46 +++---
 .../drill/exec/store/avro/AvroDrillTable.java      |   4 +-
 .../drill/exec/store/avro/AvroRecordReader.java    |  12 +-
 .../apache/drill/exec/store/dfs/FileSelection.java | 161 ++++++++++-----------
 .../drill/exec/store/dfs/FormatSelection.java      |   8 +-
 .../drill/exec/store/dfs/MetadataContext.java      |   7 +-
 .../drill/exec/store/dfs/ReadEntryFromHDFS.java    |   3 +-
 .../drill/exec/store/dfs/ReadEntryWithPath.java    |  13 +-
 .../exec/store/dfs/easy/EasyFormatPlugin.java      |   2 +-
 .../drill/exec/store/dfs/easy/EasyGroupScan.java   |  33 ++---
 .../drill/exec/store/dfs/easy/EasySubScan.java     |  22 +--
 .../apache/drill/exec/store/dfs/easy/FileWork.java |   8 +-
 .../exec/store/direct/MetadataDirectGroupScan.java |  10 +-
 .../exec/store/easy/json/JSONRecordReader.java     |  13 +-
 .../sequencefile/SequenceFileFormatPlugin.java     |   2 +-
 .../exec/store/easy/text/TextFormatPlugin.java     |   2 +-
 .../exec/store/httpd/HttpdLogFormatPlugin.java     |   2 +-
 .../drill/exec/store/image/ImageFormatPlugin.java  |   9 +-
 .../drill/exec/store/image/ImageRecordReader.java  |   4 +-
 .../drill/exec/store/log/LogRecordReader.java      |   5 +-
 .../store/parquet/AbstractParquetGroupScan.java    |  17 ++-
 .../parquet/AbstractParquetScanBatchCreator.java   |  11 +-
 .../drill/exec/store/parquet/ParquetGroupScan.java |  42 +++---
 .../store/parquet/ParquetGroupScanStatistics.java  |   5 +-
 .../exec/store/parquet/ParquetRowGroupScan.java    |  11 +-
 .../store/parquet/ParquetScanBatchCreator.java     |   3 +-
 .../drill/exec/store/parquet/RowGroupInfo.java     |   3 +-
 .../exec/store/parquet/RowGroupReadEntry.java      |   3 +-
 .../parquet/columnreaders/ParquetRecordReader.java |  24 ++-
 .../exec/store/parquet/metadata/Metadata.java      |  37 +++--
 .../exec/store/parquet/metadata/MetadataBase.java  |   5 +-
 .../store/parquet/metadata/MetadataPathUtils.java  |  29 ++--
 .../exec/store/parquet/metadata/Metadata_V1.java   |  13 +-
 .../exec/store/parquet/metadata/Metadata_V2.java   |  15 +-
 .../exec/store/parquet/metadata/Metadata_V3.java   |  15 +-
 .../parquet/metadata/ParquetTableMetadataDirs.java |   7 +-
 .../exec/store/parquet2/DrillParquetReader.java    |   2 +-
 .../drill/exec/store/pcap/PcapRecordReader.java    |   4 +-
 .../exec/store/pcapng/PcapngRecordReader.java      |   4 +-
 .../drill/exec/store/schedule/BlockMapBuilder.java |  22 +--
 .../exec/store/schedule/CompleteFileWork.java      |  42 ++++--
 .../drill/exec/util/DrillFileSystemUtil.java       |  11 ++
 .../test/java/org/apache/drill/PlanTestBase.java   |   3 +-
 .../apache/drill/exec/TestPathSerialization.java   |  65 +++++++++
 .../physical/impl/scan/TestFileScanFramework.java  |   2 +-
 .../exec/physical/unit/MiniPlanUnitTestBase.java   |  13 +-
 .../drill/exec/physical/unit/TestMiniPlan.java     |   5 +-
 .../exec/physical/unit/TestNullInputMiniPlan.java  |   8 +-
 .../drill/exec/store/CachedSingleFileSystem.java   |   8 +-
 .../exec/store/dfs/TestDFSPartitionLocation.java   |  54 +++++++
 .../drill/exec/store/dfs/TestFileSelection.java    |   7 +-
 .../store/parquet/ParquetRecordReaderTest.java     |   4 +-
 .../drill/exec/store/store/TestAssignment.java     |   3 +-
 .../apache/drill/test/PhysicalOpUnitTestBase.java  |   5 +-
 .../java/org/apache/drill/test/QueryBuilder.java   |   2 +-
 84 files changed, 741 insertions(+), 585 deletions(-)
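
Note that the new PathSerDe.Se class itself (the renamed file shown as .../PathSerDe.java in the stat above) is not reproduced in the hunks below; only its registration in PhysicalPlanReader is. A minimal sketch of such a Jackson serializer follows; the exact body, including whether it writes toUri().getPath() or the full toString(), is an assumption rather than the committed code:

    package org.apache.drill.exec.serialization;

    import java.io.IOException;

    import com.fasterxml.jackson.core.JsonGenerator;
    import com.fasterxml.jackson.databind.JsonSerializer;
    import com.fasterxml.jackson.databind.SerializerProvider;
    import org.apache.hadoop.fs.Path;

    public class PathSerDe {

      public static class Se extends JsonSerializer<Path> {
        @Override
        public void serialize(Path path, JsonGenerator generator, SerializerProvider provider) throws IOException {
          // Serialize the Path as a plain JSON string. No custom deserializer is
          // registered in PhysicalPlanReader below, presumably because Jackson
          // can rebuild a Path through its public Path(String) constructor.
          generator.writeString(path.toUri().getPath());
        }
      }
    }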

diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBFormatPlugin.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBFormatPlugin.java
index 3011f4e..6501f8c 100644
--- a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBFormatPlugin.java
+++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBFormatPlugin.java
@@ -180,9 +180,9 @@ public class MapRDBFormatPlugin extends TableFormatPlugin {
    */
   @JsonIgnore
   public String getTableName(FileSelection selection) {
-    List<String> files = selection.getFiles();
+    List<Path> files = selection.getFiles();
     assert (files.size() == 1);
-    return files.get(0);
+    return files.get(0).toUri().getPath();
   }
 
 }
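
The toUri().getPath() call above is the recurring Path-to-String conversion in this patch: unlike Path.toString(), it drops the scheme and authority. A standalone illustration (the path below is made up):

    import org.apache.hadoop.fs.Path;

    public class PathConversionDemo {
      public static void main(String[] args) {
        Path path = new Path("hdfs://namenode:8020/tables/t1");
        System.out.println(path.toString());        // hdfs://namenode:8020/tables/t1
        System.out.println(path.toUri().getPath()); // /tables/t1
      }
    }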
diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/streams/StreamsFormatPlugin.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/streams/StreamsFormatPlugin.java
index 92f134f..2ddf752 100644
--- a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/streams/StreamsFormatPlugin.java
+++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/streams/StreamsFormatPlugin.java
@@ -36,7 +36,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 
 public class StreamsFormatPlugin extends TableFormatPlugin {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(StreamsFormatPlugin.class);
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(StreamsFormatPlugin.class);
   private StreamsFormatMatcher matcher;
 
   public StreamsFormatPlugin(String name, DrillbitContext context, Configuration fsConf,
@@ -72,9 +72,8 @@ public class StreamsFormatPlugin extends TableFormatPlugin {
   }
 
   @Override
-  public AbstractGroupScan getGroupScan(String userName, FileSelection selection,
-      List<SchemaPath> columns) throws IOException {
-    List<String> files = selection.getFiles();
+  public AbstractGroupScan getGroupScan(String userName, FileSelection selection, List<SchemaPath> columns) {
+    List<Path> files = selection.getFiles();
     assert (files.size() == 1);
     //TableProperties props = getMaprFS().getTableProperties(new Path(files.get(0)));
     throw UserException.unsupportedError().message("MapR streams cannot be queried at this time.").build(logger);
@@ -86,13 +85,12 @@ public class StreamsFormatPlugin extends TableFormatPlugin {
   }
 
   @Override
-  public DrillStatsTable.TableStatistics readStatistics(FileSystem fs, Path statsTablePath) throws IOException {
+  public DrillStatsTable.TableStatistics readStatistics(FileSystem fs, Path statsTablePath) {
     throw new UnsupportedOperationException("unimplemented");
   }
 
   @Override
-  public void writeStatistics(DrillStatsTable.TableStatistics statistics, FileSystem fs, Path statsTablePath) throws IOException {
+  public void writeStatistics(DrillStatsTable.TableStatistics statistics, FileSystem fs, Path statsTablePath) {
     throw new UnsupportedOperationException("unimplemented");
   }
-
 }
diff --git a/contrib/format-syslog/src/main/java/org/apache/drill/exec/store/syslog/SyslogRecordReader.java b/contrib/format-syslog/src/main/java/org/apache/drill/exec/store/syslog/SyslogRecordReader.java
index a198e34..4b2831c 100644
--- a/contrib/format-syslog/src/main/java/org/apache/drill/exec/store/syslog/SyslogRecordReader.java
+++ b/contrib/format-syslog/src/main/java/org/apache/drill/exec/store/syslog/SyslogRecordReader.java
@@ -32,7 +32,6 @@ import org.apache.drill.exec.store.dfs.DrillFileSystem;
 import org.apache.drill.exec.store.dfs.easy.FileWork;
 import org.apache.drill.exec.vector.complex.impl.VectorContainerWriter;
 import org.apache.drill.exec.vector.complex.writer.BaseWriter;
-import org.apache.hadoop.fs.Path;
 import org.realityforge.jsyslog.message.StructuredDataParameter;
 import org.realityforge.jsyslog.message.SyslogMessage;
 
@@ -95,7 +94,7 @@ public class SyslogRecordReader extends AbstractRecordReader {
   private void openFile() {
     InputStream in;
     try {
-      in = fileSystem.open(new Path(fileWork.getPath()));
+      in = fileSystem.open(fileWork.getPath());
     } catch (Exception e) {
       throw UserException
               .dataReadError(e)
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/HivePartitionDescriptor.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/HivePartitionDescriptor.java
index 25a0c08..a52b48d 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/HivePartitionDescriptor.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/HivePartitionDescriptor.java
@@ -36,6 +36,7 @@ import org.apache.drill.exec.store.hive.HiveUtilities;
 import org.apache.drill.exec.store.hive.HiveReadEntry;
 import org.apache.drill.exec.store.hive.HiveScan;
 import org.apache.drill.exec.vector.ValueVector;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.metastore.api.Partition;
 
 import java.util.BitSet;
@@ -87,9 +88,9 @@ public class HivePartitionDescriptor extends AbstractPartitionDescriptor {
   }
 
   @Override
-  public String getBaseTableLocation() {
+  public Path getBaseTableLocation() {
     HiveReadEntry origEntry = ((HiveScan) scanRel.getGroupScan()).getHiveReadEntry();
-    return origEntry.table.getTable().getSd().getLocation();
+    return new Path(origEntry.table.getTable().getSd().getLocation());
   }
 
   @Override
@@ -145,7 +146,7 @@ public class HivePartitionDescriptor extends AbstractPartitionDescriptor {
     List<PartitionLocation> locations = new LinkedList<>();
     HiveReadEntry origEntry = ((HiveScan) scanRel.getGroupScan()).getHiveReadEntry();
     for (Partition partition: origEntry.getPartitions()) {
-      locations.add(new HivePartitionLocation(partition.getValues(), partition.getSd().getLocation()));
+      locations.add(new HivePartitionLocation(partition.getValues(), new Path(partition.getSd().getLocation())));
     }
     locationSuperList = Lists.partition(locations, PartitionDescriptor.PARTITION_BATCH_SIZE);
     sublistsCreated = true;
@@ -170,7 +171,7 @@ public class HivePartitionDescriptor extends AbstractPartitionDescriptor {
     List<HiveTableWrapper.HivePartitionWrapper> newPartitions = Lists.newLinkedList();
 
     for (HiveTableWrapper.HivePartitionWrapper part: oldPartitions) {
-      String partitionLocation = part.getPartition().getSd().getLocation();
+      Path partitionLocation = new Path(part.getPartition().getSd().getLocation());
       for (PartitionLocation newPartitionLocation: newPartitionLocations) {
         if (partitionLocation.equals(newPartitionLocation.getEntirePartitionLocation())) {
           newPartitions.add(part);
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/HivePartitionLocation.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/HivePartitionLocation.java
index bb0efe8..25821f5 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/HivePartitionLocation.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/HivePartitionLocation.java
@@ -19,17 +19,19 @@ package org.apache.drill.exec.planner.sql;
 
 import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableList;
 import org.apache.drill.exec.planner.SimplePartitionLocation;
+import org.apache.hadoop.fs.Path;
 
 import java.util.List;
 
 public class HivePartitionLocation extends SimplePartitionLocation {
-  private final String partitionLocation;
+  private final Path partitionLocation;
   private final List<String> partitionValues;
 
-  public HivePartitionLocation(final List<String> partitionValues, final String partitionLocation) {
+  public HivePartitionLocation(List<String> partitionValues, Path partitionLocation) {
     this.partitionValues = ImmutableList.copyOf(partitionValues);
     this.partitionLocation = partitionLocation;
   }
+
   @Override
   public String getPartitionValue(int index) {
     assert index < partitionValues.size();
@@ -37,7 +39,7 @@ public class HivePartitionLocation extends SimplePartitionLocation {
   }
 
   @Override
-  public String getEntirePartitionLocation() {
+  public Path getEntirePartitionLocation() {
     return partitionLocation;
   }
 }
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeParquetRowGroupScan.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeParquetRowGroupScan.java
index bea06e0..78e107a 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeParquetRowGroupScan.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeParquetRowGroupScan.java
@@ -124,7 +124,7 @@ public class HiveDrillNativeParquetRowGroupScan extends AbstractParquetRowGroupS
 
   @Override
   public Configuration getFsConf(RowGroupReadEntry rowGroupReadEntry) throws IOException {
-    Path path = new Path(rowGroupReadEntry.getPath()).getParent();
+    Path path = rowGroupReadEntry.getPath().getParent();
     return new ProjectionPusher().pushProjectionsAndFilters(
         new JobConf(HiveUtilities.generateHiveConf(hiveStoragePlugin.getHiveConf(), confProperties)),
         path.getParent());
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeParquetScan.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeParquetScan.java
index 617f6a5..79a07f1 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeParquetScan.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeParquetScan.java
@@ -114,7 +114,7 @@ public class HiveDrillNativeParquetScan extends AbstractParquetGroupScan {
       assert split instanceof FileSplit;
       FileSplit fileSplit = (FileSplit) split;
       Path finalPath = fileSplit.getPath();
-      String pathString = Path.getPathWithoutSchemeAndAuthority(finalPath).toString();
+      Path pathString = Path.getPathWithoutSchemeAndAuthority(finalPath);
       entries.add(new ReadEntryWithPath(pathString));
 
       // store partition values per path
@@ -205,7 +205,7 @@ public class HiveDrillNativeParquetScan extends AbstractParquetGroupScan {
   protected void initInternal() throws IOException {
     Map<FileStatus, FileSystem> fileStatusConfMap = new LinkedHashMap<>();
     for (ReadEntryWithPath entry : entries) {
-      Path path = new Path(entry.getPath());
+      Path path = entry.getPath();
       Configuration conf = new ProjectionPusher().pushProjectionsAndFilters(
           new JobConf(hiveStoragePlugin.getHiveConf()),
           path.getParent());
@@ -221,7 +221,7 @@ public class HiveDrillNativeParquetScan extends AbstractParquetGroupScan {
   }
 
   @Override
-  protected AbstractParquetGroupScan cloneWithFileSelection(Collection<String> filePaths) throws IOException {
+  protected AbstractParquetGroupScan cloneWithFileSelection(Collection<Path> filePaths) throws IOException {
     FileSelection newSelection = new FileSelection(null, new ArrayList<>(filePaths), null, null, false);
     return clone(newSelection);
   }
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeParquetScanBatchCreator.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeParquetScanBatchCreator.java
index 339e1bd..be906fe 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeParquetScanBatchCreator.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeParquetScanBatchCreator.java
@@ -28,6 +28,7 @@ import org.apache.drill.exec.server.options.OptionManager;
 import org.apache.drill.exec.store.dfs.DrillFileSystem;
 import org.apache.drill.exec.store.parquet.AbstractParquetScanBatchCreator;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
 
 import java.io.IOException;
 import java.util.HashMap;
@@ -54,7 +55,7 @@ public class HiveDrillNativeParquetScanBatchCreator extends AbstractParquetScanB
    */
   private class HiveDrillNativeParquetDrillFileSystemManager extends AbstractDrillFileSystemManager {
 
-    private final Map<String, DrillFileSystem> fileSystems;
+    private final Map<Path, DrillFileSystem> fileSystems;
 
     HiveDrillNativeParquetDrillFileSystemManager(OperatorContext operatorContext) {
       super(operatorContext);
@@ -62,7 +63,7 @@ public class HiveDrillNativeParquetScanBatchCreator extends AbstractParquetScanB
     }
 
     @Override
-    protected DrillFileSystem get(Configuration config, String path) throws ExecutionSetupException {
+    protected DrillFileSystem get(Configuration config, Path path) throws ExecutionSetupException {
       DrillFileSystem fs = fileSystems.get(path);
       if (fs == null) {
         try {
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HivePartitionHolder.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HivePartitionHolder.java
index 803144e..3e16acb 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HivePartitionHolder.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HivePartitionHolder.java
@@ -19,6 +19,7 @@ package org.apache.drill.exec.store.hive;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.hadoop.fs.Path;
 
 import java.util.ArrayList;
 import java.util.Collections;
@@ -34,11 +35,11 @@ import java.util.Map;
  */
 public class HivePartitionHolder {
 
-  private final Map<String, Integer> keyToIndexMapper;
+  private final Map<Path, Integer> keyToIndexMapper;
   private final List<List<String>> partitionValues;
 
   @JsonCreator
-  public HivePartitionHolder(@JsonProperty("keyToIndexMapper") Map<String, Integer> keyToIndexMapper,
+  public HivePartitionHolder(@JsonProperty("keyToIndexMapper") Map<Path, Integer> keyToIndexMapper,
                              @JsonProperty("partitionValues") List<List<String>> partitionValues) {
     this.keyToIndexMapper = keyToIndexMapper;
     this.partitionValues = partitionValues;
@@ -50,7 +51,7 @@ public class HivePartitionHolder {
   }
 
   @JsonProperty
-  public Map<String, Integer> getKeyToIndexMapper() {
+  public Map<Path, Integer> getKeyToIndexMapper() {
     return keyToIndexMapper;
   }
 
@@ -67,7 +68,7 @@ public class HivePartitionHolder {
    * @param key mapper key
    * @param values partition values
    */
-  public void add(String key, List<String> values) {
+  public void add(Path key, List<String> values) {
     int index = partitionValues.indexOf(values);
     if (index == -1) {
       index = partitionValues.size();
@@ -84,7 +85,7 @@ public class HivePartitionHolder {
    * @param key mapper key
    * @return list of partition values
    */
-  public List<String> get(String key) {
+  public List<String> get(Path key) {
     Integer index = keyToIndexMapper.get(key);
     if (index == null) {
       return Collections.emptyList();
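
A hedged usage sketch of the holder after this change: keys are now Paths, and equal partition-value lists are stored once and shared by index (the no-arg constructor is assumed from context; only the @JsonCreator constructor appears in the hunks above):

    import java.util.Arrays;
    import java.util.List;
    import org.apache.drill.exec.store.hive.HivePartitionHolder;
    import org.apache.hadoop.fs.Path;

    public class HivePartitionHolderDemo {
      public static void main(String[] args) {
        HivePartitionHolder holder = new HivePartitionHolder();
        Path file1 = new Path("/warehouse/t/year=2019/000000_0");
        Path file2 = new Path("/warehouse/t/year=2019/000001_0");
        holder.add(file1, Arrays.asList("2019"));  // stores ["2019"] at index 0
        holder.add(file2, Arrays.asList("2019"));  // reuses index 0, list not duplicated
        List<String> values = holder.get(file1);   // ["2019"]
        List<String> none = holder.get(new Path("/unknown")); // empty list, never null
        System.out.println(values + " / " + none);
      }
    }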
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/PhysicalPlan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/PhysicalPlan.java
index f73afb1..6eba658 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/PhysicalPlan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/PhysicalPlan.java
@@ -45,7 +45,8 @@ public class PhysicalPlan {
   Graph<PhysicalOperator, Root, Leaf> graph;
 
   @JsonCreator
-  public PhysicalPlan(@JsonProperty("head") PlanProperties properties, @JsonProperty("graph") List<PhysicalOperator> operators){
+  public PhysicalPlan(@JsonProperty("head") PlanProperties properties,
+                      @JsonProperty("graph") List<PhysicalOperator> operators) {
     this.properties = properties;
     this.graph = Graph.newGraph(operators, Root.class, Leaf.class);
   }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractGroupScan.java
index 7e2623a..03b53a4 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractGroupScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractGroupScan.java
@@ -34,6 +34,7 @@ import org.apache.drill.exec.planner.physical.PlannerSettings;
 
 import com.fasterxml.jackson.annotation.JsonIgnore;
 import org.apache.drill.exec.server.options.OptionManager;
+import org.apache.hadoop.fs.Path;
 
 public abstract class AbstractGroupScan extends AbstractBase implements GroupScan {
 
@@ -171,7 +172,7 @@ public abstract class AbstractGroupScan extends AbstractBase implements GroupSca
   }
 
   @Override
-  public Collection<String> getFiles() {
+  public Collection<Path> getFiles() {
     return null;
   }
 
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/GroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/GroupScan.java
index e42ae2d..6dbad22 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/GroupScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/GroupScan.java
@@ -30,6 +30,7 @@ import org.apache.drill.exec.planner.physical.PlannerSettings;
 import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
 import org.apache.drill.exec.server.options.OptionManager;
 import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableList;
+import org.apache.hadoop.fs.Path;
 
 /**
  * A GroupScan operator represents all data which will be scanned by a given physical
@@ -142,8 +143,10 @@ public interface GroupScan extends Scan, HasAffinity{
   /**
    * Returns a collection of file names associated with this GroupScan. This should be called after checking
    * hasFiles().  If this GroupScan cannot provide file names, it returns null.
+   *
+   * @return collection of file paths
    */
-  Collection<String> getFiles();
+  Collection<Path> getFiles();
 
   @JsonIgnore
   LogicalExpression getFilter();
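
Caller-side, the updated contract reads as below (a sketch; per the javadoc, hasFiles() guards the call, and getFiles() may return null for scans that cannot provide files):

    import java.util.Collection;
    import org.apache.drill.exec.physical.base.GroupScan;
    import org.apache.hadoop.fs.Path;

    public class ScanFilesUtil {
      static void printScanFiles(GroupScan scan) {
        if (scan.hasFiles()) {                       // guard, per the javadoc above
          Collection<Path> files = scan.getFiles();  // now Paths, not Strings
          for (Path file : files) {
            System.out.println(file.toUri().getPath());
          }
        }
      }
    }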
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/SchemalessScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/SchemalessScan.java
index 999c417..fb60ddd 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/SchemalessScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/SchemalessScan.java
@@ -23,6 +23,7 @@ import com.fasterxml.jackson.annotation.JsonTypeName;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.exec.proto.CoordinationProtos;
 import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
+import org.apache.hadoop.fs.Path;
 
 import java.util.List;
 
@@ -32,17 +33,17 @@ import java.util.List;
 @JsonTypeName("schemaless-scan")
 public class SchemalessScan extends AbstractFileGroupScan implements SubScan {
 
-  private final String selectionRoot;
+  private final Path selectionRoot;
 
   @JsonCreator
   public SchemalessScan(@JsonProperty("userName") String userName,
-                        @JsonProperty("selectionRoot") String selectionRoot,
+                        @JsonProperty("selectionRoot") Path selectionRoot,
                         @JsonProperty("columns") List<SchemaPath> columns) {
     this(userName, selectionRoot);
   }
 
   public SchemalessScan(@JsonProperty("userName") String userName,
-                        @JsonProperty("selectionRoot") String selectionRoot) {
+                        @JsonProperty("selectionRoot") Path selectionRoot) {
     super(userName);
     this.selectionRoot = selectionRoot;
   }
@@ -53,7 +54,7 @@ public class SchemalessScan extends AbstractFileGroupScan implements SubScan {
   }
 
   @JsonProperty
-  public String getSelectionRoot() {
+  public Path getSelectionRoot() {
     return selectionRoot;
   }
 
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/file/BaseFileScanFramework.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/file/BaseFileScanFramework.java
index 5a8c526..8352dfa 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/file/BaseFileScanFramework.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/file/BaseFileScanFramework.java
@@ -110,7 +110,7 @@ public abstract class BaseFileScanFramework<T extends BaseFileScanFramework.File
 
     List<Path> paths = new ArrayList<>();
     for (FileWork work : files) {
-      Path path = dfs.makeQualified(new Path(work.getPath()));
+      Path path = dfs.makeQualified(work.getPath());
       paths.add(path);
       FileSplit split = new FileSplit(path, work.getStart(), work.getLength(), new String[]{""});
       spilts.add(split);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/file/FileMetadata.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/file/FileMetadata.java
index ff449d4..2eb8af9 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/file/FileMetadata.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/file/FileMetadata.java
@@ -52,7 +52,7 @@ public class FileMetadata {
       return;
     }
 
-    dirPath = ColumnExplorer.parsePartitions(filePath.toString(), rootPath.toString());
+    dirPath = ColumnExplorer.parsePartitions(filePath, rootPath, false);
     if (dirPath == null) {
       throw new IllegalArgumentException(
           String.format("Selection root of \"%s\" is not a leading path of \"%s\"",
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/AbstractPartitionDescriptor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/AbstractPartitionDescriptor.java
index cf01256..d414827 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/AbstractPartitionDescriptor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/AbstractPartitionDescriptor.java
@@ -22,6 +22,7 @@ import java.util.List;
 
 import org.apache.calcite.rel.core.TableScan;
 import org.apache.drill.exec.store.dfs.MetadataContext;
+import org.apache.hadoop.fs.Path;
 
 /**
  * Abstract base class for file system based partition descriptors and Hive partition descriptors.
@@ -65,7 +66,7 @@ public abstract class AbstractPartitionDescriptor implements PartitionDescriptor
 
 
   @Override
-  public TableScan createTableScan(List<PartitionLocation> newPartitions, String cacheFileRoot,
+  public TableScan createTableScan(List<PartitionLocation> newPartitions, Path cacheFileRoot,
       boolean isAllPruned, MetadataContext metaContext) throws Exception {
     throw new UnsupportedOperationException();
   }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/DFSDirPartitionLocation.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/DFSDirPartitionLocation.java
index 46afb30..440178b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/DFSDirPartitionLocation.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/DFSDirPartitionLocation.java
@@ -21,7 +21,9 @@
 package org.apache.drill.exec.planner;
 
 
+import org.apache.drill.exec.util.DrillFileSystemUtil;
 import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
+import org.apache.hadoop.fs.Path;
 
 import java.util.Collection;
 import java.util.List;
@@ -46,7 +48,7 @@ public class DFSDirPartitionLocation implements PartitionLocation {
   }
 
   @Override
-  public String getEntirePartitionLocation() {
+  public Path getEntirePartitionLocation() {
     throw new UnsupportedOperationException("Should not call getEntirePartitionLocation for composite partition location!");
   }
 
@@ -67,15 +69,17 @@ public class DFSDirPartitionLocation implements PartitionLocation {
   }
 
   @Override
-  public String getCompositePartitionPath() {
-    String path = "";
-    for (int i=0; i < dirs.length; i++) {
-      if (dirs[i] == null) { // get the prefix
+  public Path getCompositePartitionPath() {
+    StringBuilder path = new StringBuilder();
+    for (String dir : dirs) {
+      if (dir == null) { // get the prefix
         break;
       }
-      path += "/" + dirs[i];
+      path.append("/")
+          .append(dir);
     }
-    return path;
+
+    return DrillFileSystemUtil.createPathSafe(path.toString());
   }
 
 }
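
DrillFileSystemUtil.createPathSafe is among the 11 lines added to that utility class (see the stat above), but its body is not shown in this excerpt. Since new Path("") throws IllegalArgumentException, a plausible sketch, assuming null/empty input maps to the root path:

    /** Hypothetical sketch of the helper; the committed body may differ. */
    public static Path createPathSafe(String path) {
      return path == null || path.isEmpty() ? new Path("/") : new Path(path);
    }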
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/DFSFilePartitionLocation.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/DFSFilePartitionLocation.java
index ecfa622..0dfe316 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/DFSFilePartitionLocation.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/DFSFilePartitionLocation.java
@@ -17,6 +17,7 @@
  */
 package org.apache.drill.exec.planner;
 
+import org.apache.drill.exec.store.ColumnExplorer;
 import org.apache.hadoop.fs.Path;
 
 /**
@@ -24,28 +25,14 @@ import org.apache.hadoop.fs.Path;
  */
 public class DFSFilePartitionLocation extends SimplePartitionLocation {
   private final String[] dirs;
-  private final String file;
+  private final Path file;
 
-  public DFSFilePartitionLocation(int max, String selectionRoot, String file, boolean hasDirsOnly) {
+  public DFSFilePartitionLocation(int max, Path selectionRoot, Path file, boolean hasDirsOnly) {
     this.file = file;
     this.dirs = new String[max];
 
-    // strip the scheme and authority if they exist
-    selectionRoot = Path.getPathWithoutSchemeAndAuthority(new Path(selectionRoot)).toString();
-
-    int start = file.indexOf(selectionRoot) + selectionRoot.length();
-    String postPath = file.substring(start);
-    if (postPath.length() == 0) {
-      return;
-    }
-    if(postPath.charAt(0) == '/'){
-      postPath = postPath.substring(1);
-    }
-    String[] mostDirs = postPath.split("/");
-    int maxLoop = Math.min(max, hasDirsOnly ? mostDirs.length : mostDirs.length - 1);
-    for(int i =0; i < maxLoop; i++) {
-      this.dirs[i] = mostDirs[i];
-    }
+    String[] dirs = ColumnExplorer.parsePartitions(this.file, selectionRoot, hasDirsOnly);
+    System.arraycopy(dirs, 0, this.dirs, 0, Math.min(max, dirs.length));
   }
 
   /**
@@ -64,7 +51,7 @@ public class DFSFilePartitionLocation extends SimplePartitionLocation {
    * @return The partition location.
    */
   @Override
-  public String getEntirePartitionLocation() {
+  public Path getEntirePartitionLocation() {
     return file;
   }
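
The ColumnExplorer.parsePartitions(file, root, hasDirsOnly) call replaces the inline splitting logic deleted above. Inferring its contract from that deleted code (an illustration, not the committed javadoc):

    import org.apache.drill.exec.store.ColumnExplorer;
    import org.apache.hadoop.fs.Path;

    // With hasDirsOnly == false the trailing segment is a file name and is dropped:
    String[] dirs = ColumnExplorer.parsePartitions(
        new Path("/data/table/2018/q1/0_0_0.parquet"), // full file path
        new Path("/data/table"),                       // selection root
        false);                                        // hasDirsOnly
    // dirs == {"2018", "q1"}; null when the root is not a leading path of the file,
    // which is exactly the case FileMetadata above turns into IllegalArgumentException.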
 
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/FileSystemPartitionDescriptor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/FileSystemPartitionDescriptor.java
index 6526892..1880490 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/FileSystemPartitionDescriptor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/FileSystemPartitionDescriptor.java
@@ -26,6 +26,8 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.calcite.plan.RelOptTable;
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.drill.common.util.GuavaUtils;
 import org.apache.drill.shaded.guava.com.google.common.base.Charsets;
 import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
@@ -37,7 +39,6 @@ import org.apache.calcite.adapter.enumerable.EnumerableTableScan;
 import org.apache.calcite.prepare.RelOptTableImpl;
 import org.apache.calcite.rel.core.TableScan;
 import org.apache.calcite.util.BitSets;
-import org.apache.calcite.util.Pair;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.common.types.TypeProtos;
 import org.apache.drill.common.types.Types;
@@ -54,6 +55,7 @@ import org.apache.drill.exec.store.dfs.FormatSelection;
 import org.apache.drill.exec.store.dfs.MetadataContext;
 import org.apache.drill.exec.vector.NullableVarCharVector;
 import org.apache.drill.exec.vector.ValueVector;
+import org.apache.hadoop.fs.Path;
 
 
 // partition descriptor for file system based tables
@@ -145,18 +147,18 @@ public class FileSystemPartitionDescriptor extends AbstractPartitionDescriptor {
   }
 
   @Override
-  public String getBaseTableLocation() {
+  public Path getBaseTableLocation() {
     final FormatSelection origSelection = (FormatSelection) table.getSelection();
-    return origSelection.getSelection().selectionRoot;
+    return origSelection.getSelection().getSelectionRoot();
   }
 
   @Override
   protected void createPartitionSublists() {
-    final Pair<Collection<String>, Boolean> fileLocationsAndStatus = getFileLocationsAndStatus();
+    final Pair<Collection<Path>, Boolean> fileLocationsAndStatus = getFileLocationsAndStatus();
     List<PartitionLocation> locations = new LinkedList<>();
-    boolean hasDirsOnly = fileLocationsAndStatus.right;
+    boolean hasDirsOnly = fileLocationsAndStatus.getRight();
 
-    final String selectionRoot = getBaseTableLocation();
+    final Path selectionRoot = getBaseTableLocation();
 
     // map used to map the partition keys (dir0, dir1, ..), to the list of partitions that share the same partition keys.
     // For example,
@@ -166,35 +168,31 @@ public class FileSystemPartitionDescriptor extends AbstractPartitionDescriptor {
 
     // Figure out the list of leaf subdirectories. For each leaf subdirectory, find the list of files (DFSFilePartitionLocation)
     // it contains.
-    for (String file: fileLocationsAndStatus.left) {
-      DFSFilePartitionLocation dfsFilePartitionLocation = new DFSFilePartitionLocation(MAX_NESTED_SUBDIRS, selectionRoot, file, hasDirsOnly);
+    for (Path file: fileLocationsAndStatus.getLeft()) {
+      DFSFilePartitionLocation dfsFilePartitionLocation = new DFSFilePartitionLocation(MAX_NESTED_SUBDIRS,
+          selectionRoot, file, hasDirsOnly);
+      List<String> dirList = Arrays.asList(dfsFilePartitionLocation.getDirs());
 
-      final String[] dirs = dfsFilePartitionLocation.getDirs();
-      final List<String> dirList = Arrays.asList(dirs);
-
-      if (!dirToFileMap.containsKey(dirList)) {
-        dirToFileMap.put(dirList, new ArrayList<PartitionLocation>());
-      }
+      dirToFileMap.putIfAbsent(dirList, new ArrayList<>());
       dirToFileMap.get(dirList).add(dfsFilePartitionLocation);
     }
 
     // build a list of DFSDirPartitionLocation.
-    for (final List<String> dirs : dirToFileMap.keySet()) {
-      locations.add( new DFSDirPartitionLocation(dirs.toArray(new String[dirs.size()]), dirToFileMap.get(dirs)));
-    }
+    dirToFileMap.keySet().stream()
+        .map(dirs -> new DFSDirPartitionLocation(dirs.toArray(new String[dirs.size()]), dirToFileMap.get(dirs)))
+        .forEach(locations::add);
 
     locationSuperList = Lists.partition(locations, PartitionDescriptor.PARTITION_BATCH_SIZE);
     sublistsCreated = true;
   }
 
-  protected Pair<Collection<String>, Boolean> getFileLocationsAndStatus() {
-    Collection<String> fileLocations = null;
-    Pair<Collection<String>, Boolean> fileLocationsAndStatus = null;
+  protected Pair<Collection<Path>, Boolean> getFileLocationsAndStatus() {
+    Collection<Path> fileLocations = null;
     boolean isExpandedPartial = false;
     if (scanRel instanceof DrillScanRel) {
       // If a particular GroupScan provides files, get the list of files from there rather than
       // DrillTable because GroupScan would have the updated version of the selection
-      final DrillScanRel drillScan = (DrillScanRel) scanRel;
+      DrillScanRel drillScan = (DrillScanRel) scanRel;
       if (drillScan.getGroupScan().hasFiles()) {
         fileLocations = drillScan.getGroupScan().getFiles();
         isExpandedPartial = false;
@@ -208,67 +206,59 @@ public class FileSystemPartitionDescriptor extends AbstractPartitionDescriptor {
       fileLocations = selection.getFiles();
       isExpandedPartial = selection.isExpandedPartial();
     }
-    fileLocationsAndStatus = Pair.of(fileLocations, isExpandedPartial);
-    return fileLocationsAndStatus;
+    return Pair.of(fileLocations, isExpandedPartial);
   }
 
   @Override
-  public TableScan createTableScan(List<PartitionLocation> newPartitionLocation, String cacheFileRoot,
+  public TableScan createTableScan(List<PartitionLocation> newPartitionLocation, Path cacheFileRoot,
       boolean wasAllPartitionsPruned, MetadataContext metaContext) throws Exception {
-    List<String> newFiles = Lists.newArrayList();
-    for (final PartitionLocation location : newPartitionLocation) {
+    List<Path> newFiles = new ArrayList<>();
+    for (PartitionLocation location : newPartitionLocation) {
       if (!location.isCompositePartition()) {
         newFiles.add(location.getEntirePartitionLocation());
       } else {
         final Collection<SimplePartitionLocation> subPartitions = location.getPartitionLocationRecursive();
-        for (final PartitionLocation subPart : subPartitions) {
+        for (PartitionLocation subPart : subPartitions) {
           newFiles.add(subPart.getEntirePartitionLocation());
         }
       }
     }
 
+    FormatSelection formatSelection = (FormatSelection) table.getSelection();
+    FileSelection newFileSelection = new FileSelection(null, newFiles, getBaseTableLocation(),
+        cacheFileRoot, wasAllPartitionsPruned, formatSelection.getSelection().getDirStatus());
+    newFileSelection.setMetaContext(metaContext);
+    RelOptTable relOptTable = scanRel.getTable();
+
     if (scanRel instanceof DrillScanRel) {
-      final FormatSelection formatSelection = (FormatSelection)table.getSelection();
-      final FileSelection newFileSelection = new FileSelection(null, newFiles, getBaseTableLocation(),
-          cacheFileRoot, wasAllPartitionsPruned, formatSelection.getSelection().getDirStatus());
-      newFileSelection.setMetaContext(metaContext);
-      final FileGroupScan newGroupScan =
-          ((FileGroupScan)((DrillScanRel)scanRel).getGroupScan()).clone(newFileSelection);
+      FileGroupScan newGroupScan =
+          ((FileGroupScan) ((DrillScanRel) scanRel).getGroupScan()).clone(newFileSelection);
       return new DrillScanRel(scanRel.getCluster(),
                       scanRel.getTraitSet().plus(DrillRel.DRILL_LOGICAL),
-                      scanRel.getTable(),
+                      relOptTable,
                       newGroupScan,
                       scanRel.getRowType(),
                       ((DrillScanRel) scanRel).getColumns(),
                       true /*filter pushdown*/);
     } else if (scanRel instanceof EnumerableTableScan) {
-      return createNewTableScanFromSelection((EnumerableTableScan)scanRel, newFiles, cacheFileRoot,
-          wasAllPartitionsPruned, metaContext);
+      FormatSelection newFormatSelection = new FormatSelection(formatSelection.getFormat(), newFileSelection);
+
+      DynamicDrillTable dynamicDrillTable = new DynamicDrillTable(table.getPlugin(), table.getStorageEngineName(),
+          table.getUserName(), newFormatSelection);
+      /* Copy statistics from the original relOptTable */
+      dynamicDrillTable.setStatsTable(table.getStatsTable());
+      DrillTranslatableTable newTable = new DrillTranslatableTable(dynamicDrillTable);
+
+      RelOptTableImpl newOptTableImpl = RelOptTableImpl.create(relOptTable.getRelOptSchema(), relOptTable.getRowType(),
+          newTable, GuavaUtils.convertToUnshadedImmutableList(ImmutableList.of()));
+
+      // return an EnumerableTableScan with fileSelection being part of digest of TableScan node.
+      return DirPrunedEnumerableTableScan.create(scanRel.getCluster(), newOptTableImpl, newFileSelection.toString());
     } else {
       throw new UnsupportedOperationException("Only DrillScanRel and EnumerableTableScan is allowed!");
     }
   }
 
-  private TableScan createNewTableScanFromSelection(EnumerableTableScan oldScan, List<String> newFiles, String cacheFileRoot,
-      boolean wasAllPartitionsPruned, MetadataContext metaContext) {
-    final RelOptTableImpl t = (RelOptTableImpl) oldScan.getTable();
-    final FormatSelection formatSelection = (FormatSelection) table.getSelection();
-    final FileSelection newFileSelection = new FileSelection(null, newFiles, getBaseTableLocation(),
-            cacheFileRoot, wasAllPartitionsPruned, formatSelection.getSelection().getDirStatus());
-    newFileSelection.setMetaContext(metaContext);
-    final FormatSelection newFormatSelection = new FormatSelection(formatSelection.getFormat(), newFileSelection);
-    final DynamicDrillTable dynamicDrillTable = new DynamicDrillTable(table.getPlugin(), table.getStorageEngineName(),
-            table.getUserName(), newFormatSelection);
-    /* Copy statistics from the original table */
-    dynamicDrillTable.setStatsTable(table.getStatsTable());
-    final DrillTranslatableTable newTable = new DrillTranslatableTable(dynamicDrillTable);
-    final RelOptTableImpl newOptTableImpl = RelOptTableImpl.create(t.getRelOptSchema(), t.getRowType(), newTable,
-            GuavaUtils.convertToUnshadedImmutableList(ImmutableList.of()));
-
-    // return an EnumerableTableScan with fileSelection being part of digest of TableScan node.
-    return DirPrunedEnumerableTableScan.create(oldScan.getCluster(), newOptTableImpl, newFileSelection.toString());
-  }
-
   @Override
   public TableScan createTableScan(List<PartitionLocation> newPartitionLocation,
       boolean wasAllPartitionsPruned) throws Exception {
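
Illustratively, the dirToFileMap grouping in createPartitionSublists() above collects files by their shared dir0/dir1 keys (paths here are made up):

    // Files under selection root /data/t:
    //   /data/t/2018/q1/a.parquet -> key ["2018", "q1"]
    //   /data/t/2018/q1/b.parquet -> key ["2018", "q1"]  (same group)
    //   /data/t/2018/q2/c.parquet -> key ["2018", "q2"]
    // createPartitionSublists() then emits one DFSDirPartitionLocation per distinct key.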
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/ParquetPartitionDescriptor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/ParquetPartitionDescriptor.java
index df6decc..23675d1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/ParquetPartitionDescriptor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/ParquetPartitionDescriptor.java
@@ -134,17 +134,17 @@ public class ParquetPartitionDescriptor extends AbstractPartitionDescriptor {
   }
 
   @Override
-  public String getBaseTableLocation() {
+  public Path getBaseTableLocation() {
     final FormatSelection origSelection = (FormatSelection) scanRel.getDrillTable().getSelection();
     return origSelection.getSelection().selectionRoot;
   }
 
   @Override
   public TableScan createTableScan(List<PartitionLocation> newPartitionLocation,
-                                   String cacheFileRoot,
+                                   Path cacheFileRoot,
                                    boolean wasAllPartitionsPruned,
                                    MetadataContext metaContext) throws Exception {
-    List<String> newFiles = new ArrayList<>();
+    List<Path> newFiles = new ArrayList<>();
     for (final PartitionLocation location : newPartitionLocation) {
       newFiles.add(location.getEntirePartitionLocation());
     }
@@ -172,17 +172,17 @@ public class ParquetPartitionDescriptor extends AbstractPartitionDescriptor {
 
   @Override
   protected void createPartitionSublists() {
-    Set<String> fileLocations = groupScan.getFileSet();
+    Set<Path> fileLocations = groupScan.getFileSet();
     List<PartitionLocation> locations = new LinkedList<>();
-    for (String file : fileLocations) {
+    for (Path file : fileLocations) {
       locations.add(new ParquetPartitionLocation(file));
     }
     locationSuperList = Lists.partition(locations, PartitionDescriptor.PARTITION_BATCH_SIZE);
     sublistsCreated = true;
   }
 
-  private GroupScan createNewGroupScan(List<String> newFiles,
-                                       String cacheFileRoot,
+  private GroupScan createNewGroupScan(List<Path> newFiles,
+                                       Path cacheFileRoot,
                                        boolean wasAllPartitionsPruned,
                                        MetadataContext metaContext) throws IOException {
 
@@ -194,8 +194,8 @@ public class ParquetPartitionDescriptor extends AbstractPartitionDescriptor {
     return groupScan.clone(newSelection);
   }
 
-  private void populatePruningVector(ValueVector v, int index, SchemaPath column, String file) {
-    String path = Path.getPathWithoutSchemeAndAuthority(new Path(file)).toString();
+  private void populatePruningVector(ValueVector v, int index, SchemaPath column, Path file) {
+    Path path = Path.getPathWithoutSchemeAndAuthority(file);
     TypeProtos.MajorType majorType = getVectorType(column, null);
     TypeProtos.MinorType type = majorType.getMinorType();
     switch (type) {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/ParquetPartitionLocation.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/ParquetPartitionLocation.java
index 49e5109..eec0d5e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/ParquetPartitionLocation.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/ParquetPartitionLocation.java
@@ -17,6 +17,8 @@
  */
 package org.apache.drill.exec.planner;
 
+import org.apache.hadoop.fs.Path;
+
 /*
  * PartitionLocation for the parquet auto partitioning scheme. We just store
  * the location of each partition within this class. Since the partition value
@@ -25,9 +27,9 @@ package org.apache.drill.exec.planner;
  * invoked.
  */
 public class ParquetPartitionLocation extends SimplePartitionLocation {
-  private final String file;
+  private final Path file;
 
-  public ParquetPartitionLocation(String file) {
+  public ParquetPartitionLocation(Path file) {
     this.file = file;
   }
 
@@ -48,7 +50,7 @@ public class ParquetPartitionLocation extends SimplePartitionLocation {
    * @return String location of the partition
    */
   @Override
-  public String getEntirePartitionLocation() {
+  public Path getEntirePartitionLocation() {
     return file;
   }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PartitionDescriptor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PartitionDescriptor.java
index 220bf29..8759078 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PartitionDescriptor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PartitionDescriptor.java
@@ -23,36 +23,52 @@ import org.apache.drill.common.types.TypeProtos;
 import org.apache.drill.exec.planner.physical.PlannerSettings;
 import org.apache.drill.exec.store.dfs.MetadataContext;
 import org.apache.drill.exec.vector.ValueVector;
+import org.apache.hadoop.fs.Path;
 
 import java.util.BitSet;
 import java.util.List;
 import java.util.Map;
 
-// Interface used to describe partitions. Currently used by file system based partitions and hive partitions
+/**
+ * Interface used to describe partitions. Currently used by file system based partitions and hive partitions
+ */
 public interface PartitionDescriptor extends Iterable<List<PartitionLocation>> {
 
-  public static final int PARTITION_BATCH_SIZE = Character.MAX_VALUE;
+  int PARTITION_BATCH_SIZE = Character.MAX_VALUE;
 
-  /* Get the hierarchy index of the given partition
+  /**
+   * Get the hierarchy index of the given partition
    * For eg: if we have the partition laid out as follows
    * 1997/q1/jan
-   *
    * then getPartitionHierarchyIndex("jan") => 2
+   *
+   * @param partitionName Partition name
+   * @return the index of the specified partition name in the hierarchy
    */
-  public int getPartitionHierarchyIndex(String partitionName);
+  int getPartitionHierarchyIndex(String partitionName);
 
-  // Given a column name return boolean to indicate if its a partition column or not
-  public boolean isPartitionName(String name);
+  /**
+   * Given a column name, returns whether it is a partition column.
+   *
+   * @param name column name to check
+   * @return true if the given name is a partition column name, false otherwise
+   */
+  boolean isPartitionName(String name);
 
   /**
    * Check to see if the name is a partition name.
+   *
    * @param name The field name you want to compare to partition names.
    * @return Return index if valid, otherwise return null;
    */
-  public Integer getIdIfValid(String name);
+  Integer getIdIfValid(String name);
 
-  // Maximum level of partition nesting/ hierarchy supported
-  public int getMaxHierarchyLevel();
+  /**
+   * Maximum level of partition nesting/hierarchy supported
+   *
+   * @return maximum supported level of partition hierarchy
+   */
+  int getMaxHierarchyLevel();
 
   /**
    * Method creates an in memory representation of all the partitions. For each level of partitioning we
@@ -79,7 +95,7 @@ public interface PartitionDescriptor extends Iterable<List<PartitionLocation>> {
    * @param wasAllPartitionsPruned
    * @throws Exception
    */
-  public TableScan createTableScan(List<PartitionLocation> newPartitions,
+  TableScan createTableScan(List<PartitionLocation> newPartitions,
       boolean wasAllPartitionsPruned) throws Exception;
 
   /**
@@ -91,11 +107,11 @@ public interface PartitionDescriptor extends Iterable<List<PartitionLocation>> {
    * @param metaContext
    * @throws Exception
    */
-  public TableScan createTableScan(List<PartitionLocation> newPartitions, String cacheFileRoot,
+  TableScan createTableScan(List<PartitionLocation> newPartitions, Path cacheFileRoot,
       boolean wasAllPartitionsPruned, MetadataContext metaContext) throws Exception;
 
-  public boolean supportsMetadataCachePruning();
+  boolean supportsMetadataCachePruning();
 
-  public String getBaseTableLocation();
+  Path getBaseTableLocation();
 
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PartitionLocation.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PartitionLocation.java
index 22088f7..2ef4e17 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PartitionLocation.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PartitionLocation.java
@@ -17,6 +17,8 @@
  */
 package org.apache.drill.exec.planner;
 
+import org.apache.hadoop.fs.Path;
+
 import java.util.List;
 
 /**
@@ -37,27 +39,27 @@ public interface PartitionLocation {
   /**
    * Returns the value of the 'index' partition column
    */
-  public String getPartitionValue(int index);
+  String getPartitionValue(int index);
 
   /**
-   * Returns the string representation of this partition.
+   * Returns the path of this partition.
    * Only a non-composite partition supports this.
    */
-  public String getEntirePartitionLocation();
+  Path getEntirePartitionLocation();
 
   /**
    * Returns the list of the non-composite partitions that this partition consists of.
    */
-  public List<SimplePartitionLocation> getPartitionLocationRecursive();
+  List<SimplePartitionLocation> getPartitionLocationRecursive();
 
   /**
    * Returns if this is a simple or composite partition.
    */
-  public boolean isCompositePartition();
+  boolean isCompositePartition();
 
   /**
    * Returns the path string of directory names only for composite partition
    */
-  public String getCompositePartitionPath();
+  Path getCompositePartitionPath();
 
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PhysicalPlanReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PhysicalPlanReader.java
index 20b06c4..920f9b2 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PhysicalPlanReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PhysicalPlanReader.java
@@ -33,6 +33,7 @@ import org.apache.drill.exec.physical.base.PhysicalOperator;
 import org.apache.drill.exec.physical.base.PhysicalOperatorUtil;
 import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
 import org.apache.drill.exec.record.MajorTypeSerDe;
+import org.apache.drill.exec.serialization.PathSerDe;
 import org.apache.drill.exec.server.options.OptionList;
 import org.apache.drill.exec.store.StoragePluginRegistry;
 import org.apache.drill.exec.store.pojo.DynamicPojoRecordReader;
@@ -44,43 +45,43 @@ import com.fasterxml.jackson.databind.ObjectReader;
 import com.fasterxml.jackson.databind.deser.std.StdDelegatingDeserializer;
 import com.fasterxml.jackson.databind.module.SimpleModule;
 import org.apache.drill.shaded.guava.com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.fs.Path;
 
 public class PhysicalPlanReader {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(PhysicalPlanReader.class);
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(PhysicalPlanReader.class);
 
   private final ObjectReader physicalPlanReader;
   private final ObjectMapper mapper;
   private final ObjectReader operatorReader;
   private final ObjectReader logicalPlanReader;
 
-  public PhysicalPlanReader(DrillConfig config, ScanResult scanResult, LogicalPlanPersistence lpPersistance, final DrillbitEndpoint endpoint,
-                            final StoragePluginRegistry pluginRegistry) {
+  public PhysicalPlanReader(DrillConfig config, ScanResult scanResult, LogicalPlanPersistence lpPersistance,
+                            final DrillbitEndpoint endpoint, final StoragePluginRegistry pluginRegistry) {
 
     ObjectMapper lpMapper = lpPersistance.getMapper();
 
     // Endpoint serializer/deserializer.
-    SimpleModule deserModule = new SimpleModule("PhysicalOperatorModule") //
-        .addSerializer(DrillbitEndpoint.class, new DrillbitEndpointSerDe.Se()) //
-        .addDeserializer(DrillbitEndpoint.class, new DrillbitEndpointSerDe.De()) //
+    SimpleModule serDeModule = new SimpleModule("PhysicalOperatorModule")
+        .addSerializer(DrillbitEndpoint.class, new DrillbitEndpointSerDe.Se())
+        .addDeserializer(DrillbitEndpoint.class, new DrillbitEndpointSerDe.De())
         .addSerializer(MajorType.class, new MajorTypeSerDe.Se())
         .addDeserializer(MajorType.class, new MajorTypeSerDe.De())
         .addDeserializer(DynamicPojoRecordReader.class,
-            new StdDelegatingDeserializer<>(new DynamicPojoRecordReader.Converter(lpMapper)));
+            new StdDelegatingDeserializer<>(new DynamicPojoRecordReader.Converter(lpMapper)))
+        .addSerializer(Path.class, new PathSerDe.Se());
 
-    lpMapper.registerModule(deserModule);
+    lpMapper.registerModule(serDeModule);
     Set<Class<? extends PhysicalOperator>> subTypes = PhysicalOperatorUtil.getSubTypes(scanResult);
-    for (Class<? extends PhysicalOperator> subType : subTypes) {
-      lpMapper.registerSubtypes(subType);
-    }
+    subTypes.forEach(lpMapper::registerSubtypes);
     lpMapper.registerSubtypes(DynamicPojoRecordReader.class);
-    InjectableValues injectables = new InjectableValues.Std() //
-            .addValue(StoragePluginRegistry.class, pluginRegistry) //
-        .addValue(DrillbitEndpoint.class, endpoint); //
+    InjectableValues injectables = new InjectableValues.Std()
+        .addValue(StoragePluginRegistry.class, pluginRegistry)
+        .addValue(DrillbitEndpoint.class, endpoint);
 
     this.mapper = lpMapper;
-    this.physicalPlanReader = mapper.reader(PhysicalPlan.class).with(injectables);
-    this.operatorReader = mapper.reader(PhysicalOperator.class).with(injectables);
-    this.logicalPlanReader = mapper.reader(LogicalPlan.class).with(injectables);
+    this.physicalPlanReader = mapper.readerFor(PhysicalPlan.class).with(injectables);
+    this.operatorReader = mapper.readerFor(PhysicalOperator.class).with(injectables);
+    this.logicalPlanReader = mapper.readerFor(LogicalPlan.class).with(injectables);
   }
 
   public String writeJson(OptionList list) throws JsonProcessingException{
@@ -91,33 +92,35 @@ public class PhysicalPlanReader {
     return mapper.writeValueAsString(op);
   }
 
-  public PhysicalPlan readPhysicalPlan(String json) throws JsonProcessingException, IOException {
+  public PhysicalPlan readPhysicalPlan(String json) throws IOException {
     logger.debug("Reading physical plan {}", json);
     return physicalPlanReader.readValue(json);
   }
 
-  public FragmentRoot readFragmentRoot(String json) throws JsonProcessingException, IOException {
+  public FragmentRoot readFragmentRoot(String json) throws IOException {
     logger.debug("Attempting to read {}", json);
     PhysicalOperator op = operatorReader.readValue(json);
-    if(op instanceof FragmentRoot){
+    if (op instanceof FragmentRoot) {
       return (FragmentRoot) op;
-    }else{
-      throw new UnsupportedOperationException(String.format("The provided json fragment doesn't have a FragmentRoot as its root operator.  The operator was %s.", op.getClass().getCanonicalName()));
+    } else {
+      throw new UnsupportedOperationException(String.format("The provided json fragment doesn't have a FragmentRoot " +
+          "as its root operator.  The operator was %s.", op.getClass().getCanonicalName()));
     }
   }
 
   @VisibleForTesting
-  public FragmentLeaf readFragmentLeaf(String json) throws JsonProcessingException, IOException {
+  public FragmentLeaf readFragmentLeaf(String json) throws IOException {
     logger.debug("Attempting to read {}", json);
     PhysicalOperator op = operatorReader.readValue(json);
     if (op instanceof FragmentLeaf){
       return (FragmentLeaf) op;
     } else {
-      throw new UnsupportedOperationException(String.format("The provided json fragment is not a FragmentLeaf. The operator was %s.", op.getClass().getCanonicalName()));
+      throw new UnsupportedOperationException(String.format("The provided json fragment is not a FragmentLeaf. " +
+          "The operator was %s.", op.getClass().getCanonicalName()));
     }
   }
 
-  public LogicalPlan readLogicalPlan(String json) throws JsonProcessingException, IOException{
+  public LogicalPlan readLogicalPlan(String json) throws IOException {
     logger.debug("Reading logical plan {}", json);
     return logicalPlanReader.readValue(json);
   }
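
A side note on the reader changes: ObjectMapper.reader(Class) has been deprecated since Jackson 2.5 in favor of readerFor(Class); both bind an ObjectReader to a target type, so the swap is behavior-preserving. A minimal sketch of the reader-plus-injectables pattern used above (assuming json holds a serialized plan and endpoint is a DrillbitEndpoint):

    ObjectMapper mapper = new ObjectMapper();
    InjectableValues injectables = new InjectableValues.Std()
        .addValue(DrillbitEndpoint.class, endpoint);   // made available to @JacksonInject members
    ObjectReader reader = mapper.readerFor(PhysicalPlan.class).with(injectables);
    PhysicalPlan plan = reader.readValue(json);        // throws IOException on malformed input
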
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/SimplePartitionLocation.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/SimplePartitionLocation.java
index 7c9afb0..ca00446 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/SimplePartitionLocation.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/SimplePartitionLocation.java
@@ -18,6 +18,7 @@
 package org.apache.drill.exec.planner;
 
 import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableList;
+import org.apache.hadoop.fs.Path;
 
 import java.util.List;
 
@@ -26,14 +27,14 @@ import java.util.List;
  * location of the entire partition and also stores the
  * value of the individual partition keys for this partition.
  */
-public abstract  class SimplePartitionLocation implements PartitionLocation{
+public abstract class SimplePartitionLocation implements PartitionLocation {
   @Override
   public boolean isCompositePartition() {
     return false;
   }
 
   @Override
-  public String getCompositePartitionPath() {
+  public Path getCompositePartitionPath() {
     throw new UnsupportedOperationException();
   }
 
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/partition/PruneScanRule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/partition/PruneScanRule.java
index 6f48b85..8d3a8ce 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/partition/PruneScanRule.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/partition/PruneScanRule.java
@@ -23,6 +23,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.drill.exec.util.DrillFileSystemUtil;
 import org.apache.drill.shaded.guava.com.google.common.base.Stopwatch;
 import org.apache.calcite.adapter.enumerable.EnumerableTableScan;
 import org.apache.calcite.rel.core.Filter;
@@ -74,7 +75,7 @@ import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
 import org.apache.drill.shaded.guava.com.google.common.collect.Maps;
 
 import org.apache.drill.exec.vector.ValueVector;
-
+import org.apache.hadoop.fs.Path;
 
 public abstract class PruneScanRule extends StoragePluginOptimizerRule {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(PruneScanRule.class);
@@ -372,7 +373,7 @@ public abstract class PruneScanRule extends StoragePluginOptimizerRule {
       // handle the case all partitions are filtered out.
       boolean canDropFilter = true;
       boolean wasAllPartitionsPruned = false;
-      String cacheFileRoot = null;
+      Path cacheFileRoot = null;
 
       if (newPartitions.isEmpty()) {
         assert firstLocation != null;
@@ -388,7 +389,7 @@ public abstract class PruneScanRule extends StoragePluginOptimizerRule {
 
         // set the cacheFileRoot appropriately
         if (firstLocation.isCompositePartition()) {
-          cacheFileRoot = descriptor.getBaseTableLocation() + firstLocation.getCompositePartitionPath();
+          cacheFileRoot = Path.mergePaths(descriptor.getBaseTableLocation(), firstLocation.getCompositePartitionPath());
         }
       }
 
@@ -408,7 +409,7 @@ public abstract class PruneScanRule extends StoragePluginOptimizerRule {
         // if metadata cache file could potentially be used, then assign a proper cacheFileRoot
         int index = -1;
         if (!matchBitSet.isEmpty()) {
-          String path = "";
+          StringBuilder path = new StringBuilder();
           index = matchBitSet.length() - 1;
 
           for (int j = 0; j < matchBitSet.length(); j++) {
@@ -419,10 +420,12 @@ public abstract class PruneScanRule extends StoragePluginOptimizerRule {
               break;
             }
           }
-          for (int j=0; j <= index; j++) {
-            path += "/" + spInfo[j];
+          for (int j = 0; j <= index; j++) {
+            path.append("/")
+                .append(spInfo[j]);
           }
-          cacheFileRoot = descriptor.getBaseTableLocation() + path;
+          cacheFileRoot = Path.mergePaths(descriptor.getBaseTableLocation(),
+              DrillFileSystemUtil.createPathSafe(path.toString()));
         }
         if (index != maxIndex) {
           // if multiple partitions are being selected, we should not drop the filter
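
Path.mergePaths appends the second path's component onto the first while keeping the first path's scheme and authority, which is why it safely replaces the old string concatenation here. Illustrative values (behavior as documented by the Hadoop API):

    Path base = new Path("hdfs://nn:8020/data/table");    // descriptor.getBaseTableLocation()
    Path composite = new Path("/1994/Q1");                // firstLocation.getCompositePartitionPath()
    Path cacheFileRoot = Path.mergePaths(base, composite);
    // cacheFileRoot -> hdfs://nn:8020/data/table/1994/Q1
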
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/AnalyzeTableHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/AnalyzeTableHandler.java
index f1acc71..8f62713 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/AnalyzeTableHandler.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/AnalyzeTableHandler.java
@@ -133,15 +133,13 @@ public class AnalyzeTableHandler extends DefaultSqlHandler {
       DrillFileSystem fs = new DrillFileSystem(plugin.getFormatPlugin(
           formatSelection.getFormat()).getFsConf());
 
-      String selectionRoot = formatSelection.getSelection().getSelectionRoot();
-      if (!selectionRoot.contains(tableName)
-          || !fs.getFileStatus(new Path(selectionRoot)).isDirectory()) {
+      Path selectionRoot = formatSelection.getSelection().getSelectionRoot();
+      if (!selectionRoot.getName().equals(tableName) || !fs.getFileStatus(selectionRoot).isDirectory()) {
         return DrillStatsTable.notSupported(context, tableName);
       }
       // Do not recompute statistics, if stale
-      Path statsFilePath = new Path(new Path(selectionRoot), DotDrillType.STATS.getEnding());
-      if (fs.exists(statsFilePath)
-          && !isStatsStale(fs, statsFilePath)) {
+      Path statsFilePath = new Path(selectionRoot, DotDrillType.STATS.getEnding());
+      if (fs.exists(statsFilePath) && !isStatsStale(fs, statsFilePath)) {
        return DrillStatsTable.notRequired(context, tableName);
       }
     }
@@ -165,13 +163,8 @@ public class AnalyzeTableHandler extends DefaultSqlHandler {
     Path parentPath = statsFilePath.getParent();
     FileStatus directoryStatus = fs.getFileStatus(parentPath);
     // Parent directory modified after stats collection?
-    if (directoryStatus.getModificationTime() > statsFileModifyTime) {
-      return true;
-    }
-    if (tableModified(fs, parentPath, statsFileModifyTime)) {
-      return true;
-    }
-    return false;
+    return directoryStatus.getModificationTime() > statsFileModifyTime ||
+        tableModified(fs, parentPath, statsFileModifyTime);
   }
 
   /* Determines if the table was modified after computing statistics based on
@@ -185,10 +178,8 @@ public class AnalyzeTableHandler extends DefaultSqlHandler {
         return true;
       }
       // For a directory, we should recursively check sub-directories
-      if (file.isDirectory()) {
-        if (tableModified(fs, file.getPath(), statsModificationTime)) {
-          return true;
-        }
+      if (file.isDirectory() && tableModified(fs, file.getPath(), statsModificationTime)) {
+        return true;
       }
     }
     return false;
@@ -215,8 +206,7 @@ public class AnalyzeTableHandler extends DefaultSqlHandler {
 
   /* Converts to Drill logical plan */
   protected DrillRel convertToDrel(RelNode relNode, AbstractSchema schema, String analyzeTableName,
-      double samplePercent)
-      throws RelConversionException, SqlUnsupportedException {
+      double samplePercent) throws SqlUnsupportedException {
     DrillRel convertedRelNode = convertToRawDrel(relNode);
 
     if (convertedRelNode instanceof DrillStoreRel) {
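
On the statsFilePath simplification: the two-argument Path(parent, child) constructor resolves a child name against a parent directory, removing the nested new Path(new Path(...)) wrapping. Illustrative values (the actual DotDrillType.STATS ending is whatever getEnding() returns):

    Path selectionRoot = new Path("/data/mytable");
    Path statsFilePath = new Path(selectionRoot, ".stats.drill");  // hypothetical ending
    // statsFilePath -> /data/mytable/.stats.drill
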
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/RefreshMetadataHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/RefreshMetadataHandler.java
index 4684251..3ca7787 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/RefreshMetadataHandler.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/RefreshMetadataHandler.java
@@ -108,8 +108,8 @@ public class RefreshMetadataHandler extends DefaultSqlHandler {
       FileSystemPlugin plugin = (FileSystemPlugin) drillTable.getPlugin();
       DrillFileSystem fs = new DrillFileSystem(plugin.getFormatPlugin(formatSelection.getFormat()).getFsConf());
 
-      String selectionRoot = formatSelection.getSelection().selectionRoot;
-      if (!fs.getFileStatus(new Path(selectionRoot)).isDirectory()) {
+      Path selectionRoot = formatSelection.getSelection().getSelectionRoot();
+      if (!fs.getFileStatus(selectionRoot).isDirectory()) {
         return notSupported(tableName);
       }
 
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/ReadEntryWithPath.java b/exec/java-exec/src/main/java/org/apache/drill/exec/serialization/PathSerDe.java
similarity index 55%
copy from exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/ReadEntryWithPath.java
copy to exec/java-exec/src/main/java/org/apache/drill/exec/serialization/PathSerDe.java
index 4564e15..03bb37e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/ReadEntryWithPath.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/serialization/PathSerDe.java
@@ -15,28 +15,25 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.drill.exec.store.dfs;
+package org.apache.drill.exec.serialization;
 
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.databind.JsonSerializer;
+import com.fasterxml.jackson.databind.SerializerProvider;
+import org.apache.hadoop.fs.Path;
 
-public class ReadEntryWithPath {
+import java.io.IOException;
 
-  protected String path;
-
-
-  public ReadEntryWithPath(String path) {
-    super();
-    this.path = path;
-  }
-
-  public ReadEntryWithPath(){}
+/**
+ * Serializes {@link Path} as a simple String path. Without it, the default Hadoop Path serialization creates a big JSON object.
+ */
+public class PathSerDe {
 
-  public String getPath(){
-   return path;
-  }
+  public static class Se extends JsonSerializer<Path> {
 
-  @Override
-  public String toString() {
-    return "ReadEntryWithPath [path=" + path + "]";
+    @Override
+    public void serialize(Path value, JsonGenerator gen, SerializerProvider serializers) throws IOException {
+      gen.writeString(value.toUri().getPath());
+    }
   }
-
 }
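
A quick sketch of the new serializer in isolation (mirrors the registration done in PhysicalPlanReader above). Note that serialize() writes value.toUri().getPath(), so scheme and authority are intentionally dropped:

    ObjectMapper mapper = new ObjectMapper();
    mapper.registerModule(new SimpleModule("PathModule")
        .addSerializer(Path.class, new PathSerDe.Se()));
    String json = mapper.writeValueAsString(new Path("hdfs://nn:8020/tmp/drill/table"));
    // json -> "/tmp/drill/table" (a plain JSON string; without the module, Jackson
    // introspects Path's getters and emits a large object graph)
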
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ColumnExplorer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ColumnExplorer.java
index d5d9784..33b5000 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ColumnExplorer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ColumnExplorer.java
@@ -17,6 +17,7 @@
  */
 package org.apache.drill.exec.store;
 
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.LinkedHashMap;
@@ -133,20 +134,20 @@ public class ColumnExplorer {
   public static List<String> getPartitionColumnNames(FileSelection selection, SchemaConfig schemaConfig) {
     int partitionsCount = 0;
     // the depth of the table root path
-    int rootDepth = new Path(selection.getSelectionRoot()).depth();
+    int rootDepth = selection.getSelectionRoot().depth();
 
-    for (String file : selection.getFiles()) {
+    for (Path file : selection.getFiles()) {
       // Calculates partitions count for the concrete file:
       // depth of file path - depth of table root path - 1.
       // The depth of file path includes file itself,
       // so we should subtract 1 to consider only directories.
-      int currentPartitionsCount = new Path(file).depth() - rootDepth - 1;
+      int currentPartitionsCount = file.depth() - rootDepth - 1;
       // max depth of files path should be used to handle all partitions
       partitionsCount = Math.max(partitionsCount, currentPartitionsCount);
     }
 
     String partitionColumnLabel = schemaConfig.getOption(ExecConstants.FILESYSTEM_PARTITION_COLUMN_LABEL).string_val;
-    List<String> partitions = Lists.newArrayList();
+    List<String> partitions = new ArrayList<>();
 
     // generates partition column names: dir0, dir1 etc.
     for (int i = 0; i < partitionsCount; i++) {
@@ -165,7 +166,7 @@ public class ColumnExplorer {
    * @param includeFileImplicitColumns if file implicit columns should be included into the result
    * @return implicit columns map
    */
-  public Map<String, String> populateImplicitColumns(String filePath,
+  public Map<String, String> populateImplicitColumns(Path filePath,
                                                      List<String> partitionValues,
                                                      boolean includeFileImplicitColumns) {
     Map<String, String> implicitValues = new LinkedHashMap<>();
@@ -177,7 +178,7 @@ public class ColumnExplorer {
     }
 
     if (includeFileImplicitColumns) {
-      Path path = Path.getPathWithoutSchemeAndAuthority(new Path(filePath));
+      Path path = Path.getPathWithoutSchemeAndAuthority(filePath);
       for (Map.Entry<String, ImplicitFileColumns> entry : selectedImplicitColumns.entrySet()) {
         implicitValues.put(entry.getKey(), entry.getValue().getValue(path));
       }
@@ -189,15 +190,16 @@ public class ColumnExplorer {
   /**
    * Compares root and file path to determine directories
    * that are present in the file path but absent in root.
-   * Example: root - a/b/c, filePath - a/b/c/d/e/0_0_0.parquet, result - d/e.
+   * Example: root - a/b/c, file - a/b/c/d/e/0_0_0.parquet, result - d/e.
    * Stores different directory names in the list in successive order.
    *
-   * @param filePath file path
+   * @param file file path
    * @param root root directory
+   * @param hasDirsOnly whether the path contains directories only (no file name at the end)
    * @return list of directory names
    */
-  public static List<String> listPartitionValues(String filePath, String root) {
-    String[] dirs = parsePartitions(filePath, root);
+  public static List<String> listPartitionValues(Path file, Path root, boolean hasDirsOnly) {
+    String[] dirs = parsePartitions(file, root, hasDirsOnly);
     if (dirs == null) {
       return Collections.emptyList();
     }
@@ -208,21 +210,23 @@ public class ColumnExplorer {
    * Low-level parse of partitions, returned as a string array. Returns a
    * null array for invalid values.
    *
-   * @param filePath file path
+   * @param file file path
    * @param root root directory
+   * @param hasDirsOnly whether the path contains directories only (no file name at the end)
    * @return array of directory names, or null if the arguments are invalid
    */
-  public static String[] parsePartitions(String filePath, String root) {
-    if (filePath == null || root == null) {
+  public static String[] parsePartitions(Path file, Path root, boolean hasDirsOnly) {
+    if (file == null || root == null) {
       return null;
     }
 
-    int rootDepth = new Path(root).depth();
-    Path path = new Path(filePath);
-    int parentDepth = path.getParent().depth();
-
-    int diffCount = parentDepth - rootDepth;
+    if (!hasDirsOnly) {
+      file = file.getParent();
+    }
 
+    int rootDepth = root.depth();
+    int fileDepth = file.depth();
+    int diffCount = fileDepth - rootDepth;
     if (diffCount < 0) {
       return null;
     }
@@ -230,10 +234,10 @@ public class ColumnExplorer {
     String[] diffDirectoryNames = new String[diffCount];
 
     // start filling in array from the end
-    for (int i = rootDepth; parentDepth > i; i++) {
-      path = path.getParent();
+    for (int i = rootDepth; fileDepth > i; i++) {
       // place in the end of array
-      diffDirectoryNames[parentDepth - i - 1] = path.getName();
+      diffDirectoryNames[fileDepth - i - 1] = file.getName();
+      file = file.getParent();
     }
 
     return diffDirectoryNames;
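
Tracing the reworked parsePartitions() with the javadoc's own example (root a/b/c, file a/b/c/d/e/0_0_0.parquet, hasDirsOnly = false):

    Path file = new Path("/a/b/c/d/e/0_0_0.parquet");
    Path root = new Path("/a/b/c");
    String[] dirs = ColumnExplorer.parsePartitions(file, root, false);
    // hasDirsOnly == false, so the file name is stripped first: /a/b/c/d/e
    // rootDepth = 3, fileDepth = 5, diffCount = 2
    // the loop fills the array from the end while walking up the parents:
    // dirs -> {"d", "e"}, later exposed as partition columns dir0 and dir1
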
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroDrillTable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroDrillTable.java
index 50d7ee8..10d90d4 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroDrillTable.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroDrillTable.java
@@ -54,8 +54,8 @@ public class AvroDrillTable extends DrillTable {
                        SchemaConfig schemaConfig,
                        FormatSelection selection) {
     super(storageEngineName, plugin, schemaConfig.getUserName(), selection);
-    List<String> asFiles = selection.getAsFiles();
-    Path path = new Path(asFiles.get(0));
+    List<Path> asFiles = selection.getAsFiles();
+    Path path = asFiles.get(0);
     this.schemaConfig = schemaConfig;
     try {
       reader = new DataFileReader<>(new FsInput(path, plugin.getFsConf()), new GenericDatumReader<GenericContainer>());
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroRecordReader.java
index 1d7226a..07444e9 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroRecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroRecordReader.java
@@ -90,13 +90,13 @@ public class AvroRecordReader extends AbstractRecordReader {
 
 
   public AvroRecordReader(final FragmentContext fragmentContext,
-                          final String inputPath,
+                          final Path inputPath,
                           final long start,
                           final long length,
                           final FileSystem fileSystem,
                           final List<SchemaPath> projectedColumns,
                           final String userName) {
-    hadoop = new Path(inputPath);
+    hadoop = inputPath;
     this.start = start;
     this.end = start + length;
     buffer = fragmentContext.getManagedBuffer();
@@ -111,12 +111,8 @@ public class AvroRecordReader extends AbstractRecordReader {
   private DataFileReader<GenericContainer> getReader(final Path hadoop, final FileSystem fs) throws ExecutionSetupException {
     try {
       final UserGroupInformation ugi = ImpersonationUtil.createProxyUgi(this.opUserName, this.queryUserName);
-      return ugi.doAs(new PrivilegedExceptionAction<DataFileReader<GenericContainer>>() {
-        @Override
-        public DataFileReader<GenericContainer> run() throws Exception {
-          return new DataFileReader<>(new FsInput(hadoop, fs.getConf()), new GenericDatumReader<GenericContainer>());
-        }
-      });
+      return ugi.doAs((PrivilegedExceptionAction<DataFileReader<GenericContainer>>) () ->
+              new DataFileReader<>(new FsInput(hadoop, fs.getConf()), new GenericDatumReader<GenericContainer>()));
     } catch (IOException | InterruptedException e) {
       throw new ExecutionSetupException(
         String.format("Error in creating avro reader for file: %s", hadoop), e);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSelection.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSelection.java
index 2fa9558..902abda 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSelection.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSelection.java
@@ -19,12 +19,15 @@ package org.apache.drill.exec.store.dfs;
 
 import java.io.IOException;
 import java.net.URI;
+import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
+import java.util.Objects;
 import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
 
 import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
 import org.apache.drill.shaded.guava.com.google.common.base.Stopwatch;
-import org.apache.drill.shaded.guava.com.google.common.base.Strings;
 import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
 
 import org.apache.drill.common.exceptions.DrillRuntimeException;
@@ -41,15 +44,15 @@ public class FileSelection {
 
   private List<FileStatus> statuses;
 
-  public List<String> files;
+  public List<Path> files;
   /**
    * root path for the selections
    */
-  public final String selectionRoot;
+  public final Path selectionRoot;
   /**
    * root path for the metadata cache file (if any)
    */
-  public final String cacheFileRoot;
+  public final Path cacheFileRoot;
 
   /**
    * metadata context useful for metadata operations (if any)
@@ -82,17 +85,17 @@ public class FileSelection {
    * @param files  list of files
    * @param selectionRoot  root path for selections
    */
-  public FileSelection(final List<FileStatus> statuses, final List<String> files, final String selectionRoot) {
+  public FileSelection(List<FileStatus> statuses, List<Path> files, Path selectionRoot) {
     this(statuses, files, selectionRoot, null, false, StatusType.NOT_CHECKED);
   }
 
-  public FileSelection(final List<FileStatus> statuses, final List<String> files, final String selectionRoot,
-      final String cacheFileRoot, final boolean wasAllPartitionsPruned) {
+  public FileSelection(List<FileStatus> statuses, List<Path> files, Path selectionRoot, Path cacheFileRoot,
+      boolean wasAllPartitionsPruned) {
     this(statuses, files, selectionRoot, cacheFileRoot, wasAllPartitionsPruned, StatusType.NOT_CHECKED);
   }
 
-  public FileSelection(final List<FileStatus> statuses, final List<String> files, final String selectionRoot,
-      final String cacheFileRoot, final boolean wasAllPartitionsPruned, final StatusType dirStatus) {
+  public FileSelection(List<FileStatus> statuses, List<Path> files, Path selectionRoot, Path cacheFileRoot,
+      boolean wasAllPartitionsPruned, StatusType dirStatus) {
     this.statuses = statuses;
     this.files = files;
     this.selectionRoot = selectionRoot;
@@ -104,7 +107,7 @@ public class FileSelection {
   /**
    * Copy constructor for convenience.
    */
-  protected FileSelection(final FileSelection selection) {
+  protected FileSelection(FileSelection selection) {
     Preconditions.checkNotNull(selection, "selection cannot be null");
     this.statuses = selection.statuses;
     this.files = selection.files;
@@ -116,17 +119,17 @@ public class FileSelection {
     this.wasAllPartitionsPruned = selection.wasAllPartitionsPruned;
   }
 
-  public String getSelectionRoot() {
+  public Path getSelectionRoot() {
     return selectionRoot;
   }
 
-  public List<FileStatus> getStatuses(final DrillFileSystem fs) throws IOException {
+  public List<FileStatus> getStatuses(DrillFileSystem fs) throws IOException {
     Stopwatch timer = logger.isDebugEnabled() ? Stopwatch.createStarted() : null;
 
     if (statuses == null)  {
-      final List<FileStatus> newStatuses = Lists.newArrayList();
-      for (final String pathStr:files) {
-        newStatuses.add(fs.getFileStatus(new Path(pathStr)));
+      List<FileStatus> newStatuses = Lists.newArrayList();
+      for (Path path : files) {
+        newStatuses.add(fs.getFileStatus(path));
       }
       statuses = newStatuses;
     }
@@ -139,11 +142,11 @@ public class FileSelection {
     return statuses;
   }
 
-  public List<String> getFiles() {
+  public List<Path> getFiles() {
     if (files == null) {
-      final List<String> newFiles = Lists.newArrayList();
-      for (final FileStatus status:statuses) {
-        newFiles.add(status.getPath().toString());
+      List<Path> newFiles = Lists.newArrayList();
+      for (FileStatus status : statuses) {
+        newFiles.add(status.getPath());
       }
       files = newFiles;
     }
@@ -153,7 +156,7 @@ public class FileSelection {
   public boolean containsDirectories(DrillFileSystem fs) throws IOException {
     if (dirStatus == StatusType.NOT_CHECKED) {
       dirStatus = StatusType.NO_DIRS;
-      for (final FileStatus status : getStatuses(fs)) {
+      for (FileStatus status : getStatuses(fs)) {
         if (status.isDirectory()) {
           dirStatus = StatusType.HAS_DIRS;
           break;
@@ -175,7 +178,7 @@ public class FileSelection {
       nonDirectories.addAll(DrillFileSystemUtil.listFiles(fs, status.getPath(), true));
     }
 
-    final FileSelection fileSel = create(nonDirectories, null, selectionRoot);
+    FileSelection fileSel = create(nonDirectories, null, selectionRoot);
     if (timer != null) {
       logger.debug("FileSelection.minusDirectories() took {} ms, numFiles: {}", timer.elapsed(TimeUnit.MILLISECONDS), statuses.size());
       timer.stop();
@@ -223,38 +226,36 @@ public class FileSelection {
    * @param files  list of files.
    * @return  longest common path
    */
-  private static String commonPathForFiles(final List<String> files) {
+  private static Path commonPathForFiles(List<Path> files) {
     if (files == null || files.isEmpty()) {
-      return "";
+      return new Path("/");
     }
 
-    final int total = files.size();
-    final String[][] folders = new String[total][];
+    int total = files.size();
+    String[][] folders = new String[total][];
     int shortest = Integer.MAX_VALUE;
     for (int i = 0; i < total; i++) {
-      final Path path = new Path(files.get(i));
-      folders[i] = Path.getPathWithoutSchemeAndAuthority(path).toString().split(Path.SEPARATOR);
+      folders[i] = files.get(i).toUri().getPath().split(Path.SEPARATOR);
       shortest = Math.min(shortest, folders[i].length);
     }
 
     int latest;
     out:
     for (latest = 0; latest < shortest; latest++) {
-      final String current = folders[0][latest];
+      String current = folders[0][latest];
       for (int i = 1; i < folders.length; i++) {
         if (!current.equals(folders[i][latest])) {
           break out;
         }
       }
     }
-    final Path path = new Path(files.get(0));
-    final URI uri = path.toUri();
-    final String pathString = buildPath(folders[0], latest);
-    return new Path(uri.getScheme(), uri.getAuthority(), pathString).toString();
+    URI uri = files.get(0).toUri();
+    String pathString = buildPath(folders[0], latest);
+    return new Path(uri.getScheme(), uri.getAuthority(), pathString);
   }
 
-  private static String buildPath(final String[] path, final int folderIndex) {
-    final StringBuilder builder = new StringBuilder();
+  private static String buildPath(String[] path, int folderIndex) {
+    StringBuilder builder = new StringBuilder();
     for (int i=0; i<folderIndex; i++) {
       builder.append(path[i]).append(Path.SEPARATOR);
     }
@@ -262,20 +263,20 @@ public class FileSelection {
     return builder.toString();
   }
 
-  public static FileSelection create(final DrillFileSystem fs, final String parent, final String path,
-      final boolean allowAccessOutsideWorkspace) throws IOException {
+  public static FileSelection create(DrillFileSystem fs, String parent, String path,
+      boolean allowAccessOutsideWorkspace) throws IOException {
     Stopwatch timer = logger.isDebugEnabled() ? Stopwatch.createStarted() : null;
     boolean hasWildcard = path.contains(WILD_CARD);
 
-    final Path combined = new Path(parent, removeLeadingSlash(path));
+    Path combined = new Path(parent, removeLeadingSlash(path));
     if (!allowAccessOutsideWorkspace) {
       checkBackPaths(new Path(parent).toUri().getPath(), combined.toUri().getPath(), path);
     }
-    final FileStatus[] statuses = fs.globStatus(combined); // note: this would expand wildcards
+    FileStatus[] statuses = fs.globStatus(combined); // note: this would expand wildcards
     if (statuses == null) {
       return null;
     }
-    final FileSelection fileSel = create(Lists.newArrayList(statuses), null, combined.toUri().getPath());
+    FileSelection fileSel = create(Arrays.asList(statuses), null, combined);
     if (timer != null) {
       logger.debug("FileSelection.create() took {} ms ", timer.elapsed(TimeUnit.MILLISECONDS));
       timer.stop();
@@ -298,62 +299,51 @@ public class FileSelection {
    * @return  null if creation of {@link FileSelection} fails with an {@link IllegalArgumentException}
    *          otherwise a new selection.
    *
-   * @see FileSelection#FileSelection(List, List, String)
+   * @see FileSelection#FileSelection(List, List, Path)
    */
-  public static FileSelection create(final List<FileStatus> statuses, final List<String> files, final String root,
-      final String cacheFileRoot, final boolean wasAllPartitionsPruned) {
-    final boolean bothNonEmptySelection = (statuses != null && statuses.size() > 0) && (files != null && files.size() > 0);
-    final boolean bothEmptySelection = (statuses == null || statuses.size() == 0) && (files == null || files.size() == 0);
+  public static FileSelection create(List<FileStatus> statuses, List<Path> files, Path root,
+      Path cacheFileRoot, boolean wasAllPartitionsPruned) {
+    boolean bothNonEmptySelection = (statuses != null && statuses.size() > 0) && (files != null && files.size() > 0);
+    boolean bothEmptySelection = (statuses == null || statuses.size() == 0) && (files == null || files.size() == 0);
 
     if (bothNonEmptySelection || bothEmptySelection) {
       return null;
     }
 
-    final String selectionRoot;
+    Path selectionRoot;
     if (statuses == null || statuses.isEmpty()) {
       selectionRoot = commonPathForFiles(files);
     } else {
-      if (Strings.isNullOrEmpty(root)) {
-        throw new DrillRuntimeException("Selection root is null or empty" + root);
-      }
-      final Path rootPath = handleWildCard(root);
-      final URI uri = statuses.get(0).getPath().toUri();
-      final Path path = new Path(uri.getScheme(), uri.getAuthority(), rootPath.toUri().getPath());
-      selectionRoot = path.toString();
+      Objects.requireNonNull(root, "Selection root is null");
+      Path rootPath = handleWildCard(root);
+      URI uri = statuses.get(0).getPath().toUri();
+      selectionRoot = new Path(uri.getScheme(), uri.getAuthority(), rootPath.toUri().getPath());
     }
     return new FileSelection(statuses, files, selectionRoot, cacheFileRoot, wasAllPartitionsPruned);
   }
 
-  public static FileSelection create(final List<FileStatus> statuses, final List<String> files, final String root) {
+  public static FileSelection create(List<FileStatus> statuses, List<Path> files, Path root) {
     return FileSelection.create(statuses, files, root, null, false);
   }
 
-  public static FileSelection createFromDirectories(final List<String> dirPaths, final FileSelection selection,
-      final String cacheFileRoot) {
+  public static FileSelection createFromDirectories(List<Path> dirPaths, FileSelection selection,
+      Path cacheFileRoot) {
     Stopwatch timer = logger.isDebugEnabled() ? Stopwatch.createStarted() : null;
-    final String root = selection.getSelectionRoot();
-    if (Strings.isNullOrEmpty(root)) {
-      throw new DrillRuntimeException("Selection root is null or empty" + root);
-    }
+    Path root = selection.getSelectionRoot();
+    Objects.requireNonNull(root, "Selection root is null");
     if (dirPaths == null || dirPaths.isEmpty()) {
       throw new DrillRuntimeException("List of directories is null or empty");
     }
 
-    List<String> dirs = Lists.newArrayList();
+    // for wildcard the directory list should have already been expanded
+    List<Path> dirs = selection.hadWildcard() ? selection.getFileStatuses().stream()
+        .map(FileStatus::getPath)
+        .collect(Collectors.toList()) : new ArrayList<>(dirPaths);
 
-    if (selection.hadWildcard()) { // for wildcard the directory list should have already been expanded
-      for (FileStatus status : selection.getFileStatuses()) {
-        dirs.add(status.getPath().toString());
-      }
-    } else {
-      dirs.addAll(dirPaths);
-    }
-
-    final Path rootPath = handleWildCard(root);
-    // final URI uri = dirPaths.get(0).toUri();
-    final URI uri = selection.getFileStatuses().get(0).getPath().toUri();
-    final Path path = new Path(uri.getScheme(), uri.getAuthority(), rootPath.toUri().getPath());
-    FileSelection fileSel = new FileSelection(null, dirs, path.toString(), cacheFileRoot, false);
+    Path rootPath = handleWildCard(root);
+    URI uri = selection.getFileStatuses().get(0).getPath().toUri();
+    Path path = new Path(uri.getScheme(), uri.getAuthority(), rootPath.toUri().getPath());
+    FileSelection fileSel = new FileSelection(null, dirs, path, cacheFileRoot, false);
     fileSel.setHadWildcard(selection.hadWildcard());
     if (timer != null) {
       logger.debug("FileSelection.createFromDirectories() took {} ms ", timer.elapsed(TimeUnit.MILLISECONDS));
@@ -362,18 +352,15 @@ public class FileSelection {
     return fileSel;
   }
 
-  private static Path handleWildCard(final String root) {
-    if (root.contains(WILD_CARD)) {
-      int idx = root.indexOf(WILD_CARD); // first wild card in the path
-      idx = root.lastIndexOf('/', idx); // file separator right before the first wild card
-      final String newRoot = root.substring(0, idx);
-      if (newRoot.length() == 0) {
-          // Ensure that we always return a valid root.
-          return new Path("/");
-      }
-      return new Path(newRoot);
+  private static Path handleWildCard(Path root) {
+    String stringRoot = root.toUri().getPath();
+    if (stringRoot.contains(WILD_CARD)) {
+      int idx = stringRoot.indexOf(WILD_CARD); // first wild card in the path
+      idx = stringRoot.lastIndexOf('/', idx); // file separator right before the first wild card
+      String newRoot = stringRoot.substring(0, idx);
+      return DrillFileSystemUtil.createPathSafe(newRoot);
     } else {
-      return new Path(root);
+      return new Path(stringRoot);
     }
   }
 
@@ -426,7 +413,7 @@ public class FileSelection {
     return this.hadWildcard;
   }
 
-  public String getCacheFileRoot() {
+  public Path getCacheFileRoot() {
     return cacheFileRoot;
   }
 
@@ -456,12 +443,12 @@ public class FileSelection {
 
   @Override
   public String toString() {
-    final StringBuilder sb = new StringBuilder();
+    StringBuilder sb = new StringBuilder();
     sb.append("root=").append(this.selectionRoot);
 
     sb.append("files=[");
     boolean isFirst = true;
-    for (final String file : this.files) {
+    for (Path file : this.files) {
       if (isFirst) {
         isFirst = false;
         sb.append(file);
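
For reference, commonPathForFiles() now splits each file's URI path directly instead of round-tripping through strings; the longest-common-prefix algorithm is unchanged. A worked example with hypothetical inputs:

    List<Path> files = Arrays.asList(
        new Path("hdfs://nn/data/t/1994/Q1/f1.parquet"),
        new Path("hdfs://nn/data/t/1994/Q2/f2.parquet"));
    // URI paths split on "/": {"", "data", "t", "1994", "Q1", "f1.parquet"}, ...
    // common prefix: {"", "data", "t", "1994"} -> "/data/t/1994/"
    // result: new Path("hdfs", "nn", "/data/t/1994/") -> hdfs://nn/data/t/1994
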
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FormatSelection.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FormatSelection.java
index 40549cc..7d7bcfa 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FormatSelection.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FormatSelection.java
@@ -24,6 +24,7 @@ import org.apache.drill.common.logical.FormatPluginConfig;
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.hadoop.fs.Path;
 
 
 public class FormatSelection {
@@ -32,16 +33,15 @@ public class FormatSelection {
   private FormatPluginConfig format;
   private FileSelection selection;
 
-  public FormatSelection(){}
+  public FormatSelection() {}
 
   @JsonCreator
-  public FormatSelection(@JsonProperty("format") FormatPluginConfig format, @JsonProperty("files") List<String> files){
+  public FormatSelection(@JsonProperty("format") FormatPluginConfig format, @JsonProperty("files") List<Path> files) {
     this.format = format;
     this.selection = FileSelection.create(null, files, null);
   }
 
   public FormatSelection(FormatPluginConfig format, FileSelection selection) {
-    super();
     this.format = format;
     this.selection = selection;
   }
@@ -52,7 +52,7 @@ public class FormatSelection {
   }
 
   @JsonProperty("files")
-  public List<String> getAsFiles(){
+  public List<Path> getAsFiles() {
     return selection.getFiles();
   }
 
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/MetadataContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/MetadataContext.java
index 877ceb6..0738478 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/MetadataContext.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/MetadataContext.java
@@ -20,6 +20,7 @@ package org.apache.drill.exec.store.dfs;
 import java.util.Map;
 
 import org.apache.drill.shaded.guava.com.google.common.collect.Maps;
+import org.apache.hadoop.fs.Path;
 
 /**
  * A metadata context that holds state across multiple invocations of
@@ -32,17 +33,17 @@ public class MetadataContext {
    *  Note: the #directories is typically a small percentage of the #files, so the memory footprint
    *  is expected to be relatively small.
    */
-  private Map<String, Boolean> dirModifCheckMap = Maps.newHashMap();
+  private Map<Path, Boolean> dirModifCheckMap = Maps.newHashMap();
 
   private PruneStatus pruneStatus = PruneStatus.NOT_STARTED;
 
   private boolean metadataCacheCorrupted;
 
-  public void setStatus(String dir) {
+  public void setStatus(Path dir) {
     dirModifCheckMap.put(dir,  true);
   }
 
-  public void clearStatus(String dir) {
+  public void clearStatus(Path dir) {
     dirModifCheckMap.put(dir,  false);
   }
 
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/ReadEntryFromHDFS.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/ReadEntryFromHDFS.java
index 15107ac..ec925e3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/ReadEntryFromHDFS.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/ReadEntryFromHDFS.java
@@ -21,6 +21,7 @@ import org.apache.drill.exec.store.dfs.easy.FileWork;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.hadoop.fs.Path;
 
 public class ReadEntryFromHDFS extends ReadEntryWithPath implements FileWork {
 
@@ -28,7 +29,7 @@ public class ReadEntryFromHDFS extends ReadEntryWithPath implements FileWork {
   private long length;
 
   @JsonCreator
-  public ReadEntryFromHDFS(@JsonProperty("path") String path,@JsonProperty("start") long start, @JsonProperty("length") long length) {
+  public ReadEntryFromHDFS(@JsonProperty("path") Path path, @JsonProperty("start") long start, @JsonProperty("length") long length) {
     super(path);
     this.start = start;
     this.length = length;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/ReadEntryWithPath.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/ReadEntryWithPath.java
index 4564e15..88bd9fb 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/ReadEntryWithPath.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/ReadEntryWithPath.java
@@ -18,19 +18,20 @@
 package org.apache.drill.exec.store.dfs;
 
 
+import org.apache.hadoop.fs.Path;
+
 public class ReadEntryWithPath {
 
-  protected String path;
+  protected Path path;
 
+  // Default constructor is needed for deserialization
+  public ReadEntryWithPath() {}
 
-  public ReadEntryWithPath(String path) {
-    super();
+  public ReadEntryWithPath(Path path) {
     this.path = path;
   }
 
-  public ReadEntryWithPath(){}
-
-  public String getPath(){
+  public Path getPath() {
    return path;
   }
 
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java
index d3bed8f..d76c648 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java
@@ -156,7 +156,7 @@ public abstract class EasyFormatPlugin<T extends FormatPluginConfig> implements
     for (FileWork work : scan.getWorkUnits()){
       RecordReader recordReader = getRecordReader(context, dfs, work, scan.getColumns(), scan.getUserName());
       readers.add(recordReader);
-      List<String> partitionValues = ColumnExplorer.listPartitionValues(work.getPath(), scan.getSelectionRoot());
+      List<String> partitionValues = ColumnExplorer.listPartitionValues(work.getPath(), scan.getSelectionRoot(), false);
       Map<String, String> implicitValues = columnExplorer.populateImplicitColumns(work.getPath(), partitionValues, supportsFileImplicitColumns);
       implicitColumns.add(implicitValues);
       if (implicitValues.size() > mapWithMaxColumns.size()) {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyGroupScan.java
index 759d07f..4449ec0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyGroupScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyGroupScan.java
@@ -18,7 +18,6 @@
 package org.apache.drill.exec.store.dfs.easy;
 
 import java.io.IOException;
-import java.util.Iterator;
 import java.util.List;
 
 import org.apache.drill.common.exceptions.ExecutionSetupException;
@@ -52,6 +51,7 @@ import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
 import org.apache.drill.shaded.guava.com.google.common.collect.Iterators;
 import org.apache.drill.shaded.guava.com.google.common.collect.ListMultimap;
 import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
+import org.apache.hadoop.fs.Path;
 
 @JsonTypeName("fs-scan")
 public class EasyGroupScan extends AbstractFileGroupScan {
@@ -65,17 +65,17 @@ public class EasyGroupScan extends AbstractFileGroupScan {
   private ListMultimap<Integer, CompleteFileWork> mappings;
   private List<CompleteFileWork> chunks;
   private List<EndpointAffinity> endpointAffinities;
-  private String selectionRoot;
+  private Path selectionRoot;
 
   @JsonCreator
   public EasyGroupScan(
       @JsonProperty("userName") String userName,
-      @JsonProperty("files") List<String> files, //
-      @JsonProperty("storage") StoragePluginConfig storageConfig, //
-      @JsonProperty("format") FormatPluginConfig formatConfig, //
-      @JacksonInject StoragePluginRegistry engineRegistry, //
+      @JsonProperty("files") List<Path> files,
+      @JsonProperty("storage") StoragePluginConfig storageConfig,
+      @JsonProperty("format") FormatPluginConfig formatConfig,
+      @JacksonInject StoragePluginRegistry engineRegistry,
       @JsonProperty("columns") List<SchemaPath> columns,
-      @JsonProperty("selectionRoot") String selectionRoot
+      @JsonProperty("selectionRoot") Path selectionRoot
       ) throws IOException, ExecutionSetupException {
         this(ImpersonationUtil.resolveUserName(userName),
             FileSelection.create(null, files, selectionRoot),
@@ -84,17 +84,17 @@ public class EasyGroupScan extends AbstractFileGroupScan {
             selectionRoot);
   }
 
-  public EasyGroupScan(String userName, FileSelection selection, EasyFormatPlugin<?> formatPlugin, String selectionRoot)
+  public EasyGroupScan(String userName, FileSelection selection, EasyFormatPlugin<?> formatPlugin, Path selectionRoot)
       throws IOException {
     this(userName, selection, formatPlugin, ALL_COLUMNS, selectionRoot);
   }
 
   public EasyGroupScan(
       String userName,
-      FileSelection selection, //
-      EasyFormatPlugin<?> formatPlugin, //
+      FileSelection selection,
+      EasyFormatPlugin<?> formatPlugin,
       List<SchemaPath> columns,
-      String selectionRoot
+      Path selectionRoot
       ) throws IOException{
     super(userName);
     this.selection = Preconditions.checkNotNull(selection);
@@ -106,12 +106,7 @@ public class EasyGroupScan extends AbstractFileGroupScan {
 
   @JsonIgnore
   public Iterable<CompleteFileWork> getWorkIterable() {
-    return new Iterable<CompleteFileWork>() {
-      @Override
-      public Iterator<CompleteFileWork> iterator() {
-        return Iterators.unmodifiableIterator(chunks.iterator());
-      }
-    };
+    return () -> Iterators.unmodifiableIterator(chunks.iterator());
   }
 
   private EasyGroupScan(final EasyGroupScan that) {
@@ -136,7 +131,7 @@ public class EasyGroupScan extends AbstractFileGroupScan {
     this.endpointAffinities = AffinityCreator.getAffinityMap(chunks);
   }
 
-  public String getSelectionRoot() {
+  public Path getSelectionRoot() {
     return selectionRoot;
   }
 
@@ -158,7 +153,7 @@ public class EasyGroupScan extends AbstractFileGroupScan {
 
   @JsonProperty("files")
   @Override
-  public List<String> getFiles() {
+  public List<Path> getFiles() {
     return selection.getFiles();
   }
 
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasySubScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasySubScan.java
index 0dbae1e..fbb3f47 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasySubScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasySubScan.java
@@ -17,7 +17,6 @@
  */
 package org.apache.drill.exec.store.dfs.easy;
 
-import java.io.IOException;
 import java.util.List;
 
 import org.apache.drill.common.exceptions.ExecutionSetupException;
@@ -34,6 +33,7 @@ import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.annotation.JsonTypeName;
 import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
+import org.apache.hadoop.fs.Path;
 
 @JsonTypeName("fs-sub-scan")
 public class EasySubScan extends AbstractSubScan{
@@ -42,18 +42,18 @@ public class EasySubScan extends AbstractSubScan{
   private final List<FileWorkImpl> files;
   private final EasyFormatPlugin<?> formatPlugin;
   private final List<SchemaPath> columns;
-  private String selectionRoot;
+  private Path selectionRoot;
 
   @JsonCreator
   public EasySubScan(
       @JsonProperty("userName") String userName,
-      @JsonProperty("files") List<FileWorkImpl> files, //
-      @JsonProperty("storage") StoragePluginConfig storageConfig, //
-      @JsonProperty("format") FormatPluginConfig formatConfig, //
-      @JacksonInject StoragePluginRegistry engineRegistry, //
-      @JsonProperty("columns") List<SchemaPath> columns, //
-      @JsonProperty("selectionRoot") String selectionRoot
-      ) throws IOException, ExecutionSetupException {
+      @JsonProperty("files") List<FileWorkImpl> files,
+      @JsonProperty("storage") StoragePluginConfig storageConfig,
+      @JsonProperty("format") FormatPluginConfig formatConfig,
+      @JacksonInject StoragePluginRegistry engineRegistry,
+      @JsonProperty("columns") List<SchemaPath> columns,
+      @JsonProperty("selectionRoot") Path selectionRoot
+      ) throws ExecutionSetupException {
     super(userName);
     this.formatPlugin = (EasyFormatPlugin<?>) engineRegistry.getFormatPlugin(storageConfig, formatConfig);
     Preconditions.checkNotNull(this.formatPlugin);
@@ -63,7 +63,7 @@ public class EasySubScan extends AbstractSubScan{
   }
 
   public EasySubScan(String userName, List<FileWorkImpl> files, EasyFormatPlugin<?> plugin, List<SchemaPath> columns,
-      String selectionRoot){
+      Path selectionRoot) {
     super(userName);
     this.formatPlugin = plugin;
     this.files = files;
@@ -72,7 +72,7 @@ public class EasySubScan extends AbstractSubScan{
   }
 
   @JsonProperty
-  public String getSelectionRoot() {
+  public Path getSelectionRoot() {
     return selectionRoot;
   }
 
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/FileWork.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/FileWork.java
index 587201e..3aeb2c2 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/FileWork.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/FileWork.java
@@ -17,8 +17,14 @@
  */
 package org.apache.drill.exec.store.dfs.easy;
 
+
+import org.apache.hadoop.fs.Path;
+
 public interface FileWork {
-  String getPath();
+
+  Path getPath();
+
   long getStart();
+
   long getLength();
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/direct/MetadataDirectGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/direct/MetadataDirectGroupScan.java
index 505d68e..6de8842 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/direct/MetadataDirectGroupScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/direct/MetadataDirectGroupScan.java
@@ -18,12 +18,12 @@
 package org.apache.drill.exec.store.direct;
 
 import com.fasterxml.jackson.annotation.JsonTypeName;
-import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.exec.physical.base.GroupScan;
 import org.apache.drill.exec.physical.base.PhysicalOperator;
 import org.apache.drill.exec.physical.base.ScanStats;
 import org.apache.drill.exec.store.RecordReader;
+import org.apache.hadoop.fs.Path;
 
 import java.util.Collection;
 import java.util.List;
@@ -37,20 +37,20 @@ import java.util.List;
 @JsonTypeName("metadata-direct-scan")
 public class MetadataDirectGroupScan extends DirectGroupScan {
 
-  private final Collection<String> files;
+  private final Collection<Path> files;
 
-  public MetadataDirectGroupScan(RecordReader reader, Collection<String> files) {
+  public MetadataDirectGroupScan(RecordReader reader, Collection<Path> files) {
     super(reader);
     this.files = files;
   }
 
-  public MetadataDirectGroupScan(RecordReader reader, Collection<String> files, ScanStats stats) {
+  public MetadataDirectGroupScan(RecordReader reader, Collection<Path> files, ScanStats stats) {
     super(reader, stats);
     this.files = files;
   }
 
   @Override
-  public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) throws ExecutionSetupException {
+  public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) {
     assert children == null || children.isEmpty();
     return new MetadataDirectGroupScan(reader, files, stats);
   }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader.java
index 428a4e1..d3fcc5a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader.java
@@ -77,7 +77,7 @@ public class JSONRecordReader extends AbstractRecordReader {
    * @param columns  pathnames of columns/subfields to read
    * @throws OutOfMemoryException
    */
-  public JSONRecordReader(final FragmentContext fragmentContext, final String inputPath, final DrillFileSystem fileSystem,
+  public JSONRecordReader(final FragmentContext fragmentContext, final Path inputPath, final DrillFileSystem fileSystem,
       final List<SchemaPath> columns) throws OutOfMemoryException {
     this(fragmentContext, inputPath, null, fileSystem, columns);
   }
@@ -90,14 +90,13 @@ public class JSONRecordReader extends AbstractRecordReader {
    * @param columns  pathnames of columns/subfields to read
    * @throws OutOfMemoryException
    */
-  public JSONRecordReader(final FragmentContext fragmentContext, final JsonNode embeddedContent,
-      final DrillFileSystem fileSystem, final List<SchemaPath> columns) throws OutOfMemoryException {
+  public JSONRecordReader(FragmentContext fragmentContext, JsonNode embeddedContent, DrillFileSystem fileSystem,
+      List<SchemaPath> columns) throws OutOfMemoryException {
     this(fragmentContext, null, embeddedContent, fileSystem, columns);
   }
 
-  private JSONRecordReader(final FragmentContext fragmentContext, final String inputPath,
-      final JsonNode embeddedContent, final DrillFileSystem fileSystem,
-      final List<SchemaPath> columns) {
+  private JSONRecordReader(FragmentContext fragmentContext, Path inputPath, JsonNode embeddedContent,
+      DrillFileSystem fileSystem, List<SchemaPath> columns) {
 
     Preconditions.checkArgument(
         (inputPath == null && embeddedContent != null) ||
@@ -106,7 +105,7 @@ public class JSONRecordReader extends AbstractRecordReader {
         );
 
     if (inputPath != null) {
-      this.hadoopPath = new Path(inputPath);
+      this.hadoopPath = inputPath;
     } else {
       this.embeddedContent = embeddedContent;
     }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/sequencefile/SequenceFileFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/sequencefile/SequenceFileFormatPlugin.java
index 9dbe715..ec4bb12 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/sequencefile/SequenceFileFormatPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/sequencefile/SequenceFileFormatPlugin.java
@@ -87,7 +87,7 @@ public class SequenceFileFormatPlugin extends EasyFormatPlugin<SequenceFileForma
                                       FileWork fileWork,
                                       List<SchemaPath> columns,
                                       String userName) throws ExecutionSetupException {
-    final Path path = dfs.makeQualified(new Path(fileWork.getPath()));
+    final Path path = dfs.makeQualified(fileWork.getPath());
     final FileSplit split = new FileSplit(path, fileWork.getStart(), fileWork.getLength(), new String[]{""});
     return new SequenceFileRecordReader(split, dfs, context.getQueryUserName(), userName);
   }
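
The makeQualified-then-split pattern above repeats in the text, httpd, image and pcap readers below. Reduced to a standalone sketch (local file system, file name and sizes are illustrative):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapred.FileSplit;

    class SplitSketch {
      // makeQualified resolves the path against the file system's default
      // URI and working directory, e.g. /tmp/data.seq -> file:/tmp/data.seq
      static FileSplit toSplit(FileSystem fs, Path file, long start, long length) {
        Path qualified = fs.makeQualified(file);
        return new FileSplit(qualified, start, length, new String[]{""});
      }

      public static void main(String[] args) throws Exception {
        FileSystem fs = FileSystem.getLocal(new Configuration());
        System.out.println(toSplit(fs, new Path("/tmp/data.seq"), 0, 1024).getPath());
      }
    }
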
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java
index 1c53a37..03ae6f5 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java
@@ -82,7 +82,7 @@ public class TextFormatPlugin extends EasyFormatPlugin<TextFormatPlugin.TextForm
                                       FileWork fileWork,
                                       List<SchemaPath> columns,
                                       String userName) {
-    Path path = dfs.makeQualified(new Path(fileWork.getPath()));
+    Path path = dfs.makeQualified(fileWork.getPath());
     FileSplit split = new FileSplit(path, fileWork.getStart(), fileWork.getLength(), new String[]{""});
 
     if (context.getOptions().getBoolean(ExecConstants.ENABLE_NEW_TEXT_READER_KEY)) {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/httpd/HttpdLogFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/httpd/HttpdLogFormatPlugin.java
index 3958a32..5a78732 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/httpd/HttpdLogFormatPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/httpd/HttpdLogFormatPlugin.java
@@ -192,7 +192,7 @@ public class HttpdLogFormatPlugin extends EasyFormatPlugin<HttpdLogFormatPlugin.
             HttpdLogFormatPlugin.this.getConfig().getTimestampFormat(),
             fieldMapping);
 
-        final Path path = fs.makeQualified(new Path(work.getPath()));
+        final Path path = fs.makeQualified(work.getPath());
         FileSplit split = new FileSplit(path, work.getStart(), work.getLength(), new String[]{""});
         TextInputFormat inputFormat = new TextInputFormat();
         JobConf job = new JobConf(fs.getConf());
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/image/ImageFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/image/ImageFormatPlugin.java
index 15ea1b4..048aa82 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/image/ImageFormatPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/image/ImageFormatPlugin.java
@@ -22,7 +22,6 @@ import java.io.IOException;
 import java.util.Collections;
 import java.util.List;
 
-import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.common.logical.StoragePluginConfig;
 import org.apache.drill.exec.ops.FragmentContext;
@@ -57,11 +56,9 @@ public class ImageFormatPlugin extends EasyFormatPlugin<ImageFormatConfig> {
 
   @Override
   public RecordReader getRecordReader(FragmentContext context, DrillFileSystem dfs, FileWork fileWork,
-      List<SchemaPath> columns, String userName) throws ExecutionSetupException {
-    return new ImageRecordReader(context, dfs, fileWork.getPath(),
-        ((ImageFormatConfig)formatConfig).hasFileSystemMetadata(),
-        ((ImageFormatConfig)formatConfig).isDescriptive(),
-        ((ImageFormatConfig)formatConfig).getTimeZone());
+      List<SchemaPath> columns, String userName) {
+    return new ImageRecordReader(context, dfs, fileWork.getPath(), formatConfig.hasFileSystemMetadata(),
+        formatConfig.isDescriptive(), formatConfig.getTimeZone());
   }
 
   @Override
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/image/ImageRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/image/ImageRecordReader.java
index 2a4b4fb..08ed4fd 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/image/ImageRecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/image/ImageRecordReader.java
@@ -99,10 +99,10 @@ public class ImageRecordReader extends AbstractRecordReader {
   private DrillBuf managedBuffer;
   private boolean finish;
 
-  public ImageRecordReader(FragmentContext context, DrillFileSystem fs, String inputPath,
+  public ImageRecordReader(FragmentContext context, DrillFileSystem fs, Path inputPath,
                            boolean fileSystemMetadata, boolean descriptive, String timeZone) {
     this.fs = fs;
-    hadoopPath = fs.makeQualified(new Path(inputPath));
+    hadoopPath = fs.makeQualified(inputPath);
     this.fileSystemMetadata = fileSystemMetadata;
     this.descriptive = descriptive;
     this.timeZone = (timeZone != null) ? TimeZone.getTimeZone(timeZone) : TimeZone.getDefault();
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/log/LogRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/log/LogRecordReader.java
index e5d1dc4..ae0b733 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/log/LogRecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/log/LogRecordReader.java
@@ -41,9 +41,6 @@ import org.apache.drill.exec.vector.NullableVarCharVector;
 import org.apache.drill.exec.vector.NullableTimeStampVector;
 import org.apache.drill.exec.vector.NullableTimeVector;
 
-
-import org.apache.hadoop.fs.Path;
-
 import java.io.BufferedReader;
 import java.io.IOException;
 import java.io.InputStream;
@@ -522,7 +519,7 @@ public class LogRecordReader extends AbstractRecordReader {
   private void openFile() {
     InputStream in;
     try {
-      in = dfs.open(new Path(fileWork.getPath()));
+      in = dfs.open(fileWork.getPath());
     } catch (Exception e) {
       throw UserException
           .dataReadError(e)
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetGroupScan.java
index e2d3569..5528d02 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetGroupScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetGroupScan.java
@@ -52,6 +52,7 @@ import org.apache.drill.exec.store.schedule.AffinityCreator;
 import org.apache.drill.exec.store.schedule.AssignmentCreator;
 import org.apache.drill.exec.store.schedule.EndpointByteMap;
 import org.apache.drill.exec.store.schedule.EndpointByteMapImpl;
+import org.apache.hadoop.fs.Path;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -80,7 +81,7 @@ public abstract class AbstractParquetGroupScan extends AbstractFileGroupScan {
   protected ParquetTableMetadataBase parquetTableMetadata;
   protected List<RowGroupInfo> rowGroupInfos;
   protected ListMultimap<Integer, RowGroupInfo> mappings;
-  protected Set<String> fileSet;
+  protected Set<Path> fileSet;
   protected ParquetReaderConfig readerConfig;
 
   private List<EndpointAffinity> endpointAffinities;
@@ -146,7 +147,7 @@ public abstract class AbstractParquetGroupScan extends AbstractFileGroupScan {
 
   @JsonIgnore
   @Override
-  public Collection<String> getFiles() {
+  public Collection<Path> getFiles() {
     return fileSet;
   }
 
@@ -428,12 +429,12 @@ public abstract class AbstractParquetGroupScan extends AbstractFileGroupScan {
   }
 
   @JsonIgnore
-  public <T> T getPartitionValue(String path, SchemaPath column, Class<T> clazz) {
+  public <T> T getPartitionValue(Path path, SchemaPath column, Class<T> clazz) {
     return clazz.cast(parquetGroupScanStatistics.getPartitionValue(path, column));
   }
 
   @JsonIgnore
-  public Set<String> getFileSet() {
+  public Set<Path> getFileSet() {
     return fileSet;
   }
   // partition pruning methods end
@@ -441,7 +442,7 @@ public abstract class AbstractParquetGroupScan extends AbstractFileGroupScan {
   // helper method used for partition pruning and filter push down
   @Override
   public void modifyFileSelection(FileSelection selection) {
-    List<String> files = selection.getFiles();
+    List<Path> files = selection.getFiles();
     fileSet = new HashSet<>(files);
     entries = new ArrayList<>(files.size());
 
@@ -464,7 +465,7 @@ public abstract class AbstractParquetGroupScan extends AbstractFileGroupScan {
     if (fileSet == null) {
       fileSet = new HashSet<>();
       fileSet.addAll(parquetTableMetadata.getFiles().stream()
-          .map((Function<ParquetFileMetadata, String>) ParquetFileMetadata::getPath)
+          .map((Function<ParquetFileMetadata, Path>) ParquetFileMetadata::getPath)
           .collect(Collectors.toSet()));
     }
 
@@ -505,7 +506,7 @@ public abstract class AbstractParquetGroupScan extends AbstractFileGroupScan {
   // abstract methods block start
   protected abstract void initInternal() throws IOException;
   protected abstract Collection<CoordinationProtos.DrillbitEndpoint> getDrillbits();
-  protected abstract AbstractParquetGroupScan cloneWithFileSelection(Collection<String> filePaths) throws IOException;
+  protected abstract AbstractParquetGroupScan cloneWithFileSelection(Collection<Path> filePaths) throws IOException;
   protected abstract boolean supportsFileImplicitColumns();
   protected abstract List<String> getPartitionValues(RowGroupInfo rowGroupInfo);
   // abstract methods block end
@@ -520,7 +521,7 @@ public abstract class AbstractParquetGroupScan extends AbstractFileGroupScan {
    * @return new parquet group scan
    */
   private AbstractParquetGroupScan cloneWithRowGroupInfos(List<RowGroupInfo> rowGroupInfos) throws IOException {
-    Set<String> filePaths = rowGroupInfos.stream()
+    Set<Path> filePaths = rowGroupInfos.stream()
       .map(ReadEntryWithPath::getPath)
       .collect(Collectors.toSet()); // set keeps file names unique
     AbstractParquetGroupScan scan = cloneWithFileSelection(filePaths);
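
Replacing the String-keyed collections here (fileSet, and the footer map in the next file) with Path keys is safe because org.apache.hadoop.fs.Path implements equals() and hashCode() over its URI. A quick illustration:

    import java.util.HashSet;
    import java.util.Set;

    import org.apache.hadoop.fs.Path;

    class PathSetDemo {
      public static void main(String[] args) {
        Set<Path> files = new HashSet<>();
        files.add(new Path("/data/t/1.parquet"));
        files.add(new Path("/data/t/1.parquet")); // equal Path, deduplicated
        System.out.println(files.size()); // prints 1
      }
    }
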
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetScanBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetScanBatchCreator.java
index 99161fd..b1819e6 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetScanBatchCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetScanBatchCreator.java
@@ -52,7 +52,8 @@ public abstract class AbstractParquetScanBatchCreator {
 
   private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AbstractParquetScanBatchCreator.class);
 
-  protected ScanBatch getBatch(ExecutorFragmentContext context, AbstractParquetRowGroupScan rowGroupScan, OperatorContext oContext) throws ExecutionSetupException {
+  protected ScanBatch getBatch(ExecutorFragmentContext context, AbstractParquetRowGroupScan rowGroupScan,
+                               OperatorContext oContext) throws ExecutionSetupException {
     final ColumnExplorer columnExplorer = new ColumnExplorer(context.getOptions(), rowGroupScan.getColumns());
 
     if (!columnExplorer.isStarQuery()) {
@@ -63,7 +64,7 @@ public abstract class AbstractParquetScanBatchCreator {
     AbstractDrillFileSystemManager fsManager = getDrillFileSystemCreator(oContext, context.getOptions());
 
     // keep footers in a map to avoid re-reading them
-    Map<String, ParquetMetadata> footers = new HashMap<>();
+    Map<Path, ParquetMetadata> footers = new HashMap<>();
     List<RecordReader> readers = new LinkedList<>();
     List<Map<String, String>> implicitColumns = new ArrayList<>();
     Map<String, String> mapWithMaxColumns = new LinkedHashMap<>();
@@ -150,8 +151,8 @@ public abstract class AbstractParquetScanBatchCreator {
 
   protected abstract AbstractDrillFileSystemManager getDrillFileSystemCreator(OperatorContext operatorContext, OptionManager optionManager);
 
-  private ParquetMetadata readFooter(Configuration conf, String path, ParquetReaderConfig readerConfig) throws IOException {
-    try (ParquetFileReader reader = ParquetFileReader.open(HadoopInputFile.fromPath(new Path(path),
+  private ParquetMetadata readFooter(Configuration conf, Path path, ParquetReaderConfig readerConfig) throws IOException {
+    try (ParquetFileReader reader = ParquetFileReader.open(HadoopInputFile.fromPath(path,
       readerConfig.addCountersToConf(conf)), readerConfig.toReadOptions())) {
       return reader.getFooter();
     }
@@ -168,6 +169,6 @@ public abstract class AbstractParquetScanBatchCreator {
       this.operatorContext = operatorContext;
     }
 
-    protected abstract DrillFileSystem get(Configuration config, String path) throws ExecutionSetupException;
+    protected abstract DrillFileSystem get(Configuration config, Path path) throws ExecutionSetupException;
   }
 }
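
Stripped of the Drill plumbing, the reworked readFooter() hands the Path straight to parquet-mr, roughly as in this sketch (default read options, no Drill counters wired in):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.parquet.ParquetReadOptions;
    import org.apache.parquet.hadoop.ParquetFileReader;
    import org.apache.parquet.hadoop.metadata.ParquetMetadata;
    import org.apache.parquet.hadoop.util.HadoopInputFile;

    class FooterSketch {
      // Opens the file through HadoopInputFile and returns its footer metadata
      static ParquetMetadata footer(Configuration conf, Path path) throws Exception {
        try (ParquetFileReader reader = ParquetFileReader.open(
            HadoopInputFile.fromPath(path, conf), ParquetReadOptions.builder().build())) {
          return reader.getFooter();
        }
      }
    }
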
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
index a1d9f51..cb9e9b9 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
@@ -68,8 +68,8 @@ public class ParquetGroupScan extends AbstractParquetGroupScan {
   private final MetadataContext metaContext;
   private boolean usedMetadataCache; // false by default
   // may change when filter push down / partition pruning is applied
-  private String selectionRoot;
-  private String cacheFileRoot;
+  private Path selectionRoot;
+  private Path cacheFileRoot;
 
   @JsonCreator
   public ParquetGroupScan(@JacksonInject StoragePluginRegistry engineRegistry,
@@ -78,8 +78,8 @@ public class ParquetGroupScan extends AbstractParquetGroupScan {
                           @JsonProperty("storage") StoragePluginConfig storageConfig,
                           @JsonProperty("format") FormatPluginConfig formatConfig,
                           @JsonProperty("columns") List<SchemaPath> columns,
-                          @JsonProperty("selectionRoot") String selectionRoot,
-                          @JsonProperty("cacheFileRoot") String cacheFileRoot,
+                          @JsonProperty("selectionRoot") Path selectionRoot,
+                          @JsonProperty("cacheFileRoot") Path cacheFileRoot,
                           @JsonProperty("readerConfig") ParquetReaderConfig readerConfig,
                           @JsonProperty("filter") LogicalExpression filter) throws IOException, ExecutionSetupException {
     super(ImpersonationUtil.resolveUserName(userName), columns, entries, readerConfig, filter);
@@ -127,7 +127,7 @@ public class ParquetGroupScan extends AbstractParquetGroupScan {
         // The fully expanded list is already stored as part of the fileSet
         entries.add(new ReadEntryWithPath(fileSelection.getSelectionRoot()));
       } else {
-        for (String fileName : fileSelection.getFiles()) {
+        for (Path fileName : fileSelection.getFiles()) {
           entries.add(new ReadEntryWithPath(fileName));
         }
       }
@@ -169,12 +169,12 @@ public class ParquetGroupScan extends AbstractParquetGroupScan {
   }
 
   @JsonProperty
-  public String getSelectionRoot() {
+  public Path getSelectionRoot() {
     return selectionRoot;
   }
 
   @JsonProperty
-  public String getCacheFileRoot() {
+  public Path getCacheFileRoot() {
     return cacheFileRoot;
   }
   // getters for serialization / deserialization end
@@ -224,8 +224,8 @@ public class ParquetGroupScan extends AbstractParquetGroupScan {
       // For EXPLAIN, remove the URI prefix from cacheFileRoot.  If cacheFileRoot is null, we
       // would have read the cache file from selectionRoot
       String cacheFileRootString = (cacheFileRoot == null) ?
-          Path.getPathWithoutSchemeAndAuthority(new Path(selectionRoot)).toString() :
-          Path.getPathWithoutSchemeAndAuthority(new Path(cacheFileRoot)).toString();
+          Path.getPathWithoutSchemeAndAuthority(selectionRoot).toString() :
+          Path.getPathWithoutSchemeAndAuthority(cacheFileRoot).toString();
       builder.append(", cacheFileRoot=").append(cacheFileRootString);
     }
 
@@ -241,7 +241,7 @@ public class ParquetGroupScan extends AbstractParquetGroupScan {
     FileSystem processUserFileSystem = ImpersonationUtil.createFileSystem(ImpersonationUtil.getProcessUserName(), fs.getConf());
     Path metaPath = null;
     if (entries.size() == 1 && parquetTableMetadata == null) {
-      Path p = Path.getPathWithoutSchemeAndAuthority(new Path(entries.get(0).getPath()));
+      Path p = Path.getPathWithoutSchemeAndAuthority(entries.get(0).getPath());
       if (fs.isDirectory(p)) {
         // Using the metadata file makes sense when querying a directory; otherwise
         // if querying a single file we can look up the metadata directly from the file
@@ -257,9 +257,9 @@ public class ParquetGroupScan extends AbstractParquetGroupScan {
         parquetTableMetadata = Metadata.getParquetTableMetadata(processUserFileSystem, p.toString(), readerConfig);
       }
     } else {
-      Path p = Path.getPathWithoutSchemeAndAuthority(new Path(selectionRoot));
+      Path p = Path.getPathWithoutSchemeAndAuthority(selectionRoot);
       metaPath = new Path(p, Metadata.METADATA_FILENAME);
-      if (!metaContext.isMetadataCacheCorrupted() && fs.isDirectory(new Path(selectionRoot))
+      if (!metaContext.isMetadataCacheCorrupted() && fs.isDirectory(selectionRoot)
           && fs.exists(metaPath)) {
         if (parquetTableMetadata == null) {
           parquetTableMetadata = Metadata.readBlockMeta(processUserFileSystem, metaPath, metaContext, readerConfig);
@@ -275,7 +275,7 @@ public class ParquetGroupScan extends AbstractParquetGroupScan {
         final List<FileStatus> fileStatuses = new ArrayList<>();
         for (ReadEntryWithPath entry : entries) {
           fileStatuses.addAll(
-              DrillFileSystemUtil.listFiles(fs, Path.getPathWithoutSchemeAndAuthority(new Path(entry.getPath())), true));
+              DrillFileSystemUtil.listFiles(fs, Path.getPathWithoutSchemeAndAuthority(entry.getPath()), true));
         }
 
         Map<FileStatus, FileSystem> statusMap = fileStatuses.stream()
@@ -292,7 +292,7 @@ public class ParquetGroupScan extends AbstractParquetGroupScan {
   }
 
   @Override
-  protected AbstractParquetGroupScan cloneWithFileSelection(Collection<String> filePaths) throws IOException {
+  protected AbstractParquetGroupScan cloneWithFileSelection(Collection<Path> filePaths) throws IOException {
     FileSelection newSelection = new FileSelection(null, new ArrayList<>(filePaths), getSelectionRoot(), cacheFileRoot, false);
     return clone(newSelection);
   }
@@ -309,7 +309,7 @@ public class ParquetGroupScan extends AbstractParquetGroupScan {
 
   @Override
   protected List<String> getPartitionValues(RowGroupInfo rowGroupInfo) {
-    return ColumnExplorer.listPartitionValues(rowGroupInfo.getPath(), selectionRoot);
+    return ColumnExplorer.listPartitionValues(rowGroupInfo.getPath(), selectionRoot, false);
   }
   // overridden protected methods block end
 
@@ -425,7 +425,7 @@ public class ParquetGroupScan extends AbstractParquetGroupScan {
           }
         } else {
           final Path path = Path.getPathWithoutSchemeAndAuthority(cacheFileRoot);
-          fileSet.add(path.toString());
+          fileSet.add(path);
         }
       }
     }
@@ -436,21 +436,21 @@ public class ParquetGroupScan extends AbstractParquetGroupScan {
       return null;
     }
 
-    List<String> fileNames = new ArrayList<>(fileSet);
+    List<Path> fileNames = new ArrayList<>(fileSet);
 
     // when creating the file selection, set the selection root without the URI prefix
     // The reason is that the file names above have been created in the form
     // /a/b/c.parquet and the format of the selection root must match that of the file names
     // otherwise downstream operations such as partition pruning can break.
-    final Path metaRootPath = Path.getPathWithoutSchemeAndAuthority(new Path(selection.getSelectionRoot()));
-    this.selectionRoot = metaRootPath.toString();
+    final Path metaRootPath = Path.getPathWithoutSchemeAndAuthority(selection.getSelectionRoot());
+    this.selectionRoot = metaRootPath;
 
     // Use the FileSelection constructor directly here instead of the FileSelection.create() method
     // because create() changes the root to include the scheme and authority; In future, if create()
     // is the preferred way to instantiate a file selection, we may need to do something different...
     // WARNING: file statuses and file names are inconsistent
-    FileSelection newSelection = new FileSelection(selection.getStatuses(fs), fileNames, metaRootPath.toString(),
-        cacheFileRoot, selection.wasAllPartitionsPruned());
+    FileSelection newSelection = new FileSelection(selection.getStatuses(fs), fileNames, metaRootPath, cacheFileRoot,
+        selection.wasAllPartitionsPruned());
 
     newSelection.setExpandedFully();
     newSelection.setMetaContext(metaContext);
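
The selection-root handling above depends on Path.getPathWithoutSchemeAndAuthority() keeping the root and the file names in the same form; its effect in isolation:

    import org.apache.hadoop.fs.Path;

    class SchemeStripDemo {
      public static void main(String[] args) {
        Path full = new Path("hdfs://namenode:8020/tmp/parquet_table/1_0_0.parquet");
        // Drops the scheme and authority, leaving only the path component
        System.out.println(Path.getPathWithoutSchemeAndAuthority(full));
        // prints: /tmp/parquet_table/1_0_0.parquet
      }
    }
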
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScanStatistics.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScanStatistics.java
index 9381043..9156524 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScanStatistics.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScanStatistics.java
@@ -21,6 +21,7 @@ import org.apache.commons.lang3.mutable.MutableLong;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.common.types.TypeProtos;
 import org.apache.drill.exec.physical.base.GroupScan;
+import org.apache.hadoop.fs.Path;
 import org.apache.parquet.schema.OriginalType;
 import org.apache.parquet.schema.PrimitiveType;
 
@@ -41,7 +42,7 @@ import static org.apache.drill.exec.store.parquet.metadata.Metadata_V3.ParquetTa
 public class ParquetGroupScanStatistics {
 
   // map from file names to maps of column name to partition value mappings
-  private Map<String, Map<SchemaPath, Object>> partitionValueMap;
+  private Map<Path, Map<SchemaPath, Object>> partitionValueMap;
   // only for partition columns : value is unique for each partition
   private Map<SchemaPath, TypeProtos.MajorType> partitionColTypeMap;
   // total number of non-null value for each column in parquet files
@@ -78,7 +79,7 @@ public class ParquetGroupScanStatistics {
     return rowCount;
   }
 
-  public Object getPartitionValue(String path, SchemaPath column) {
+  public Object getPartitionValue(Path path, SchemaPath column) {
     return partitionValueMap.get(path).get(column);
   }
 
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRowGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRowGroupScan.java
index eabe2df..2513772 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRowGroupScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRowGroupScan.java
@@ -37,6 +37,7 @@ import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.annotation.JsonTypeName;
 import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
 
 // Class containing information for reading a single parquet row group from HDFS
 @JsonTypeName("parquet-row-group-scan")
@@ -45,7 +46,7 @@ public class ParquetRowGroupScan extends AbstractParquetRowGroupScan {
 
   private final ParquetFormatPlugin formatPlugin;
   private final ParquetFormatConfig formatConfig;
-  private final String selectionRoot;
+  private final Path selectionRoot;
 
   @JsonCreator
   public ParquetRowGroupScan(@JacksonInject StoragePluginRegistry registry,
@@ -55,7 +56,7 @@ public class ParquetRowGroupScan extends AbstractParquetRowGroupScan {
                              @JsonProperty("rowGroupReadEntries") LinkedList<RowGroupReadEntry> rowGroupReadEntries,
                              @JsonProperty("columns") List<SchemaPath> columns,
                              @JsonProperty("readerConfig") ParquetReaderConfig readerConfig,
-                             @JsonProperty("selectionRoot") String selectionRoot,
+                             @JsonProperty("selectionRoot") Path selectionRoot,
                              @JsonProperty("filter") LogicalExpression filter) throws ExecutionSetupException {
     this(userName,
         (ParquetFormatPlugin) registry.getFormatPlugin(Preconditions.checkNotNull(storageConfig), Preconditions.checkNotNull(formatConfig)),
@@ -71,7 +72,7 @@ public class ParquetRowGroupScan extends AbstractParquetRowGroupScan {
                              List<RowGroupReadEntry> rowGroupReadEntries,
                              List<SchemaPath> columns,
                              ParquetReaderConfig readerConfig,
-                             String selectionRoot,
+                             Path selectionRoot,
                              LogicalExpression filter) {
     super(userName, rowGroupReadEntries, columns, readerConfig, filter);
     this.formatPlugin = Preconditions.checkNotNull(formatPlugin, "Could not find format config for the given configuration");
@@ -90,7 +91,7 @@ public class ParquetRowGroupScan extends AbstractParquetRowGroupScan {
   }
 
   @JsonProperty
-  public String getSelectionRoot() {
+  public Path getSelectionRoot() {
     return selectionRoot;
   }
 
@@ -127,7 +128,7 @@ public class ParquetRowGroupScan extends AbstractParquetRowGroupScan {
 
   @Override
   public List<String> getPartitionValues(RowGroupReadEntry rowGroupReadEntry) {
-    return ColumnExplorer.listPartitionValues(rowGroupReadEntry.getPath(), selectionRoot);
+    return ColumnExplorer.listPartitionValues(rowGroupReadEntry.getPath(), selectionRoot, false);
   }
 }
 
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
index f0ef639..8c91200 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
@@ -28,6 +28,7 @@ import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.exec.server.options.OptionManager;
 import org.apache.drill.exec.store.dfs.DrillFileSystem;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
 
 import java.io.IOException;
 import java.util.List;
@@ -61,7 +62,7 @@ public class ParquetScanBatchCreator extends AbstractParquetScanBatchCreator imp
     }
 
     @Override
-    protected DrillFileSystem get(Configuration config, String path) throws ExecutionSetupException {
+    protected DrillFileSystem get(Configuration config, Path path) throws ExecutionSetupException {
       if (fs == null) {
         try {
           fs =  useAsyncPageReader ? operatorContext.newNonTrackingFileSystem(config) : operatorContext.newFileSystem(config);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/RowGroupInfo.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/RowGroupInfo.java
index 1c9ce10..5fbadd6 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/RowGroupInfo.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/RowGroupInfo.java
@@ -23,6 +23,7 @@ import org.apache.drill.exec.store.dfs.ReadEntryFromHDFS;
 import org.apache.drill.exec.store.dfs.easy.FileWork;
 import org.apache.drill.exec.store.schedule.CompleteWork;
 import org.apache.drill.exec.store.schedule.EndpointByteMap;
+import org.apache.hadoop.fs.Path;
 
 import java.util.List;
 
@@ -37,7 +38,7 @@ public class RowGroupInfo extends ReadEntryFromHDFS implements CompleteWork, Fil
   private long numRecordsToRead;
 
   @JsonCreator
-  public RowGroupInfo(@JsonProperty("path") String path,
+  public RowGroupInfo(@JsonProperty("path") Path path,
                       @JsonProperty("start") long start,
                       @JsonProperty("length") long length,
                       @JsonProperty("rowGroupIndex") int rowGroupIndex,
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/RowGroupReadEntry.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/RowGroupReadEntry.java
index 665179f..be3f50c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/RowGroupReadEntry.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/RowGroupReadEntry.java
@@ -22,6 +22,7 @@ import org.apache.drill.exec.store.dfs.ReadEntryFromHDFS;
 
 import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.hadoop.fs.Path;
 
 public class RowGroupReadEntry extends ReadEntryFromHDFS {
 
@@ -29,7 +30,7 @@ public class RowGroupReadEntry extends ReadEntryFromHDFS {
   private long numRecordsToRead;
 
   @JsonCreator
-  public RowGroupReadEntry(@JsonProperty("path") String path, @JsonProperty("start") long start,
+  public RowGroupReadEntry(@JsonProperty("path") Path path, @JsonProperty("start") long start,
                            @JsonProperty("length") long length, @JsonProperty("rowGroupIndex") int rowGroupIndex,
                            @JsonProperty("numRecordsToRead") long numRecordsToRead) {
     super(path, start, length);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java
index 17cf8c4..ba4c493 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java
@@ -90,7 +90,7 @@ public class ParquetRecordReader extends AbstractRecordReader {
   public boolean useBulkReader;
 
   @SuppressWarnings("unused")
-  private String name;
+  private Path name;
 
   public ParquetReaderStats parquetReaderStats = new ParquetReaderStats();
   private BatchReader batchReader;
@@ -123,44 +123,42 @@ public class ParquetRecordReader extends AbstractRecordReader {
   }
 
   public ParquetRecordReader(FragmentContext fragmentContext,
-      String path,
+      Path path,
       int rowGroupIndex,
       long numRecordsToRead,
       FileSystem fs,
       CodecFactory codecFactory,
       ParquetMetadata footer,
       List<SchemaPath> columns,
-      ParquetReaderUtility.DateCorruptionStatus dateCorruptionStatus) throws ExecutionSetupException {
-    this(fragmentContext, numRecordsToRead,
-         path, rowGroupIndex, fs, codecFactory, footer, columns, dateCorruptionStatus);
+      ParquetReaderUtility.DateCorruptionStatus dateCorruptionStatus) {
+    this(fragmentContext, numRecordsToRead, path, rowGroupIndex, fs, codecFactory, footer, columns, dateCorruptionStatus);
   }
 
   public ParquetRecordReader(FragmentContext fragmentContext,
-      String path,
+      Path path,
       int rowGroupIndex,
       FileSystem fs,
       CodecFactory codecFactory,
       ParquetMetadata footer,
       List<SchemaPath> columns,
-      ParquetReaderUtility.DateCorruptionStatus dateCorruptionStatus)
-      throws ExecutionSetupException {
-      this(fragmentContext, footer.getBlocks().get(rowGroupIndex).getRowCount(),
-           path, rowGroupIndex, fs, codecFactory, footer, columns, dateCorruptionStatus);
+      ParquetReaderUtility.DateCorruptionStatus dateCorruptionStatus) {
+    this(fragmentContext, footer.getBlocks().get(rowGroupIndex).getRowCount(), path, rowGroupIndex, fs, codecFactory,
+        footer, columns, dateCorruptionStatus);
   }
 
   public ParquetRecordReader(
       FragmentContext fragmentContext,
       long numRecordsToRead,
-      String path,
+      Path path,
       int rowGroupIndex,
       FileSystem fs,
       CodecFactory codecFactory,
       ParquetMetadata footer,
       List<SchemaPath> columns,
-      ParquetReaderUtility.DateCorruptionStatus dateCorruptionStatus) throws ExecutionSetupException {
+      ParquetReaderUtility.DateCorruptionStatus dateCorruptionStatus) {
 
     this.name = path;
-    this.hadoopPath = new Path(path);
+    this.hadoopPath = path;
     this.fileSystem = fs;
     this.codecFactory = codecFactory;
     this.rowGroupIndex = rowGroupIndex;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/Metadata.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/Metadata.java
index 0db007a..d0e2734 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/Metadata.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/Metadata.java
@@ -24,6 +24,7 @@ import com.fasterxml.jackson.databind.DeserializationFeature;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.module.SimpleModule;
 import com.fasterxml.jackson.module.afterburner.AfterburnerModule;
+import org.apache.drill.exec.serialization.PathSerDe;
 import org.apache.drill.exec.store.parquet.ParquetReaderConfig;
 import org.apache.drill.shaded.guava.com.google.common.base.Stopwatch;
 import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
@@ -82,6 +83,9 @@ import static org.apache.drill.exec.store.parquet.metadata.Metadata_V3.ParquetFi
 import static org.apache.drill.exec.store.parquet.metadata.Metadata_V3.ParquetTableMetadata_v3;
 import static org.apache.drill.exec.store.parquet.metadata.Metadata_V3.RowGroupMetadata_v3;
 
+/**
+ * This is a utility class, a holder for Parquet table metadata and {@link ParquetReaderConfig}.
+ */
 public class Metadata {
   private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(Metadata.class);
 
@@ -106,7 +110,7 @@ public class Metadata {
    * @param path path
    * @param readerConfig parquet reader configuration
    */
-  public static void createMeta(FileSystem fs, String path, ParquetReaderConfig readerConfig) throws IOException {
+  public static void createMeta(FileSystem fs, Path path, ParquetReaderConfig readerConfig) throws IOException {
     Metadata metadata = new Metadata(readerConfig);
     metadata.createMetaFilesRecursively(path, fs);
   }
@@ -208,13 +212,13 @@ public class Metadata {
    *         {@code path} directory).
    * @throws IOException if parquet metadata can't be serialized and written to the json file
    */
-  private Pair<ParquetTableMetadata_v3, ParquetTableMetadataDirs> createMetaFilesRecursively(final String path, FileSystem fs) throws IOException {
+  private Pair<ParquetTableMetadata_v3, ParquetTableMetadataDirs> createMetaFilesRecursively(final Path path, FileSystem fs) throws IOException {
     Stopwatch timer = logger.isDebugEnabled() ? Stopwatch.createStarted() : null;
     List<ParquetFileMetadata_v3> metaDataList = Lists.newArrayList();
-    List<String> directoryList = Lists.newArrayList();
+    List<Path> directoryList = Lists.newArrayList();
     ConcurrentHashMap<ColumnTypeMetadata_v3.Key, ColumnTypeMetadata_v3> columnTypeInfoSet =
         new ConcurrentHashMap<>();
-    Path p = new Path(path);
+    Path p = path;
     FileStatus fileStatus = fs.getFileStatus(p);
     assert fileStatus.isDirectory() : "Expected directory";
 
@@ -222,10 +226,10 @@ public class Metadata {
 
     for (final FileStatus file : DrillFileSystemUtil.listAll(fs, p, false)) {
       if (file.isDirectory()) {
-        ParquetTableMetadata_v3 subTableMetadata = (createMetaFilesRecursively(file.getPath().toString(), fs)).getLeft();
+        ParquetTableMetadata_v3 subTableMetadata = (createMetaFilesRecursively(file.getPath(), fs)).getLeft();
         metaDataList.addAll(subTableMetadata.files);
         directoryList.addAll(subTableMetadata.directories);
-        directoryList.add(file.getPath().toString());
+        directoryList.add(file.getPath());
         // Merge the schema from the child level into the current level
         //TODO: We need a merge method that merges two columns with the same name but different types
         columnTypeInfoSet.putAll(subTableMetadata.columnTypeInfo);
@@ -268,7 +272,7 @@ public class Metadata {
       ParquetTableMetadataDirs parquetTableMetadataDirs = new ParquetTableMetadataDirs(directoryList);
       return Pair.of(parquetTableMetadata, parquetTableMetadataDirs);
     }
-    List<String> emptyDirList = Lists.newArrayList();
+    List<Path> emptyDirList = new ArrayList<>();
     if (timer != null) {
       logger.debug("Creating metadata files recursively took {} ms", timer.elapsed(TimeUnit.MILLISECONDS));
       timer.stop();
@@ -495,7 +499,7 @@ public class Metadata {
 
       rowGroupMetadataList.add(rowGroupMeta);
     }
-    String path = Path.getPathWithoutSchemeAndAuthority(file.getPath()).toString();
+    Path path = Path.getPathWithoutSchemeAndAuthority(file.getPath());
 
     return new ParquetFileMetadata_v3(path, file.getLen(), rowGroupMetadataList);
   }
@@ -535,6 +539,8 @@ public class Metadata {
    *
    * @param parquetTableMetadata parquet table metadata
    * @param p file path
+   * @param fs Drill file system
+   * @throws IOException if metadata can't be serialized
    */
   private void writeFile(ParquetTableMetadata_v3 parquetTableMetadata, Path p, FileSystem fs) throws IOException {
     JsonFactory jsonFactory = new JsonFactory();
@@ -542,6 +548,7 @@ public class Metadata {
     jsonFactory.configure(JsonParser.Feature.AUTO_CLOSE_SOURCE, false);
     ObjectMapper mapper = new ObjectMapper(jsonFactory);
     SimpleModule module = new SimpleModule();
+    module.addSerializer(Path.class, new PathSerDe.Se());
     module.addSerializer(ColumnMetadata_v3.class, new ColumnMetadata_v3.Serializer());
     mapper.registerModule(module);
     OutputStream os = fs.create(p);
@@ -556,6 +563,7 @@ public class Metadata {
     jsonFactory.configure(JsonParser.Feature.AUTO_CLOSE_SOURCE, false);
     ObjectMapper mapper = new ObjectMapper(jsonFactory);
     SimpleModule module = new SimpleModule();
+    module.addSerializer(Path.class, new PathSerDe.Se());
     mapper.registerModule(module);
     OutputStream os = fs.create(p);
     mapper.writerWithDefaultPrettyPrinter().writeValue(os, parquetTableMetadataDirs);
@@ -602,7 +610,7 @@ public class Metadata {
         parquetTableMetadataDirs.updateRelativePaths(metadataParentDirPath);
         if (!alreadyCheckedModification && tableModified(parquetTableMetadataDirs.getDirectories(), path, metadataParentDir, metaContext, fs)) {
           parquetTableMetadataDirs =
-              (createMetaFilesRecursively(Path.getPathWithoutSchemeAndAuthority(path.getParent()).toString(), fs)).getRight();
+              (createMetaFilesRecursively(Path.getPathWithoutSchemeAndAuthority(path.getParent()), fs)).getRight();
           newMetadata = true;
         }
       } else {
@@ -616,7 +624,7 @@ public class Metadata {
         }
         if (!alreadyCheckedModification && tableModified(parquetTableMetadata.getDirectories(), path, metadataParentDir, metaContext, fs)) {
           parquetTableMetadata =
-              (createMetaFilesRecursively(Path.getPathWithoutSchemeAndAuthority(path.getParent()).toString(), fs)).getLeft();
+              (createMetaFilesRecursively(Path.getPathWithoutSchemeAndAuthority(path.getParent()), fs)).getLeft();
           newMetadata = true;
         }
 
@@ -647,9 +655,10 @@ public class Metadata {
    * @return true if metadata needs to be updated, false otherwise
    * @throws IOException if some resources are not accessible
    */
-  private boolean tableModified(List<String> directories, Path metaFilePath, Path parentDir, MetadataContext metaContext, FileSystem fs) throws IOException {
+  private boolean tableModified(List<Path> directories, Path metaFilePath, Path parentDir,
+                                MetadataContext metaContext, FileSystem fs) throws IOException {
     Stopwatch timer = logger.isDebugEnabled() ? Stopwatch.createStarted() : null;
-    metaContext.setStatus(parentDir.toUri().getPath());
+    metaContext.setStatus(parentDir);
     long metaFileModifyTime = fs.getFileStatus(metaFilePath).getModificationTime();
     FileStatus directoryStatus = fs.getFileStatus(parentDir);
     int numDirs = 1;
@@ -661,10 +670,10 @@ public class Metadata {
       }
       return true;
     }
-    for (String directory : directories) {
+    for (Path directory : directories) {
       numDirs++;
       metaContext.setStatus(directory);
-      directoryStatus = fs.getFileStatus(new Path(directory));
+      directoryStatus = fs.getFileStatus(directory);
       if (directoryStatus.getModificationTime() > metaFileModifyTime) {
         if (timer != null) {
           logger.debug("Directory {} was modified. Took {} ms to check modification time of {} directories",
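
The PathSerDe.Se registered in the two writeFile() variants above keeps Path fields rendering as plain JSON strings in the metadata cache. Its implementation is not part of this excerpt; a Jackson serializer of roughly this shape would satisfy the registration (writing toString() rather than, say, the bare URI path is an assumption here):

    import java.io.IOException;

    import com.fasterxml.jackson.core.JsonGenerator;
    import com.fasterxml.jackson.databind.JsonSerializer;
    import com.fasterxml.jackson.databind.SerializerProvider;
    import org.apache.hadoop.fs.Path;

    public class PathSerDe {
      public static class Se extends JsonSerializer<Path> {
        @Override
        public void serialize(Path value, JsonGenerator gen, SerializerProvider provider)
            throws IOException {
          gen.writeString(value.toString()); // assumed string form of the Path
        }
      }
    }
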
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/MetadataBase.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/MetadataBase.java
index bed8be6..ee07470 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/MetadataBase.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/MetadataBase.java
@@ -20,6 +20,7 @@ package org.apache.drill.exec.store.parquet.metadata;
 import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.JsonSubTypes;
 import com.fasterxml.jackson.annotation.JsonTypeInfo;
+import org.apache.hadoop.fs.Path;
 import org.apache.parquet.schema.OriginalType;
 import org.apache.parquet.schema.PrimitiveType;
 
@@ -57,7 +58,7 @@ public class MetadataBase {
   public static abstract class ParquetTableMetadataBase {
 
     @JsonIgnore
-    public abstract List<String> getDirectories();
+    public abstract List<Path> getDirectories();
 
     @JsonIgnore public abstract List<? extends ParquetFileMetadata> getFiles();
 
@@ -83,7 +84,7 @@ public class MetadataBase {
   }
 
   public static abstract class ParquetFileMetadata {
-    @JsonIgnore public abstract String getPath();
+    @JsonIgnore public abstract Path getPath();
 
     @JsonIgnore public abstract Long getLength();
 
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/MetadataPathUtils.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/MetadataPathUtils.java
index 3e7c2ff..2794e2b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/MetadataPathUtils.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/MetadataPathUtils.java
@@ -21,6 +21,7 @@ import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
 import org.apache.drill.common.util.DrillVersionInfo;
 import org.apache.hadoop.fs.Path;
 
+import java.util.ArrayList;
 import java.util.List;
 
 import static org.apache.drill.exec.store.parquet.metadata.MetadataVersion.Constants.SUPPORTED_VERSIONS;
@@ -39,12 +40,12 @@ public class MetadataPathUtils {
    * @param baseDir base parent directory
    * @return list of absolute paths
    */
-  public static List<String> convertToAbsolutePaths(List<String> paths, String baseDir) {
+  public static List<Path> convertToAbsolutePaths(List<Path> paths, String baseDir) {
     if (!paths.isEmpty()) {
-      List<String> absolutePaths = Lists.newArrayList();
-      for (String relativePath : paths) {
-        String absolutePath = (new Path(relativePath).isAbsolute()) ? relativePath
-            : new Path(baseDir, relativePath).toUri().getPath();
+      List<Path> absolutePaths = Lists.newArrayList();
+      for (Path relativePath : paths) {
+        Path absolutePath = (relativePath.isAbsolute()) ? relativePath
+            : new Path(baseDir, relativePath);
         absolutePaths.add(absolutePath);
       }
       return absolutePaths;
@@ -64,10 +65,10 @@ public class MetadataPathUtils {
     if (!files.isEmpty()) {
       List<ParquetFileMetadata_v3> filesWithAbsolutePaths = Lists.newArrayList();
       for (ParquetFileMetadata_v3 file : files) {
-        Path relativePath = new Path(file.getPath());
+        Path relativePath = file.getPath();
         // create a new file if old one contains a relative path, otherwise use an old file
         ParquetFileMetadata_v3 fileWithAbsolutePath = (relativePath.isAbsolute()) ? file
-            : new ParquetFileMetadata_v3(new Path(baseDir, relativePath).toUri().getPath(), file.length, file.rowGroups);
+            : new ParquetFileMetadata_v3(new Path(baseDir, relativePath), file.length, file.rowGroups);
         filesWithAbsolutePaths.add(fileWithAbsolutePath);
       }
       return filesWithAbsolutePaths;
@@ -84,9 +85,9 @@ public class MetadataPathUtils {
    * @return parquet table metadata with relative paths for the files and directories
    */
   public static ParquetTableMetadata_v3 createMetadataWithRelativePaths(
-      ParquetTableMetadata_v3 tableMetadataWithAbsolutePaths, String baseDir) {
-    List<String> directoriesWithRelativePaths = Lists.newArrayList();
-    for (String directory : tableMetadataWithAbsolutePaths.getDirectories()) {
+      ParquetTableMetadata_v3 tableMetadataWithAbsolutePaths, Path baseDir) {
+    List<Path> directoriesWithRelativePaths = new ArrayList<>();
+    for (Path directory : tableMetadataWithAbsolutePaths.getDirectories()) {
       directoriesWithRelativePaths.add(relativize(baseDir, directory));
     }
     List<ParquetFileMetadata_v3> filesWithRelativePaths = Lists.newArrayList();
@@ -105,9 +106,9 @@ public class MetadataPathUtils {
    * @param baseDir base path (the part of the Path, which should be cut off from child path)
    * @return relative path
    */
-  public static String relativize(String baseDir, String childPath) {
-    Path fullPathWithoutSchemeAndAuthority = Path.getPathWithoutSchemeAndAuthority(new Path(childPath));
-    Path basePathWithoutSchemeAndAuthority = Path.getPathWithoutSchemeAndAuthority(new Path(baseDir));
+  public static Path relativize(Path baseDir, Path childPath) {
+    Path fullPathWithoutSchemeAndAuthority = Path.getPathWithoutSchemeAndAuthority(childPath);
+    Path basePathWithoutSchemeAndAuthority = Path.getPathWithoutSchemeAndAuthority(baseDir);
 
     // Since hadoop Path has no relativize() method, we use uri.relativize() to get the relative path
     Path relativeFilePath = new Path(basePathWithoutSchemeAndAuthority.toUri()
@@ -116,7 +117,7 @@ public class MetadataPathUtils {
       throw new IllegalStateException(String.format("Path %s is not a subpath of %s.",
           basePathWithoutSchemeAndAuthority.toUri().getPath(), fullPathWithoutSchemeAndAuthority.toUri().getPath()));
     }
-    return relativeFilePath.toUri().getPath();
+    return relativeFilePath;
   }
 
 }
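
Because Hadoop's Path offers no relativize() of its own, the helper above detours through java.net.URI. Its behavior is easy to verify standalone:

    import org.apache.hadoop.fs.Path;

    class RelativizeDemo {
      public static void main(String[] args) {
        Path base = new Path("/tmp/parquet_table");
        Path child = new Path("/tmp/parquet_table/1990/Q1/0_0_0.parquet");
        // URI.relativize strips the base prefix when it applies
        Path relative = new Path(base.toUri().relativize(child.toUri()).getPath());
        System.out.println(relative);              // 1990/Q1/0_0_0.parquet
        System.out.println(relative.isAbsolute()); // false
      }
    }

When the base is not a prefix of the child, URI.relativize() returns the child URI unchanged, so the resulting Path stays absolute; that is the condition the IllegalStateException guard above catches.
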
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/Metadata_V1.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/Metadata_V1.java
index 92feb5f..4b0dca8 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/Metadata_V1.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/Metadata_V1.java
@@ -21,6 +21,7 @@ import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.annotation.JsonTypeName;
 import org.apache.drill.common.expression.SchemaPath;
+import org.apache.hadoop.fs.Path;
 import org.apache.parquet.io.api.Binary;
 import org.apache.parquet.schema.OriginalType;
 import org.apache.parquet.schema.PrimitiveType;
@@ -43,19 +44,19 @@ public class Metadata_V1 {
     @JsonProperty(value = "metadata_version", access = JsonProperty.Access.WRITE_ONLY) private String metadataVersion;
     @JsonProperty
     List<ParquetFileMetadata_v1> files;
-    @JsonProperty List<String> directories;
+    @JsonProperty List<Path> directories;
 
     public ParquetTableMetadata_v1() {
     }
 
-    public ParquetTableMetadata_v1(String metadataVersion, List<ParquetFileMetadata_v1> files, List<String> directories) {
+    public ParquetTableMetadata_v1(String metadataVersion, List<ParquetFileMetadata_v1> files, List<Path> directories) {
       this.metadataVersion = metadataVersion;
       this.files = files;
       this.directories = directories;
     }
 
     @JsonIgnore
-    @Override public List<String> getDirectories() {
+    @Override public List<Path> getDirectories() {
       return directories;
     }
 
@@ -114,7 +115,7 @@ public class Metadata_V1 {
    */
   public static class ParquetFileMetadata_v1 extends ParquetFileMetadata {
     @JsonProperty
-    public String path;
+    public Path path;
     @JsonProperty
     public Long length;
     @JsonProperty
@@ -123,7 +124,7 @@ public class Metadata_V1 {
     public ParquetFileMetadata_v1() {
     }
 
-    public ParquetFileMetadata_v1(String path, Long length, List<RowGroupMetadata_v1> rowGroups) {
+    public ParquetFileMetadata_v1(Path path, Long length, List<RowGroupMetadata_v1> rowGroups) {
       this.path = path;
       this.length = length;
       this.rowGroups = rowGroups;
@@ -134,7 +135,7 @@ public class Metadata_V1 {
       return String.format("path: %s rowGroups: %s", path, rowGroups);
     }
 
-    @JsonIgnore @Override public String getPath() {
+    @JsonIgnore @Override public Path getPath() {
       return path;
     }
 
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/Metadata_V2.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/Metadata_V2.java
index 7eddc12..a78fca4 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/Metadata_V2.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/Metadata_V2.java
@@ -28,6 +28,7 @@ import com.fasterxml.jackson.databind.JsonDeserializer;
 import com.fasterxml.jackson.databind.JsonSerializer;
 import com.fasterxml.jackson.databind.KeyDeserializer;
 import com.fasterxml.jackson.databind.SerializerProvider;
+import org.apache.hadoop.fs.Path;
 import org.apache.parquet.io.api.Binary;
 import org.apache.parquet.schema.OriginalType;
 import org.apache.parquet.schema.PrimitiveType;
@@ -59,7 +60,7 @@ public class Metadata_V2 {
     @JsonProperty public ConcurrentHashMap<ColumnTypeMetadata_v2.Key, ColumnTypeMetadata_v2> columnTypeInfo;
     @JsonProperty
     List<ParquetFileMetadata_v2> files;
-    @JsonProperty List<String> directories;
+    @JsonProperty List<Path> directories;
     @JsonProperty String drillVersion;
 
     public ParquetTableMetadata_v2() {
@@ -71,7 +72,7 @@ public class Metadata_V2 {
     }
 
     public ParquetTableMetadata_v2(String metadataVersion, ParquetTableMetadataBase parquetTable,
-                                   List<ParquetFileMetadata_v2> files, List<String> directories, String drillVersion) {
+                                   List<ParquetFileMetadata_v2> files, List<Path> directories, String drillVersion) {
       this.metadataVersion = metadataVersion;
       this.files = files;
       this.directories = directories;
@@ -79,7 +80,7 @@ public class Metadata_V2 {
       this.drillVersion = drillVersion;
     }
 
-    public ParquetTableMetadata_v2(String metadataVersion, List<ParquetFileMetadata_v2> files, List<String> directories,
+    public ParquetTableMetadata_v2(String metadataVersion, List<ParquetFileMetadata_v2> files, List<Path> directories,
                                    ConcurrentHashMap<ColumnTypeMetadata_v2.Key, ColumnTypeMetadata_v2> columnTypeInfo, String drillVersion) {
       this.metadataVersion = metadataVersion;
       this.files = files;
@@ -93,7 +94,7 @@ public class Metadata_V2 {
     }
 
     @JsonIgnore
-    @Override public List<String> getDirectories() {
+    @Override public List<Path> getDirectories() {
       return directories;
     }
 
@@ -152,14 +153,14 @@ public class Metadata_V2 {
    * Struct which contains the metadata for a single parquet file
    */
   public static class ParquetFileMetadata_v2 extends ParquetFileMetadata {
-    @JsonProperty public String path;
+    @JsonProperty public Path path;
     @JsonProperty public Long length;
     @JsonProperty public List<RowGroupMetadata_v2> rowGroups;
 
     public ParquetFileMetadata_v2() {
     }
 
-    public ParquetFileMetadata_v2(String path, Long length, List<RowGroupMetadata_v2> rowGroups) {
+    public ParquetFileMetadata_v2(Path path, Long length, List<RowGroupMetadata_v2> rowGroups) {
       this.path = path;
       this.length = length;
       this.rowGroups = rowGroups;
@@ -169,7 +170,7 @@ public class Metadata_V2 {
       return String.format("path: %s rowGroups: %s", path, rowGroups);
     }
 
-    @JsonIgnore @Override public String getPath() {
+    @JsonIgnore @Override public Path getPath() {
       return path;
     }
 
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/Metadata_V3.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/Metadata_V3.java
index 4bb07f7..a5ff897 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/Metadata_V3.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/Metadata_V3.java
@@ -25,6 +25,7 @@ import com.fasterxml.jackson.databind.JsonSerializer;
 import com.fasterxml.jackson.databind.KeyDeserializer;
 import com.fasterxml.jackson.databind.SerializerProvider;
 import org.apache.drill.common.expression.SchemaPath;
+import org.apache.hadoop.fs.Path;
 import org.apache.parquet.io.api.Binary;
 import org.apache.parquet.schema.OriginalType;
 import org.apache.parquet.schema.PrimitiveType;
@@ -54,7 +55,7 @@ public class Metadata_V3 {
     @JsonProperty public ConcurrentHashMap<ColumnTypeMetadata_v3.Key, ColumnTypeMetadata_v3> columnTypeInfo;
     @JsonProperty
     List<ParquetFileMetadata_v3> files;
-    @JsonProperty List<String> directories;
+    @JsonProperty List<Path> directories;
     @JsonProperty String drillVersion;
 
     /**
@@ -74,7 +75,7 @@ public class Metadata_V3 {
     }
 
     public ParquetTableMetadata_v3(String metadataVersion, ParquetTableMetadataBase parquetTable,
-                                   List<ParquetFileMetadata_v3> files, List<String> directories, String drillVersion) {
+                                   List<ParquetFileMetadata_v3> files, List<Path> directories, String drillVersion) {
       this.metadataVersion = metadataVersion;
       this.files = files;
       this.directories = directories;
@@ -82,7 +83,7 @@ public class Metadata_V3 {
       this.drillVersion = drillVersion;
     }
 
-    public ParquetTableMetadata_v3(String metadataVersion, List<ParquetFileMetadata_v3> files, List<String> directories,
+    public ParquetTableMetadata_v3(String metadataVersion, List<ParquetFileMetadata_v3> files, List<Path> directories,
                                    ConcurrentHashMap<ColumnTypeMetadata_v3.Key, ColumnTypeMetadata_v3> columnTypeInfo,
                                    String drillVersion) {
       this.metadataVersion = metadataVersion;
@@ -97,7 +98,7 @@ public class Metadata_V3 {
     }
 
     @JsonIgnore
-    @Override public List<String> getDirectories() {
+    @Override public List<Path> getDirectories() {
       return directories;
     }
 
@@ -168,14 +169,14 @@ public class Metadata_V3 {
    * Struct which contains the metadata for a single parquet file
    */
   public static class ParquetFileMetadata_v3 extends ParquetFileMetadata {
-    @JsonProperty public String path;
+    @JsonProperty public Path path;
     @JsonProperty public Long length;
     @JsonProperty public List<RowGroupMetadata_v3> rowGroups;
 
     public ParquetFileMetadata_v3() {
     }
 
-    public ParquetFileMetadata_v3(String path, Long length, List<RowGroupMetadata_v3> rowGroups) {
+    public ParquetFileMetadata_v3(Path path, Long length, List<RowGroupMetadata_v3> rowGroups) {
       this.path = path;
       this.length = length;
       this.rowGroups = rowGroups;
@@ -185,7 +186,7 @@ public class Metadata_V3 {
       return String.format("path: %s rowGroups: %s", path, rowGroups);
     }
 
-    @JsonIgnore @Override public String getPath() {
+    @JsonIgnore @Override public Path getPath() {
       return path;
     }
 
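Note: with directories and file paths now typed as org.apache.hadoop.fs.Path, the metadata cache JSON keeps its previous on-disk shape only because the new PathSerDe.Se serializer writes a Path as a plain string. A minimal sketch of the resulting cache fragment, with hypothetical values and limited to the fields visible above:

    {
      "directories" : [ "/tmp/drill/table/1991" ],
      "files" : [ { "path" : "/tmp/drill/table/1991/0_0_0.parquet", "length" : 1024, "rowGroups" : [ ] } ]
    }
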
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/ParquetTableMetadataDirs.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/ParquetTableMetadataDirs.java
index 186f534..b1fd7f2 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/ParquetTableMetadataDirs.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/ParquetTableMetadataDirs.java
@@ -19,24 +19,25 @@ package org.apache.drill.exec.store.parquet.metadata;
 
 import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.hadoop.fs.Path;
 
 import java.util.List;
 
 public class ParquetTableMetadataDirs {
 
   @JsonProperty
-  List<String> directories;
+  List<Path> directories;
 
   public ParquetTableMetadataDirs() {
     // default constructor needed for deserialization
   }
 
-  public ParquetTableMetadataDirs(List<String> directories) {
+  public ParquetTableMetadataDirs(List<Path> directories) {
     this.directories = directories;
   }
 
   @JsonIgnore
-  public List<String> getDirectories() {
+  public List<Path> getDirectories() {
     return directories;
   }
 
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java
index 09c016a..5c49e04 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java
@@ -235,7 +235,7 @@ public class DrillParquetReader extends AbstractRecordReader {
         paths.put(md.getPath(), md);
       }
 
-      Path filePath = new Path(entry.getPath());
+      Path filePath = entry.getPath();
 
       BlockMetaData blockMetaData = footer.getBlocks().get(entry.getRowGroupIndex());
 
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/PcapRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/PcapRecordReader.java
index d688f3b..aef56a3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/PcapRecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/PcapRecordReader.java
@@ -92,11 +92,11 @@ public class PcapRecordReader extends AbstractRecordReader {
         .build();
   }
 
-  public PcapRecordReader(final String pathToFile,
+  public PcapRecordReader(final Path pathToFile,
                           final FileSystem fileSystem,
                           final List<SchemaPath> projectedColumns) {
     this.fs = fileSystem;
-    this.pathToFile = fs.makeQualified(new Path(pathToFile));
+    this.pathToFile = fs.makeQualified(pathToFile);
     this.projectedColumns = projectedColumns;
   }
 
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/PcapngRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/PcapngRecordReader.java
index b1c5f24..0ad234d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/PcapngRecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/PcapngRecordReader.java
@@ -65,11 +65,11 @@ public class PcapngRecordReader extends AbstractRecordReader {
 
   private Iterator<IPcapngType> it;
 
-  public PcapngRecordReader(final String pathToFile,
+  public PcapngRecordReader(final Path pathToFile,
                             final FileSystem fileSystem,
                             final List<SchemaPath> columns) {
     this.fs = fileSystem;
-    this.pathToFile = fs.makeQualified(new Path(pathToFile));
+    this.pathToFile = fs.makeQualified(pathToFile);
     this.columns = columns;
     setColumns(columns);
   }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/BlockMapBuilder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/BlockMapBuilder.java
index 7de31a0..6bc7bb0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/BlockMapBuilder.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/BlockMapBuilder.java
@@ -116,8 +116,8 @@ public class BlockMapBuilder {
         try {
           ImmutableRangeMap<Long, BlockLocation> rangeMap = getBlockMap(status);
           for (Entry<Range<Long>, BlockLocation> l : rangeMap.asMapOfRanges().entrySet()) {
-            work.add(new CompleteFileWork(getEndpointByteMap(noDrillbitHosts, new FileStatusWork(status)), l.getValue().getOffset(), l.getValue().getLength(), status.getPath()
-              .toString()));
+            work.add(new CompleteFileWork(getEndpointByteMap(noDrillbitHosts, new FileStatusWork(status)),
+                l.getValue().getOffset(), l.getValue().getLength(), status.getPath()));
           }
         } catch (IOException e) {
           logger.warn("failure while generating file work.", e);
@@ -127,7 +127,8 @@ public class BlockMapBuilder {
 
 
       if (!blockify || error || compressed(status)) {
-        work.add(new CompleteFileWork(getEndpointByteMap(noDrillbitHosts, new FileStatusWork(status)), 0, status.getLen(), status.getPath().toString()));
+        work.add(new CompleteFileWork(getEndpointByteMap(noDrillbitHosts, new FileStatusWork(status)), 0,
+            status.getLen(), status.getPath()));
       }
 
       // This if-condition is specific for empty CSV file
@@ -135,7 +136,8 @@ public class BlockMapBuilder {
       // And if this CSV file is empty, rangeMap would be empty also
       // Therefore, at the point before this if-condition, work would not be populated
       if(work.isEmpty()) {
-        work.add(new CompleteFileWork(getEndpointByteMap(noDrillbitHosts, new FileStatusWork(status)), 0, 0, status.getPath().toString()));
+        work.add(new CompleteFileWork(getEndpointByteMap(noDrillbitHosts, new FileStatusWork(status)), 0, 0,
+            status.getPath()));
       }
 
       if (noDrillbitHosts != null) {
@@ -162,8 +164,8 @@ public class BlockMapBuilder {
     }
 
     @Override
-    public String getPath() {
-      return status.getPath().toString();
+    public Path getPath() {
+      return status.getPath();
     }
 
     @Override
@@ -231,20 +233,20 @@ public class BlockMapBuilder {
    */
   public EndpointByteMap getEndpointByteMap(Set<String> noDrillbitHosts, FileWork work) throws IOException {
     Stopwatch watch = Stopwatch.createStarted();
-    Path fileName = new Path(work.getPath());
+    Path fileName = work.getPath();
 
 
-    ImmutableRangeMap<Long,BlockLocation> blockMap = getBlockMap(fileName);
+    ImmutableRangeMap<Long, BlockLocation> blockMap = getBlockMap(fileName);
     EndpointByteMapImpl endpointByteMap = new EndpointByteMapImpl();
     long start = work.getStart();
     long end = start + work.getLength();
     Range<Long> rowGroupRange = Range.closedOpen(start, end);
 
     // Find submap of ranges that intersect with the rowGroup
-    ImmutableRangeMap<Long,BlockLocation> subRangeMap = blockMap.subRangeMap(rowGroupRange);
+    ImmutableRangeMap<Long, BlockLocation> subRangeMap = blockMap.subRangeMap(rowGroupRange);
 
     // Iterate through each block in this submap and get the host for the block location
-    for (Map.Entry<Range<Long>,BlockLocation> block : subRangeMap.asMapOfRanges().entrySet()) {
+    for (Map.Entry<Range<Long>, BlockLocation> block : subRangeMap.asMapOfRanges().entrySet()) {
       String[] hosts;
       Range<Long> blockRange = block.getKey();
       try {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/CompleteFileWork.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/CompleteFileWork.java
index 04c4eb0..4b0402b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/CompleteFileWork.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/CompleteFileWork.java
@@ -21,16 +21,17 @@ import org.apache.drill.exec.store.dfs.easy.FileWork;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.hadoop.fs.Path;
 
 public class CompleteFileWork implements FileWork, CompleteWork {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(CompleteFileWork.class);
 
   private long start;
   private long length;
-  private String path;
+  private Path path;
   private EndpointByteMap byteMap;
 
-  public CompleteFileWork(EndpointByteMap byteMap, long start, long length, String path) {
+  public CompleteFileWork(EndpointByteMap byteMap, long start, long length, Path path) {
     super();
     this.start = start;
     this.length = length;
@@ -69,7 +70,7 @@ public class CompleteFileWork implements FileWork, CompleteWork {
   }
 
   @Override
-  public String getPath() {
+  public Path getPath() {
     return path;
   }
 
@@ -87,22 +88,28 @@ public class CompleteFileWork implements FileWork, CompleteWork {
     return new FileWorkImpl(start, length, path);
   }
 
-  public static class FileWorkImpl implements FileWork{
+  @Override
+  public String toString() {
+    return String.format("File: %s start: %d length: %d", path, start, length);
+  }
+
+  public static class FileWorkImpl implements FileWork {
+
+    private long start;
+    private long length;
+    private Path path;
 
     @JsonCreator
-    public FileWorkImpl(@JsonProperty("start") long start, @JsonProperty("length") long length, @JsonProperty("path") String path) {
-      super();
+    public FileWorkImpl(@JsonProperty("start") long start,
+                        @JsonProperty("length") long length,
+                        @JsonProperty("path") Path path) {
       this.start = start;
       this.length = length;
       this.path = path;
     }
 
-    public long start;
-    public long length;
-    public String path;
-
     @Override
-    public String getPath() {
+    public Path getPath() {
       return path;
     }
 
@@ -116,10 +123,13 @@ public class CompleteFileWork implements FileWork, CompleteWork {
       return length;
     }
 
-  }
-
-  @Override
-  public String toString() {
-    return String.format("File: %s start: %d length: %d", path, start, length);
+    @Override
+    public String toString() {
+      return "FileWorkImpl{" +
+          "start=" + start +
+          ", length=" + length +
+          ", path=" + path +
+          '}';
+    }
   }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/util/DrillFileSystemUtil.java b/exec/java-exec/src/main/java/org/apache/drill/exec/util/DrillFileSystemUtil.java
index fcee3b0..bfb83e0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/util/DrillFileSystemUtil.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/util/DrillFileSystemUtil.java
@@ -18,6 +18,7 @@
 package org.apache.drill.exec.util;
 
 import org.apache.drill.exec.store.dfs.DrillFileSystem;
+import org.apache.drill.shaded.guava.com.google.common.base.Strings;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -133,4 +134,14 @@ public class DrillFileSystemUtil {
     return FileSystemUtil.listAllSafe(fs, path, recursive, FileSystemUtil.mergeFilters(DRILL_SYSTEM_FILTER, filters));
   }
 
+  /**
+   * Safely creates a Hadoop Path from a String path that may be null or empty.
+   *
+   * @param path String path, which can be null or empty
+   * @return Hadoop Path; the root path ("/") for a null or empty input
+   */
+  public static Path createPathSafe(String path) {
+    return Strings.isNullOrEmpty(path) ? new Path("/") : new Path(path);
+  }
+
 }
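Note: the helper above guards callers that would otherwise call new Path(path) on a null or empty string. A minimal usage sketch, following the implementation directly:

    Path root     = DrillFileSystemUtil.createPathSafe(null);         // yields new Path("/")
    Path alsoRoot = DrillFileSystemUtil.createPathSafe("");           // yields new Path("/")
    Path table    = DrillFileSystemUtil.createPathSafe("/tmp/drill"); // yields new Path("/tmp/drill")
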
diff --git a/exec/java-exec/src/test/java/org/apache/drill/PlanTestBase.java b/exec/java-exec/src/test/java/org/apache/drill/PlanTestBase.java
index 9e0d95c..247d784 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/PlanTestBase.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/PlanTestBase.java
@@ -143,8 +143,7 @@ public class PlanTestBase extends BaseTestQuery {
    *                     planning process throws an exception
    */
   public static void testPlanWithAttributesMatchingPatterns(String query, String[] expectedPatterns,
-                                                            String[] excludedPatterns)
-          throws Exception {
+                                                            String[] excludedPatterns) throws Exception {
     final String plan = getPlanInString("EXPLAIN PLAN INCLUDING ALL ATTRIBUTES for " +
             QueryTestUtil.normalizeQuery(query), OPTIQ_FORMAT);
 
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/TestPathSerialization.java b/exec/java-exec/src/test/java/org/apache/drill/exec/TestPathSerialization.java
new file mode 100644
index 0000000..cb93565
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/TestPathSerialization.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec;
+
+
+import com.fasterxml.jackson.databind.module.SimpleModule;
+import org.apache.drill.exec.serialization.PathSerDe;
+import org.apache.drill.exec.store.schedule.CompleteFileWork;
+import org.apache.drill.test.DrillTest;
+import org.apache.hadoop.fs.Path;
+import org.junit.Test;
+
+import java.io.IOException;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.junit.Assert.assertThat;
+
+public class TestPathSerialization extends DrillTest {
+
+  @Test
+  public void testDeSerializingWithJsonCreator() throws IOException {
+
+    String jsonString = "{\"start\": 1, \"length\": 2, \"path\": \"/tmp/drill/test\"}";
+
+    SimpleModule module = new SimpleModule();
+    module.addSerializer(Path.class, new PathSerDe.Se());
+    objectMapper.registerModule(module);
+
+    CompleteFileWork.FileWorkImpl bean = objectMapper.readValue(jsonString, CompleteFileWork.FileWorkImpl.class);
+
+    assertThat(bean.getStart() == 1, equalTo(true));
+    assertThat(bean.getLength() == 2, equalTo(true));
+    assertThat(bean.getPath().equals(new Path("/tmp/drill/test")), equalTo(true));
+  }
+
+  @Test
+  public void testHadoopPathSerDe() throws IOException {
+    CompleteFileWork.FileWorkImpl fileWork = new CompleteFileWork.FileWorkImpl(5, 6, new Path("/tmp"));
+    SimpleModule module = new SimpleModule();
+    module.addSerializer(Path.class, new PathSerDe.Se());
+    objectMapper.registerModule(module);
+
+    CompleteFileWork.FileWorkImpl bean =
+        objectMapper.readValue(objectMapper.writeValueAsString(fileWork), CompleteFileWork.FileWorkImpl.class);
+
+    assertThat(bean.getStart() == 5, equalTo(true));
+    assertThat(bean.getLength() == 6, equalTo(true));
+    assertThat(bean.getPath().equals(new Path("/tmp")), equalTo(true));
+  }
+}
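Note: both tests register only a serializer for Path. Deserialization appears to need no custom code because Jackson can fall back on Path's public single-String-argument constructor when reading a JSON string; PathSerDe.Se is needed on the write side so a Path serializes as a plain string rather than as a bean. A minimal round-trip sketch under that assumption:

    ObjectMapper mapper = new ObjectMapper();
    SimpleModule module = new SimpleModule();
    module.addSerializer(Path.class, new PathSerDe.Se());
    mapper.registerModule(module);

    String json = mapper.writeValueAsString(new Path("/tmp/drill")); // "\"/tmp/drill\""
    Path roundTripped = mapper.readValue(json, Path.class);          // built via Path(String)
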
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestFileScanFramework.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestFileScanFramework.java
index 2fdf3e6..b05bb28 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestFileScanFramework.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestFileScanFramework.java
@@ -85,7 +85,7 @@ public class TestFileScanFramework extends SubOperatorTest {
     }
 
     @Override
-    public String getPath() { return path.toString(); }
+    public Path getPath() { return path; }
 
     @Override
     public long getStart() { return 0; }
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/MiniPlanUnitTestBase.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/MiniPlanUnitTestBase.java
index 9a2bb1f..911a097 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/MiniPlanUnitTestBase.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/MiniPlanUnitTestBase.java
@@ -32,6 +32,7 @@ import org.apache.drill.exec.store.dfs.DrillFileSystem;
 import org.apache.drill.exec.store.parquet.ParquetDirectByteBufferAllocator;
 import org.apache.drill.exec.store.parquet.ParquetReaderUtility;
 import org.apache.drill.exec.store.parquet.columnreaders.ParquetRecordReader;
+import org.apache.drill.test.LegacyOperatorTestBuilder;
 import org.apache.drill.test.PhysicalOpUnitTestBase;
 import org.apache.hadoop.fs.Path;
 import org.apache.parquet.hadoop.CodecFactory;
@@ -358,7 +359,7 @@ public class MiniPlanUnitTestBase extends PhysicalOpUnitTestBase {
    */
   public class JsonScanBuilder extends ScanPopBuider<JsonScanBuilder> {
     List<String> jsonBatches = null;
-    List<String> inputPaths = Collections.emptyList();
+    List<Path> inputPaths = Collections.emptyList();
 
     public JsonScanBuilder(PopBuilder parent) {
       super(parent);
@@ -373,7 +374,7 @@ public class MiniPlanUnitTestBase extends PhysicalOpUnitTestBase {
       return this;
     }
 
-    public JsonScanBuilder inputPaths(List<String> inputPaths) {
+    public JsonScanBuilder inputPaths(List<Path> inputPaths) {
       this.inputPaths = inputPaths;
       return this;
     }
@@ -412,7 +413,7 @@ public class MiniPlanUnitTestBase extends PhysicalOpUnitTestBase {
    * Builder for parquet Scan RecordBatch.
    */
   public class ParquetScanBuilder extends ScanPopBuider<ParquetScanBuilder> {
-    List<String> inputPaths = Collections.emptyList();
+    List<Path> inputPaths = Collections.emptyList();
 
     public ParquetScanBuilder() {
       super();
@@ -422,7 +423,7 @@ public class MiniPlanUnitTestBase extends PhysicalOpUnitTestBase {
       super(parent);
     }
 
-    public ParquetScanBuilder inputPaths(List<String> inputPaths) {
+    public ParquetScanBuilder inputPaths(List<Path> inputPaths) {
       this.inputPaths = inputPaths;
       return this;
     }
@@ -443,8 +444,8 @@ public class MiniPlanUnitTestBase extends PhysicalOpUnitTestBase {
     private RecordBatch getScanBatch() throws Exception {
       List<RecordReader> readers = new LinkedList<>();
 
-      for (String path : inputPaths) {
-        ParquetMetadata footer = ParquetFileReader.readFooter(fs.getConf(), new Path(path));
+      for (Path path : inputPaths) {
+        ParquetMetadata footer = ParquetFileReader.readFooter(fs.getConf(), path);
 
         for (int i = 0; i < footer.getBlocks().size(); i++) {
           readers.add(new ParquetRecordReader(fragContext,
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/TestMiniPlan.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/TestMiniPlan.java
index cc259cc..a1b7001 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/TestMiniPlan.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/TestMiniPlan.java
@@ -32,6 +32,7 @@ import org.apache.drill.exec.record.metadata.SchemaBuilder;
 import org.apache.drill.exec.store.dfs.DrillFileSystem;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
@@ -59,11 +60,11 @@ public class TestMiniPlan extends MiniPlanUnitTestBase {
   @Test
   public void testSimpleParquetScan() throws Exception {
     String file = DrillFileUtils.getResourceAsFile("/tpchmulti/region/01.parquet").toURI().toString();
-
+    List<Path> filePath = Collections.singletonList(new Path(file));
     RecordBatch scanBatch = new ParquetScanBuilder()
         .fileSystem(fs)
         .columnsToRead("R_REGIONKEY")
-        .inputPaths(Lists.newArrayList(file))
+        .inputPaths(filePath)
         .build();
 
     BatchSchema expectedSchema = new SchemaBuilder()
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/TestNullInputMiniPlan.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/TestNullInputMiniPlan.java
index bdff80b..69a4214 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/TestNullInputMiniPlan.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/TestNullInputMiniPlan.java
@@ -41,6 +41,7 @@ import org.apache.drill.exec.record.metadata.SchemaBuilder;
 import org.apache.drill.exec.store.dfs.DrillFileSystem;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.junit.BeforeClass;
 import org.junit.Ignore;
 import org.junit.Test;
@@ -272,11 +273,12 @@ public class TestNullInputMiniPlan extends MiniPlanUnitTestBase{
     RecordBatch left = createScanBatchFromJson(SINGLE_EMPTY_JSON);
 
     String file = DrillFileUtils.getResourceAsFile("/tpchmulti/region/01.parquet").toURI().toString();
+    List<Path> filePath = Collections.singletonList(new Path(file));
 
     RecordBatch scanBatch = new ParquetScanBuilder()
         .fileSystem(fs)
         .columnsToRead("R_REGIONKEY")
-        .inputPaths(Lists.newArrayList(file))
+        .inputPaths(filePath)
         .build();
 
     RecordBatch projectBatch = new PopBuilder()
@@ -554,10 +556,10 @@ public class TestNullInputMiniPlan extends MiniPlanUnitTestBase{
   }
 
   private RecordBatch createScanBatchFromJson(String... resourcePaths) throws Exception {
-    List<String> inputPaths = new ArrayList<>();
+    List<Path> inputPaths = new ArrayList<>();
 
     for (String resource : resourcePaths) {
-      inputPaths.add(DrillFileUtils.getResourceAsFile(resource).toURI().toString());
+      inputPaths.add(new Path(DrillFileUtils.getResourceAsFile(resource).toURI()));
     }
 
     RecordBatch scanBatch = new JsonScanBuilder()
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/CachedSingleFileSystem.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/CachedSingleFileSystem.java
index 6bd7e4d..3db9256 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/CachedSingleFileSystem.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/CachedSingleFileSystem.java
@@ -41,16 +41,16 @@ import org.apache.hadoop.util.Progressable;
 public class CachedSingleFileSystem extends FileSystem {
 
   private ByteBuf file;
-  private String path;
+  private Path path;
 
-  public CachedSingleFileSystem(String path) throws IOException {
+  public CachedSingleFileSystem(Path path) throws IOException {
     this.path = path;
-    File f = new File(path);
+    File f = new File(path.toUri().getPath());
     long length = f.length();
     if (length > Integer.MAX_VALUE) {
       throw new UnsupportedOperationException("Cached file system only supports files of less than 2GB.");
     }
-    try (InputStream is = new BufferedInputStream(new FileInputStream(path))) {
+    try (InputStream is = new BufferedInputStream(new FileInputStream(path.toUri().getPath()))) {
       byte[] buffer = new byte[64*1024];
       this.file = UnpooledByteBufAllocator.DEFAULT.directBuffer((int) length);
       int read;
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/dfs/TestDFSPartitionLocation.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/dfs/TestDFSPartitionLocation.java
new file mode 100644
index 0000000..067abec
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/dfs/TestDFSPartitionLocation.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.dfs;
+
+import org.apache.drill.exec.planner.DFSFilePartitionLocation;
+import org.apache.drill.test.DrillTest;
+import org.apache.hadoop.fs.Path;
+import org.junit.Test;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+
+public class TestDFSPartitionLocation extends DrillTest {
+
+  private static final Path SELECTION_ROOT = new Path("/tmp/drill");
+  private static final Path PARTITION = new Path("/tmp/drill/test_table/first_dir/second_dir/");
+
+  @Test
+  public void testDFSFilePartitionLocation() {
+    Path file = new Path(PARTITION, "0_0_0.parquet");
+    DFSFilePartitionLocation dfsPartition = new DFSFilePartitionLocation(4, SELECTION_ROOT, file, false);
+    checkSubdirectories(dfsPartition, file);
+  }
+
+  @Test
+  public void testDFSDirectoryPartitionLocation() {
+    DFSFilePartitionLocation dfsPartition = new DFSFilePartitionLocation(4, SELECTION_ROOT, PARTITION, true);
+    checkSubdirectories(dfsPartition, PARTITION);
+  }
+
+  private void checkSubdirectories(DFSFilePartitionLocation dfsPartition, Path partition) {
+    assertArrayEquals("Wrong partition dirs",  new String[]{"test_table", "first_dir", "second_dir", null}, dfsPartition.getDirs());
+    assertEquals("Wrong partition value","test_table", dfsPartition.getPartitionValue(0));
+    assertEquals("Wrong partition value", "first_dir", dfsPartition.getPartitionValue(1));
+    assertEquals("Wrong partition value", "second_dir", dfsPartition.getPartitionValue(2));
+    assertEquals("Wrong partition location", partition, dfsPartition.getEntirePartitionLocation());
+  }
+
+}
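Note: as the assertions above show, DFSFilePartitionLocation strips the selection root from the location, splits the remainder into at most the requested number of directory values (padding the tail with null), and for a file location excludes the file name itself from the directory values, while getEntirePartitionLocation() still returns the full path.
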
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/dfs/TestFileSelection.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/dfs/TestFileSelection.java
index b1f233e..b853b9b 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/dfs/TestFileSelection.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/dfs/TestFileSelection.java
@@ -22,6 +22,7 @@ import static org.junit.Assert.assertNull;
 
 import java.util.List;
 
+import org.apache.drill.exec.util.DrillFileSystemUtil;
 import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableList;
 import org.apache.drill.test.BaseTestQuery;
 import org.apache.hadoop.fs.FileStatus;
@@ -30,7 +31,7 @@ import org.junit.Test;
 
 public class TestFileSelection extends BaseTestQuery {
   private static final List<FileStatus> EMPTY_STATUSES = ImmutableList.of();
-  private static final List<String> EMPTY_FILES = ImmutableList.of();
+  private static final List<Path> EMPTY_FILES = ImmutableList.of();
   private static final String EMPTY_ROOT = "";
 
   @Test
@@ -38,8 +39,8 @@ public class TestFileSelection extends BaseTestQuery {
     for (final Object statuses : new Object[] { null, EMPTY_STATUSES}) {
       for (final Object files : new Object[]{null, EMPTY_FILES}) {
         for (final Object root : new Object[]{null, EMPTY_ROOT}) {
-          final FileSelection selection = FileSelection.create((List<FileStatus>) statuses, (List<String>) files,
-              (String)root);
+          FileSelection selection = FileSelection.create((List<FileStatus>) statuses, (List<Path>) files,
+                  DrillFileSystemUtil.createPathSafe((String) root));
           assertNull(selection);
         }
       }
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java
index 1bd90b3..16b25ce 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java
@@ -610,13 +610,13 @@ public class ParquetRecordReaderTest extends BaseTestQuery {
     final FunctionImplementationRegistry registry = new FunctionImplementationRegistry(c);
     final FragmentContextImpl context = new FragmentContextImpl(bitContext, BitControl.PlanFragment.getDefaultInstance(), connection, registry);
 
-    final String fileName = "/tmp/parquet_test_performance.parquet";
+    final Path fileName = new Path("/tmp/parquet_test_performance.parquet");
     final HashMap<String, FieldInfo> fields = new HashMap<>();
     final ParquetTestProperties props = new ParquetTestProperties(1, 20 * 1000 * 1000, DEFAULT_BYTES_PER_PAGE, fields);
     populateFieldInfoMap(props);
 
     final Configuration dfsConfig = new Configuration();
-    final List<Footer> footers = ParquetFileReader.readFooters(dfsConfig, new Path(fileName));
+    final List<Footer> footers = ParquetFileReader.readFooters(dfsConfig, fileName);
     final Footer f = footers.iterator().next();
 
     final List<SchemaPath> columns = Lists.newArrayList();
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/store/TestAssignment.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/store/TestAssignment.java
index 51e8c1b..1297967 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/store/TestAssignment.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/store/TestAssignment.java
@@ -26,6 +26,7 @@ import org.apache.drill.exec.store.schedule.AssignmentCreator;
 import org.apache.drill.exec.store.schedule.CompleteFileWork;
 import org.apache.drill.exec.store.schedule.EndpointByteMap;
 import org.apache.drill.exec.store.schedule.EndpointByteMapImpl;
+import org.apache.hadoop.fs.Path;
 import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -125,7 +126,7 @@ public class TestAssignment {
   private List<CompleteFileWork> generateChunks(int chunks) {
     List<CompleteFileWork> chunkList = Lists.newArrayList();
     for (int i = 0; i < chunks; i++) {
-      CompleteFileWork chunk = new CompleteFileWork(createByteMap(), 0, FILE_SIZE, "file" + i);
+      CompleteFileWork chunk = new CompleteFileWork(createByteMap(), 0, FILE_SIZE, new Path("file", Integer.toString(i)));
       chunkList.add(chunk);
     }
     return chunkList;
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/PhysicalOpUnitTestBase.java b/exec/java-exec/src/test/java/org/apache/drill/test/PhysicalOpUnitTestBase.java
index b0820e9..27a4ad6 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/PhysicalOpUnitTestBase.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/PhysicalOpUnitTestBase.java
@@ -56,6 +56,7 @@ import org.apache.drill.exec.record.VectorAccessible;
 import org.apache.drill.exec.server.DrillbitContext;
 import org.apache.drill.exec.store.RecordReader;
 import org.apache.drill.exec.rpc.NamedThreadFactory;
+import org.apache.hadoop.fs.Path;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Rule;
@@ -399,9 +400,9 @@ public class PhysicalOpUnitTestBase extends ExecTest {
    * @param columnsToRead
    * @return The {@link org.apache.drill.exec.store.easy.json.JSONRecordReader} corresponding to each given input path.
    */
-  public static Iterator<RecordReader> getJsonReadersFromInputFiles(DrillFileSystem fs, List<String> inputPaths, FragmentContext fragContext, List<SchemaPath> columnsToRead) {
+  public static Iterator<RecordReader> getJsonReadersFromInputFiles(DrillFileSystem fs, List<Path> inputPaths, FragmentContext fragContext, List<SchemaPath> columnsToRead) {
     List<RecordReader> readers = new ArrayList<>();
-    for (String inputPath : inputPaths) {
+    for (Path inputPath : inputPaths) {
       readers.add(new JSONRecordReader(fragContext, inputPath, fs, columnsToRead));
     }
     return readers.iterator();
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/QueryBuilder.java b/exec/java-exec/src/test/java/org/apache/drill/test/QueryBuilder.java
index b4fcedf..629714b 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/QueryBuilder.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/QueryBuilder.java
@@ -643,7 +643,7 @@ public class QueryBuilder {
    */
 
   protected String queryPlan(String columnName) throws Exception {
-    Preconditions.checkArgument(queryType == QueryType.SQL, "Can only explan an SQL query.");
+    Preconditions.checkArgument(queryType == QueryType.SQL, "Can only explain an SQL query.");
     final List<QueryDataBatch> results = results();
     final RecordBatchLoader loader = new RecordBatchLoader(client.allocator());
     final StringBuilder builder = new StringBuilder();