Posted to commits@impala.apache.org by ta...@apache.org on 2019/04/12 21:15:53 UTC

[impala] branch master updated: IMPALA-6050: Query profiles should indicate storage layer(s) used

This is an automated email from the ASF dual-hosted git repository.

tarmstrong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git


The following commit(s) were added to refs/heads/master by this push:
     new a103cb8  IMPALA-6050: Query profiles should indicate storage layer(s) used
a103cb8 is described below

commit a103cb8ee2357c220eaf912d9aefd522b09f3e04
Author: stakiar <ta...@gmail.com>
AuthorDate: Tue Jan 8 15:02:28 2019 -0600

    IMPALA-6050: Query profiles should indicate storage layer(s) used
    
    This patch updates Impala explain plans so that the Scan Node section clearly
    displays which filesystems the Scan Node is reading data from (support
    has been added for scans from HDFS, S3, ADLS, and the local filesystem).
    
    Before this patch, if an Impala query scanned a table with partitions
    across different storage layers, the explain plan would look like this:
    
     PLAN-ROOT SINK
     |
     01:EXCHANGE [UNPARTITIONED]
     |
     00:SCAN HDFS [functional.alltypes]
        partitions=24/24 files=24 size=478.45KB
    
    Now the explain plan will look like this:
    
     PLAN-ROOT SINK
     |
     01:EXCHANGE [UNPARTITIONED]
     |
     00:SCAN S3 [functional.alltypes]
        ADLS partitions=4/24 files=4 size=478.45KB
        HDFS partitions=10/24 files=10 size=478.45KB
        S3 partitions=10/24 files=10 size=478.45KB
    
    The explain plan differentiates "SCAN HDFS" vs "SCAN S3" by using the
    root table path. This means that even scans of non-partitioned tables
    will see their explain plans change from "SCAN HDFS" to "SCAN
    [storage-layer-name]". This change also affects explain plans for tables that are
    stored on a single storage layer: 'partitions=...' will become
    'HDFS partitions=...'.
    
    This patch makes several changes to PlannerTest.java so that by default
    test files do not validate the value of the storage layer displayed in
    the explain plan. This is necessary to support classes such as
    S3PlannerTest which run test files against S3. It makes several changes
    to impala_test_suite.py as well in order to support validation of
    explain plans in test files that run via Python. Specifically, it adds
    support for a new substitution variable in test files called
    $FILESYSTEM_NAME, which is the name of the storage layer the test is
    being run against.
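
    For example (see the updated partition-col-types.test below), the expected
    explain plan line

        partitions=3/11 files=3 size=6B

    in a test file becomes

        $FILESYSTEM_NAME partitions=3/11 files=3 size=6B

    and the test framework substitutes the name of the storage layer the test
    is running against (HDFS, S3, ADLS, or LOCAL) before comparing results.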
    
    Testing:
    * Ran core tests
    * Added new tests to PlannerTest
    * Added ExplainTest to allow for more fine-grained testing of explain
    plan logic
    
    Change-Id: I4b1b4a1bc1a24e9614e3b4dc5a61dc96d075d1c3
    Reviewed-on: http://gerrit.cloudera.org:8080/12282
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 fe/pom.xml                                         |   7 +
 .../apache/impala/analysis/ComputeStatsStmt.java   |   4 +-
 .../org/apache/impala/catalog/FeFsPartition.java   |   6 +
 .../java/org/apache/impala/catalog/FeFsTable.java  |  28 +--
 .../org/apache/impala/catalog/HdfsPartition.java   |  15 +-
 .../java/org/apache/impala/catalog/HdfsTable.java  |  18 +-
 .../impala/catalog/local/LocalFsPartition.java     |  10 ++
 .../apache/impala/catalog/local/LocalFsTable.java  |  12 ++
 .../org/apache/impala/common/FileSystemUtil.java   |  43 ++++-
 .../org/apache/impala/planner/HdfsScanNode.java    | 146 ++++++++++++----
 .../org/apache/impala/common/FrontendFixture.java  |   2 +-
 .../org/apache/impala/planner/ExplainTest.java     | 188 +++++++++++++++++++++
 .../org/apache/impala/planner/PlannerTest.java     |  14 ++
 .../org/apache/impala/planner/PlannerTestBase.java |  17 +-
 .../java/org/apache/impala/testutil/TestUtils.java |  37 +++-
 .../queries/PlannerTest/scan-node-fs-scheme.test   |  80 +++++++++
 .../queries/QueryTest/partition-col-types.test     |  50 +++---
 tests/common/impala_test_suite.py                  |  21 ++-
 tests/util/filesystem_utils.py                     |  14 ++
 19 files changed, 624 insertions(+), 88 deletions(-)

diff --git a/fe/pom.xml b/fe/pom.xml
index 5892086..f0fb6ab 100644
--- a/fe/pom.xml
+++ b/fe/pom.xml
@@ -531,6 +531,13 @@ under the License.
       <version>3.2.2</version>
     </dependency>
 
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-core</artifactId>
+      <version>2.23.4</version>
+      <scope>test</scope>
+    </dependency>
+
   </dependencies>
 
   <reporting>
diff --git a/fe/src/main/java/org/apache/impala/analysis/ComputeStatsStmt.java b/fe/src/main/java/org/apache/impala/analysis/ComputeStatsStmt.java
index 3e743fa..f0844bb 100644
--- a/fe/src/main/java/org/apache/impala/analysis/ComputeStatsStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/ComputeStatsStmt.java
@@ -43,6 +43,7 @@ import org.apache.impala.catalog.Type;
 import org.apache.impala.common.AnalysisException;
 import org.apache.impala.common.PrintUtils;
 import org.apache.impala.common.RuntimeEnv;
+import org.apache.impala.planner.HdfsScanNode;
 import org.apache.impala.service.BackendConfig;
 import org.apache.impala.service.CatalogOpExecutor;
 import org.apache.impala.service.FrontendProfile;
@@ -747,7 +748,8 @@ public class ComputeStatsStmt extends StatementBase {
     // TODO(todd): can we avoid loading all the partitions for this?
     Collection<? extends FeFsPartition> partitions =
         FeCatalogUtils.loadAllPartitions(hdfsTable);
-    Map<Long, List<FileDescriptor>> sample = FeFsTable.Utils.getFilesSample(hdfsTable,
+    Map<HdfsScanNode.SampledPartitionMetadata, List<FileDescriptor>> sample =
+            FeFsTable.Utils.getFilesSample(hdfsTable,
         partitions, samplePerc, minSampleBytes, sampleSeed);
     long sampleFileBytes = 0;
     for (List<FileDescriptor> fds: sample.values()) {
diff --git a/fe/src/main/java/org/apache/impala/catalog/FeFsPartition.java b/fe/src/main/java/org/apache/impala/catalog/FeFsPartition.java
index ea248a1..00c90af 100644
--- a/fe/src/main/java/org/apache/impala/catalog/FeFsPartition.java
+++ b/fe/src/main/java/org/apache/impala/catalog/FeFsPartition.java
@@ -24,6 +24,7 @@ import javax.annotation.Nullable;
 import org.apache.hadoop.fs.Path;
 import org.apache.impala.analysis.LiteralExpr;
 import org.apache.impala.catalog.HdfsPartition.FileDescriptor;
+import org.apache.impala.common.FileSystemUtil;
 import org.apache.impala.thrift.TAccessLevel;
 import org.apache.impala.thrift.THdfsPartitionLocation;
 import org.apache.impala.thrift.TPartitionStats;
@@ -49,6 +50,11 @@ public interface FeFsPartition {
   FeFsTable getTable();
 
   /**
+   * @return the FsType that this partition is stored on
+   */
+  FileSystemUtil.FsType getFsType();
+
+  /**
    * @return the files that this partition contains
    */
   List<FileDescriptor> getFileDescriptors();
diff --git a/fe/src/main/java/org/apache/impala/catalog/FeFsTable.java b/fe/src/main/java/org/apache/impala/catalog/FeFsTable.java
index 4b18770..dd1699c 100644
--- a/fe/src/main/java/org/apache/impala/catalog/FeFsTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/FeFsTable.java
@@ -33,7 +33,9 @@ import org.apache.impala.analysis.LiteralExpr;
 import org.apache.impala.analysis.PartitionKeyValue;
 import org.apache.impala.catalog.HdfsPartition.FileDescriptor;
 import org.apache.impala.common.AnalysisException;
+import org.apache.impala.common.FileSystemUtil;
 import org.apache.impala.common.PrintUtils;
+import org.apache.impala.planner.HdfsScanNode;
 import org.apache.impala.service.BackendConfig;
 import org.apache.impala.thrift.TColumn;
 import org.apache.impala.thrift.TNetworkAddress;
@@ -95,6 +97,11 @@ public interface FeFsTable extends FeTable {
   public String getHdfsBaseDir();
 
   /**
+   * @return the FsType where files of this table are stored.
+   */
+  public FileSystemUtil.FsType getFsType();
+
+  /**
    * @return the total number of bytes stored for this table.
    */
   long getTotalHdfsBytes();
@@ -277,11 +284,9 @@ public interface FeFsTable extends FeTable {
      * The given 'randomSeed' is used for random number generation.
      * The 'percentBytes' parameter must be between 0 and 100.
      */
-    public static Map<Long, List<FileDescriptor>> getFilesSample(
-        FeFsTable table,
-        Collection<? extends FeFsPartition> inputParts,
-        long percentBytes, long minSampleBytes,
-        long randomSeed) {
+    public static Map<HdfsScanNode.SampledPartitionMetadata, List<FileDescriptor>>
+        getFilesSample(FeFsTable table, Collection<? extends FeFsPartition> inputParts,
+            long percentBytes, long minSampleBytes, long randomSeed) {
       Preconditions.checkState(percentBytes >= 0 && percentBytes <= 100);
       Preconditions.checkState(minSampleBytes >= 0);
 
@@ -337,16 +342,15 @@ public interface FeFsTable extends FeTable {
       // selected.
       Random rnd = new Random(randomSeed);
       long selectedBytes = 0;
-      Map<Long, List<FileDescriptor>> result = new HashMap<>();
+      Map<HdfsScanNode.SampledPartitionMetadata, List<FileDescriptor>> result =
+          new HashMap<>();
       while (selectedBytes < targetBytes && numFilesRemaining > 0) {
         int selectedIdx = Math.abs(rnd.nextInt()) % numFilesRemaining;
         FeFsPartition part = parts[selectedIdx];
-        Long partId = Long.valueOf(part.getId());
-        List<FileDescriptor> sampleFileIdxs = result.get(partId);
-        if (sampleFileIdxs == null) {
-          sampleFileIdxs = new ArrayList<>();
-          result.put(partId, sampleFileIdxs);
-        }
+        HdfsScanNode.SampledPartitionMetadata sampledPartitionMetadata =
+            new HdfsScanNode.SampledPartitionMetadata(part.getId(), part.getFsType());
+        List<FileDescriptor> sampleFileIdxs = result.computeIfAbsent(
+            sampledPartitionMetadata, partMetadata -> Lists.newArrayList());
         FileDescriptor fd = part.getFileDescriptors().get(fileIdxs[selectedIdx]);
         sampleFileIdxs.add(fd);
         selectedBytes += fd.getFileLength();
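
As an editorial aside: getFilesSample() can key its result map on
SampledPartitionMetadata (defined in HdfsScanNode.java further down) because
that class overrides equals() and hashCode(). A minimal standalone sketch,
with List<String> standing in for List<FileDescriptor> purely for
illustration:

// Minimal sketch (not part of the patch): two SampledPartitionMetadata keys
// built from the same partition id and FsType collapse to one map entry, so
// computeIfAbsent() groups all sampled files of a partition under one key.
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.impala.common.FileSystemUtil;
import org.apache.impala.planner.HdfsScanNode.SampledPartitionMetadata;

public class SampledPartitionKeySketch {
  public static void main(String[] args) {
    Map<SampledPartitionMetadata, List<String>> sample = new HashMap<>();
    SampledPartitionMetadata key1 =
        new SampledPartitionMetadata(42L, FileSystemUtil.FsType.S3);
    SampledPartitionMetadata key2 =
        new SampledPartitionMetadata(42L, FileSystemUtil.FsType.S3);
    sample.computeIfAbsent(key1, k -> new ArrayList<>()).add("file_a.parq");
    sample.computeIfAbsent(key2, k -> new ArrayList<>()).add("file_b.parq");
    // key1.equals(key2) is true, so both files land under a single entry.
    System.out.println(sample.size());  // prints 1
  }
}
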
diff --git a/fe/src/main/java/org/apache/impala/catalog/HdfsPartition.java b/fe/src/main/java/org/apache/impala/catalog/HdfsPartition.java
index 56a2340..c27d347 100644
--- a/fe/src/main/java/org/apache/impala/catalog/HdfsPartition.java
+++ b/fe/src/main/java/org/apache/impala/catalog/HdfsPartition.java
@@ -704,11 +704,24 @@ public class HdfsPartition implements FeFsPartition, PrunablePartition {
   }
 
   @Override // FeFsPartition
-  public Path getLocationPath() { return new Path(getLocation()); }
+  public Path getLocationPath() {
+    Preconditions.checkNotNull(getLocation(), "HdfsPartition location is null");
+    return new Path(getLocation());
+  }
+
   @Override // FeFsPartition
   public long getId() { return id_; }
+
   @Override // FeFsPartition
   public HdfsTable getTable() { return table_; }
+
+  @Override
+  public FileSystemUtil.FsType getFsType() {
+    Preconditions.checkNotNull(getLocationPath().toUri().getScheme(),
+        "Cannot get scheme from path " + getLocationPath());
+    return FileSystemUtil.FsType.getFsType(getLocationPath().toUri().getScheme());
+  }
+
   public void setNumRows(long numRows) { numRows_ = numRows; }
   @Override // FeFsPartition
   public long getNumRows() { return numRows_; }
diff --git a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
index e8a6e71..49f0e9a 100644
--- a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
@@ -756,7 +756,7 @@ public class HdfsTable extends Table implements FeFsTable {
    * Resets any partition metadata, creates the prototype partition and sets the base
    * table directory path as well as the caching info from the HMS table.
    */
-  private void initializePartitionMetadata(
+  public void initializePartitionMetadata(
       org.apache.hadoop.hive.metastore.api.Table msTbl) throws CatalogException {
     Preconditions.checkNotNull(msTbl);
     resetPartitions();
@@ -1283,7 +1283,7 @@ public class HdfsTable extends Table implements FeFsTable {
    * Incrementally updates the file metadata of an unpartitioned HdfsTable.
    *
    * This is optimized for the case where few files have changed. See
-   * {@link #refreshFileMetadata()} above for details.
+   * {@link #refreshFileMetadata(Path, List)} above for details.
    */
   private void updateUnpartitionedTableFileMd() throws CatalogException {
     Preconditions.checkState(getNumClusteringCols() == 0);
@@ -1870,7 +1870,12 @@ public class HdfsTable extends Table implements FeFsTable {
 
   @Override // FeFsTable
   public String getHdfsBaseDir() { return hdfsBaseDir_; }
-  public Path getHdfsBaseDirPath() { return new Path(hdfsBaseDir_); }
+
+  public Path getHdfsBaseDirPath() {
+    Preconditions.checkNotNull(hdfsBaseDir_, "HdfsTable base dir is null");
+    return new Path(hdfsBaseDir_);
+  }
+
   @Override // FeFsTable
   public boolean usesAvroSchemaOverride() { return avroSchema_ != null; }
 
@@ -2028,6 +2033,13 @@ public class HdfsTable extends Table implements FeFsTable {
     return getTableStats(this);
   }
 
+  @Override
+  public FileSystemUtil.FsType getFsType() {
+    Preconditions.checkNotNull(getHdfsBaseDirPath().toUri().getScheme(),
+        "Cannot get scheme from path " + getHdfsBaseDirPath());
+    return FileSystemUtil.FsType.getFsType(getHdfsBaseDirPath().toUri().getScheme());
+  }
+
   // TODO(todd): move to FeCatalogUtils. Upon moving to Java 8, could be
   // a default method of FeFsTable.
   public static TResultSet getTableStats(FeFsTable table) {
diff --git a/fe/src/main/java/org/apache/impala/catalog/local/LocalFsPartition.java b/fe/src/main/java/org/apache/impala/catalog/local/LocalFsPartition.java
index 522e984..140ff6b 100644
--- a/fe/src/main/java/org/apache/impala/catalog/local/LocalFsPartition.java
+++ b/fe/src/main/java/org/apache/impala/catalog/local/LocalFsPartition.java
@@ -33,6 +33,7 @@ import org.apache.impala.catalog.HdfsPartition.FileDescriptor;
 import org.apache.impala.catalog.HdfsStorageDescriptor;
 import org.apache.impala.catalog.HdfsStorageDescriptor.InvalidStorageDescriptorException;
 import org.apache.impala.catalog.PartitionStatsUtil;
+import org.apache.impala.common.FileSystemUtil;
 import org.apache.impala.thrift.TAccessLevel;
 import org.apache.impala.thrift.THdfsPartitionLocation;
 import org.apache.impala.thrift.TPartitionStats;
@@ -84,6 +85,13 @@ public class LocalFsPartition implements FeFsPartition {
   }
 
   @Override
+  public FileSystemUtil.FsType getFsType() {
+    Preconditions.checkNotNull(getLocationPath().toUri().getScheme(),
+        "Cannot get scheme from path " + getLocationPath());
+    return FileSystemUtil.FsType.getFsType(getLocationPath().toUri().getScheme());
+  }
+
+  @Override
   public List<FileDescriptor> getFileDescriptors() {
     return fileDescriptors_;
   }
@@ -115,6 +123,8 @@ public class LocalFsPartition implements FeFsPartition {
 
   @Override
   public Path getLocationPath() {
+    Preconditions.checkNotNull(getLocation(),
+            "LocalFsPartition location is null");
     return new Path(getLocation());
   }
 
diff --git a/fe/src/main/java/org/apache/impala/catalog/local/LocalFsTable.java b/fe/src/main/java/org/apache/impala/catalog/local/LocalFsTable.java
index a1266c3..566a63d 100644
--- a/fe/src/main/java/org/apache/impala/catalog/local/LocalFsTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/local/LocalFsTable.java
@@ -28,6 +28,7 @@ import java.util.Set;
 import java.util.TreeMap;
 
 import org.apache.avro.Schema;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
@@ -49,6 +50,7 @@ import org.apache.impala.catalog.local.MetaProvider.PartitionMetadata;
 import org.apache.impala.catalog.local.MetaProvider.PartitionRef;
 import org.apache.impala.catalog.local.MetaProvider.TableMetaRef;
 import org.apache.impala.common.AnalysisException;
+import org.apache.impala.common.FileSystemUtil;
 import org.apache.impala.thrift.CatalogObjectsConstants;
 import org.apache.impala.thrift.THdfsPartition;
 import org.apache.impala.thrift.THdfsTable;
@@ -271,6 +273,16 @@ public class LocalFsTable extends LocalTable implements FeFsTable {
   }
 
   @Override
+  public FileSystemUtil.FsType getFsType() {
+    Preconditions.checkNotNull(getHdfsBaseDir(),
+            "LocalTable base dir is null");
+    Path hdfsBaseDirPath = new Path(getHdfsBaseDir());
+    Preconditions.checkNotNull(hdfsBaseDirPath.toUri().getScheme(),
+        "Cannot get scheme from path " + getHdfsBaseDir());
+    return FileSystemUtil.FsType.getFsType(hdfsBaseDirPath.toUri().getScheme());
+  }
+
+  @Override
   public TTableDescriptor toThriftDescriptor(int tableId,
       Set<Long> referencedPartitions) {
     if (referencedPartitions == null) {
diff --git a/fe/src/main/java/org/apache/impala/common/FileSystemUtil.java b/fe/src/main/java/org/apache/impala/common/FileSystemUtil.java
index 1de2744..10f4c69 100644
--- a/fe/src/main/java/org/apache/impala/common/FileSystemUtil.java
+++ b/fe/src/main/java/org/apache/impala/common/FileSystemUtil.java
@@ -17,11 +17,11 @@
 
 package org.apache.impala.common;
 
-import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InputStream;
 import java.net.URI;
+import java.util.Map;
 import java.util.UUID;
 
 import org.apache.commons.io.IOUtils;
@@ -45,6 +45,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableMap;
 
 /**
  * Common utility functions for operating on FileSystem objects.
@@ -388,6 +389,46 @@ public class FileSystemUtil {
     return isDistributedFileSystem(path.getFileSystem(CONF));
   }
 
+  /**
+   * Represents the type of filesystem being used. Typically associated with a
+   * {@link org.apache.hadoop.fs.FileSystem} instance that is used to read data.
+   *
+   * <p>
+   *   Unlike the {@code is*FileSystem} methods above, a FsType is more
+   *   generic in that it can map different filesystems to the
+   *   same type. For example, the FsType {@link FsType#ADLS} maps to
+   *   multiple filesystems: {@link AdlFileSystem},
+   *   {@link AzureBlobFileSystem}, and {@link SecureAzureBlobFileSystem}.
+   * </p>
+   */
+  public enum FsType {
+    ADLS,
+    HDFS,
+    LOCAL,
+    S3;
+
+    private static final Map<String, FsType> SCHEME_TO_FS_MAPPING =
+        ImmutableMap.<String, FsType>builder()
+            .put("abfs", ADLS)
+            .put("abfss", ADLS)
+            .put("adl", ADLS)
+            .put("file", LOCAL)
+            .put("hdfs", HDFS)
+            .put("s3a", S3)
+            .build();
+
+    /**
+     * Provides a mapping between filesystem schemes and filesystem types. This can be
+     * useful as there are often multiple filesystem connectors for a given fs, each
+     * with its own scheme (e.g. abfs, abfss, adl are all ADLS connectors).
+     * Returns the {@link FsType} associated with a given filesystem scheme (e.g. local,
+     * hdfs, s3a, etc.)
+     */
+    public static FsType getFsType(String scheme) {
+      return SCHEME_TO_FS_MAPPING.get(scheme);
+    }
+  }
+
   public static FileSystem getDefaultFileSystem() throws IOException {
     Path path = new Path(FileSystem.getDefaultUri(CONF));
     FileSystem fs = path.getFileSystem(CONF);
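
To illustrate the new enum (an editorial sketch, not part of the patch; the
path below is made up): the scan node display name and the per-filesystem
explain lines are built by looking up a path's URI scheme in
SCHEME_TO_FS_MAPPING, mirroring the getFsType() implementations added to
HdfsPartition and HdfsTable above.

import org.apache.hadoop.fs.Path;
import org.apache.impala.common.FileSystemUtil;

public class FsTypeSketch {
  public static void main(String[] args) {
    Path path = new Path("s3a://dummy-bucket/test-warehouse/tbl/part=1/f.parq");
    // "s3a" is a key in SCHEME_TO_FS_MAPPING and resolves to FsType.S3;
    // schemes that are not in the map return null.
    FileSystemUtil.FsType fsType =
        FileSystemUtil.FsType.getFsType(path.toUri().getScheme());
    System.out.println("SCAN " + fsType);  // prints "SCAN S3"
  }
}
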
diff --git a/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java b/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
index fb589b2..b58cb98 100644
--- a/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
@@ -25,9 +25,12 @@ import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.TreeMap;
+import java.util.stream.Collectors;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.impala.analysis.AggregateInfo;
 import org.apache.impala.analysis.Analyzer;
 import org.apache.impala.analysis.BinaryPredicate;
@@ -49,6 +52,7 @@ import org.apache.impala.analysis.TupleDescriptor;
 import org.apache.impala.analysis.TupleId;
 import org.apache.impala.catalog.Column;
 import org.apache.impala.catalog.ColumnStats;
+import org.apache.impala.catalog.FeTable;
 import org.apache.impala.catalog.FeFsPartition;
 import org.apache.impala.catalog.FeFsTable;
 import org.apache.impala.catalog.HdfsCompression;
@@ -83,6 +87,7 @@ import org.apache.impala.thrift.TScanRangeSpec;
 import org.apache.impala.thrift.TTableStats;
 import org.apache.impala.util.BitUtil;
 import org.apache.impala.util.ExecutorMembershipSnapshot;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -171,10 +176,13 @@ public class HdfsScanNode extends ScanNode {
   private final AggregateInfo aggInfo_;
 
   // Number of partitions, files and bytes scanned. Set in computeScanRangeLocations().
-  // Might not match 'partitions_' due to table sampling.
-  private int numPartitions_ = 0;
-  private long totalFiles_ = 0;
-  private long totalBytes_ = 0;
+  // Might not match 'partitions_' due to table sampling. Grouped by FsType, so
+  // each entry records how many partitions / files / bytes are stored on that fs.
+  // Stored as a TreeMap so that iteration order is defined by the order of enums in
+  // FsType.
+  private Map<FileSystemUtil.FsType, Long> numPartitionsPerFs_ = new TreeMap<>();
+  private Map<FileSystemUtil.FsType, Long> totalFilesPerFs_ = new TreeMap<>();
+  private Map<FileSystemUtil.FsType, Long> totalBytesPerFs_ = new TreeMap<>();
 
   // File formats scanned. Set in computeScanRangeLocations().
   private Set<HdfsFileFormat> fileFormats_;
@@ -276,8 +284,7 @@ public class HdfsScanNode extends ScanNode {
   public HdfsScanNode(PlanNodeId id, TupleDescriptor desc, List<Expr> conjuncts,
       List<? extends FeFsPartition> partitions, TableRef hdfsTblRef,
       AggregateInfo aggInfo, List<Expr> partConjuncts) {
-    super(id, desc, "SCAN HDFS");
-    Preconditions.checkState(desc.getTable() instanceof FeFsTable);
+    super(id, desc, createDisplayName(hdfsTblRef.getTable()));
     tbl_ = (FeFsTable)desc.getTable();
     conjuncts_ = conjuncts;
     partitions_ = partitions;
@@ -296,6 +303,14 @@ public class HdfsScanNode extends ScanNode {
     }
   }
 
+  /**
+   * Returns the display name for this scan node, of the form "SCAN [storage-layer-name]".
+   */
+  private static String createDisplayName(FeTable table) {
+    Preconditions.checkState(table instanceof FeFsTable);
+    return "SCAN " + ((FeFsTable) table).getFsType();
+  }
+
   @Override
   protected String debugString() {
     ToStringHelper helper = Objects.toStringHelper(this);
@@ -746,17 +761,48 @@ public class HdfsScanNode extends ScanNode {
   }
 
   /**
+   * A collection of metadata associated with a sampled partition. Unlike
+   * {@link FeFsPartition} this class is safe to use in hash-based data structures.
+   */
+  public static final class SampledPartitionMetadata {
+
+    private final long partitionId;
+    private final FileSystemUtil.FsType partitionFsType;
+
+    public SampledPartitionMetadata(
+        long partitionId, FileSystemUtil.FsType partitionFsType) {
+      this.partitionId = partitionId;
+      this.partitionFsType = partitionFsType;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) return true;
+      if (o == null || getClass() != o.getClass()) return false;
+      SampledPartitionMetadata that = (SampledPartitionMetadata) o;
+      return partitionId == that.partitionId && partitionFsType == that.partitionFsType;
+    }
+
+    @Override
+    public int hashCode() {
+      return java.util.Objects.hash(partitionId, partitionFsType);
+    }
+
+    private FileSystemUtil.FsType getPartitionFsType() { return partitionFsType; }
+  }
+
+  /**
    * Computes scan ranges (i.e. hdfs splits) plus their storage locations, including
    * volume ids, based on the given maximum number of bytes each scan range should scan.
    * If 'sampleParams_' is not null, generates a sample and computes the scan ranges
    * based on the sample.
    *
-   * Initializes members with information about files and scan ranges, e.g. totalFiles_,
-   * fileFormats_, etc.
+   * Initializes members with information about files and scan ranges, e.g.
+   * totalFilesPerFs_, fileFormats_, etc.
    */
   private void computeScanRangeLocations(Analyzer analyzer)
       throws ImpalaRuntimeException {
-    Map<Long, List<FileDescriptor>> sampledFiles = null;
+    Map<SampledPartitionMetadata, List<FileDescriptor>> sampledFiles = null;
     if (sampleParams_ != null) {
       long percentBytes = sampleParams_.getPercentBytes();
       long randomSeed;
@@ -775,9 +821,17 @@ public class HdfsScanNode extends ScanNode {
     long scanRangeBytesLimit = analyzer.getQueryCtx().client_request.getQuery_options()
         .getMax_scan_range_length();
     scanRangeSpecs_ = new TScanRangeSpec();
-    numPartitions_ = (sampledFiles != null) ? sampledFiles.size() : partitions_.size();
-    totalFiles_ = 0;
-    totalBytes_ = 0;
+
+    if (sampledFiles != null) {
+      numPartitionsPerFs_ = sampledFiles.keySet().stream().collect(Collectors.groupingBy(
+          SampledPartitionMetadata::getPartitionFsType, Collectors.counting()));
+    } else {
+      numPartitionsPerFs_.putAll(partitions_.stream().collect(
+          Collectors.groupingBy(FeFsPartition::getFsType, Collectors.counting())));
+    }
+
+    totalFilesPerFs_ = new TreeMap<>();
+    totalBytesPerFs_ = new TreeMap<>();
     largestScanRangeBytes_ = 0;
     maxScanRangeNumRows_ = -1;
     fileFormats_ = new HashSet<>();
@@ -785,7 +839,8 @@ public class HdfsScanNode extends ScanNode {
       List<FileDescriptor> fileDescs = partition.getFileDescriptors();
       if (sampledFiles != null) {
         // If we are sampling, check whether this partition is included in the sample.
-        fileDescs = sampledFiles.get(Long.valueOf(partition.getId()));
+        fileDescs = sampledFiles.get(
+            new SampledPartitionMetadata(partition.getId(), partition.getFsType()));
         if (fileDescs == null) continue;
       }
       long partitionNumRows = partition.getNumRows();
@@ -816,8 +871,8 @@ public class HdfsScanNode extends ScanNode {
       final long partitionBytes = FileDescriptor.computeTotalFileLength(fileDescs);
       long partitionMaxScanRangeBytes = 0;
       boolean partitionMissingDiskIds = false;
-      totalBytes_ += partitionBytes;
-      totalFiles_ += fileDescs.size();
+      totalBytesPerFs_.merge(partition.getFsType(), partitionBytes, Long::sum);
+      totalFilesPerFs_.merge(partition.getFsType(), (long) fileDescs.size(), Long::sum);
       for (FileDescriptor fileDesc: fileDescs) {
         if (!analyzer.getQueryOptions().isAllow_erasure_coded_files() &&
             fileDesc.getIsEc()) {
@@ -844,14 +899,15 @@ public class HdfsScanNode extends ScanNode {
             partitionNumRows, partitionBytes, partitionMaxScanRangeBytes);
       }
     }
-    if (totalFiles_ == 0) {
+    if (totalFilesPerFs_.isEmpty() || sumValues(totalFilesPerFs_) == 0) {
       maxScanRangeNumRows_ = 0;
     } else {
       // Also estimate max rows per scan range based on table-level stats, in case some
       // or all partition-level stats were missing.
       long tableNumRows = tbl_.getNumRows();
       if (tableNumRows >= 0) {
-        updateMaxScanRangeNumRows(tableNumRows, totalBytes_, largestScanRangeBytes_);
+        updateMaxScanRangeNumRows(
+            tableNumRows, sumValues(totalBytesPerFs_), largestScanRangeBytes_);
       }
     }
   }
@@ -1011,11 +1067,13 @@ public class HdfsScanNode extends ScanNode {
    */
   private void computeCardinalities() {
     // Choose between the extrapolated row count and the one based on stored stats.
-    extrapolatedNumRows_ = FeFsTable.Utils.getExtrapolatedNumRows(tbl_, totalBytes_);
+    extrapolatedNumRows_ = FeFsTable.Utils.getExtrapolatedNumRows(tbl_,
+            sumValues(totalBytesPerFs_));
     long statsNumRows = getStatsNumRows();
     if (extrapolatedNumRows_ != -1) {
-      // The extrapolated row count is based on the 'totalBytes_' which already accounts
-      // for table sampling, so no additional adjustment for sampling is necessary.
+      // The extrapolated row count is based on the 'totalBytesPerFs_' which already
+      // accounts for table sampling, so no additional adjustment for sampling is
+      // necessary.
       cardinality_ = extrapolatedNumRows_;
     } else {
       // Set the cardinality based on table or partition stats.
@@ -1029,7 +1087,7 @@ public class HdfsScanNode extends ScanNode {
     }
 
     // Checked after the block above to first collect information for the explain output.
-    if (totalBytes_ == 0) {
+    if (sumValues(totalBytesPerFs_) == 0) {
       // Nothing to scan. Definitely a cardinality of 0.
       inputCardinality_ = 0;
       cardinality_ = 0;
@@ -1259,11 +1317,33 @@ public class HdfsScanNode extends ScanNode {
           .append(String.format("partition predicates: %s\n",
               getExplainString(partitionConjuncts_, detailLevel)));
       }
-      if (tbl_.getNumClusteringCols() == 0) numPartitions_ = 1;
-      output.append(detailPrefix)
-        .append(String.format("partitions=%d/%d files=%d size=%s\n",
-            numPartitions_, table.getPartitions().size(), totalFiles_,
-            PrintUtils.printBytes(totalBytes_)));
+      String partMetaTemplate = "partitions=%d/%d files=%d size=%s\n";
+      if (!numPartitionsPerFs_.isEmpty()) {
+        // The table is partitioned; print a line for each filesystem we are reading
+        // partitions from
+        for (Map.Entry<FileSystemUtil.FsType, Long> partsPerFs :
+            numPartitionsPerFs_.entrySet()) {
+          FileSystemUtil.FsType fsType = partsPerFs.getKey();
+          output.append(detailPrefix);
+          output.append(fsType).append(" ");
+          output.append(String.format(partMetaTemplate, partsPerFs.getValue(),
+              table.getPartitions().size(), totalFilesPerFs_.get(fsType),
+              PrintUtils.printBytes(totalBytesPerFs_.get(fsType))));
+        }
+      } else if (tbl_.getNumClusteringCols() == 0) {
+        // There are no partitions so we use the FsType of the base table
+        output.append(detailPrefix);
+        output.append(table.getFsType()).append(" ");
+        output.append(String.format(partMetaTemplate, 1, table.getPartitions().size(),
+            0, PrintUtils.printBytes(0)));
+      } else {
+        // The table is partitioned, but no partitions are selected; in this case we
+        // exclude the FsType completely
+        output.append(detailPrefix);
+        output.append(String.format(partMetaTemplate, 0, table.getPartitions().size(),
+            0, PrintUtils.printBytes(0)));
+      }
+
       if (!conjuncts_.isEmpty()) {
         output.append(detailPrefix)
           .append(String.format("predicates: %s\n",
@@ -1301,8 +1381,8 @@ public class HdfsScanNode extends ScanNode {
         output.append(detailPrefix)
           .append(String.format("missing disk ids: "
                 + "partitions=%s/%s files=%s/%s scan ranges %s/%s\n",
-            numPartitionsNoDiskIds_, numPartitions_, numFilesNoDiskIds_,
-            totalFiles_, numScanRangesNoDiskIds_,
+            numPartitionsNoDiskIds_, sumValues(numPartitionsPerFs_),
+            numFilesNoDiskIds_, sumValues(totalFilesPerFs_), numScanRangesNoDiskIds_,
             scanRangeSpecs_.getConcrete_rangesSize() + generatedScanRangeCount_));
       }
       // Groups the min max original conjuncts by tuple descriptor.
@@ -1440,7 +1520,8 @@ public class HdfsScanNode extends ScanNode {
     int maxScannerThreads = computeMaxNumberOfScannerThreads(queryOptions,
         perHostScanRanges);
 
-    long avgScanRangeBytes = (long) Math.ceil(totalBytes_ / (double) scanRangeSize);
+    long avgScanRangeBytes =
+        (long) Math.ceil(sumValues(totalBytesPerFs_) / (double) scanRangeSize);
     // The +1 accounts for an extra I/O buffer to read past the scan range due to a
     // trailing record spanning Hdfs blocks.
     long maxIoBufferSize =
@@ -1673,4 +1754,11 @@ public class HdfsScanNode extends ScanNode {
   public boolean hasCorruptTableStats() { return hasCorruptTableStats_; }
 
   public boolean hasMissingDiskIds() { return numScanRangesNoDiskIds_ > 0; }
+
+  /**
+   * Returns the sum of all the values in the given {@link Map}.
+   */
+  private static long sumValues(Map<?, Long> input) {
+    return input.values().stream().mapToLong(Long::longValue).sum();
+  }
 }
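
For reference, a standalone sketch (editorial, not part of the patch) of the
aggregation pattern HdfsScanNode now uses: per-filesystem totals live in a
TreeMap keyed by FsType (so the explain output iterates in enum declaration
order), are accumulated with Map.merge, and are collapsed with a
sumValues()-style helper wherever a single grand total is still needed.

import java.util.Map;
import java.util.TreeMap;

import org.apache.impala.common.FileSystemUtil.FsType;

public class PerFsTotalsSketch {
  public static void main(String[] args) {
    // TreeMap orders keys by enum declaration order: ADLS, HDFS, LOCAL, S3.
    Map<FsType, Long> totalBytesPerFs = new TreeMap<>();
    totalBytesPerFs.merge(FsType.S3, 2048L, Long::sum);
    totalBytesPerFs.merge(FsType.HDFS, 1024L, Long::sum);
    totalBytesPerFs.merge(FsType.S3, 512L, Long::sum);  // S3 accumulates to 2560

    // Equivalent of the sumValues() helper: one total across all filesystems.
    long totalBytes =
        totalBytesPerFs.values().stream().mapToLong(Long::longValue).sum();
    System.out.println(totalBytesPerFs);        // {HDFS=1024, S3=2560}
    System.out.println("total=" + totalBytes);  // total=3584
  }
}
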
diff --git a/fe/src/test/java/org/apache/impala/common/FrontendFixture.java b/fe/src/test/java/org/apache/impala/common/FrontendFixture.java
index b380c4f..a37717b 100644
--- a/fe/src/test/java/org/apache/impala/common/FrontendFixture.java
+++ b/fe/src/test/java/org/apache/impala/common/FrontendFixture.java
@@ -194,7 +194,7 @@ public class FrontendFixture {
       }
       try {
         HdfsTable hdfsTable = (HdfsTable) dummyTable;
-        hdfsTable.setPrototypePartition(msTbl.getSd());
+        hdfsTable.initializePartitionMetadata(msTbl);
       } catch (CatalogException e) {
         e.printStackTrace();
         fail("Failed to add test table:\n" + createTableSql);
diff --git a/fe/src/test/java/org/apache/impala/planner/ExplainTest.java b/fe/src/test/java/org/apache/impala/planner/ExplainTest.java
new file mode 100644
index 0000000..dcc3bd2
--- /dev/null
+++ b/fe/src/test/java/org/apache/impala/planner/ExplainTest.java
@@ -0,0 +1,188 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.planner;
+
+import com.google.common.base.Splitter;
+import com.google.common.collect.Lists;
+
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.impala.analysis.Analyzer;
+import org.apache.impala.analysis.DescriptorTable;
+import org.apache.impala.analysis.Path;
+import org.apache.impala.analysis.TableRef;
+import org.apache.impala.analysis.TupleDescriptor;
+import org.apache.impala.analysis.TupleId;
+import org.apache.impala.catalog.FeDb;
+import org.apache.impala.catalog.FeFsTable;
+import org.apache.impala.catalog.HdfsFileFormat;
+import org.apache.impala.catalog.HdfsPartition;
+import org.apache.impala.common.FileSystemUtil;
+import org.apache.impala.common.FrontendTestBase;
+import org.apache.impala.common.ImpalaException;
+import org.apache.impala.thrift.TClientRequest;
+import org.apache.impala.thrift.TExplainLevel;
+
+import org.apache.impala.thrift.TQueryCtx;
+import org.apache.impala.thrift.TQueryOptions;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Unit tests for validating explain plans. This class relies on mocking
+ * {@link ScanNode} objects (and other associated classes) to validate explain plan
+ * output.
+ */
+public class ExplainTest extends FrontendTestBase {
+
+  /**
+   * IMPALA-6050: Tests that explain plans for queries that read data from multiple
+   * filesystems (e.g. S3, ADLS, HDFS) accurately report the number of partitions and
+   * files read from each filesystem.
+   */
+  @Test
+  public void testScanNodeFsScheme() throws ImpalaException {
+    List<HdfsPartition> partitions = new ArrayList<>();
+
+    String dummyDbName = "dummy-db";
+    String dummyTblName = "dummy-tbl";
+    String dummyTblPath = "hdfs://localhost/" + dummyDbName + "." + dummyTblName;
+
+    FeDb mockDb = mock(FeDb.class);
+    when(mockDb.getName()).thenReturn(dummyDbName);
+    FeFsTable mockFeFsTable = createMockFeFsTable(partitions, dummyTblName, mockDb);
+    TupleDescriptor tupleDescriptor = createMockTupleDescriptor(mockFeFsTable);
+    TableRef mockTableRef = mock(TableRef.class);
+    when(mockTableRef.getTable()).thenReturn(mockFeFsTable);
+
+    partitions.add(createMockHdfsPartition("abfs://dummy-fs@dummy-account.dfs.core"
+            + ".windows.net/dummy-part-1",
+        FileSystemUtil.FsType.ADLS));
+    partitions.add(createMockHdfsPartition("abfs://dummy-fs@dummy-account.dfs.core"
+            + ".windows.net/dummy-part-2",
+        FileSystemUtil.FsType.ADLS));
+    partitions.add(createMockHdfsPartition("abfss://dummy-fs@dummy-account.dfs.core"
+            + ".windows.net/dummy-part-3",
+        FileSystemUtil.FsType.ADLS));
+    partitions.add(createMockHdfsPartition("abfss://dummy-fs@dummy-account.dfs.core"
+            + ".windows.net/dummy-part-4",
+        FileSystemUtil.FsType.ADLS));
+    partitions.add(createMockHdfsPartition("adl://dummy-account.azuredatalakestore"
+            + ".net/dummy-part-5",
+        FileSystemUtil.FsType.ADLS));
+    partitions.add(createMockHdfsPartition("adl://dummy-account.azuredatalakestore"
+            + ".net/dummy-part-6",
+        FileSystemUtil.FsType.ADLS));
+    partitions.add(createMockHdfsPartition(
+        "s3a://dummy-bucket/dummy-part-7", FileSystemUtil.FsType.S3));
+    partitions.add(createMockHdfsPartition(
+        "s3a://dummy-bucket/dummy-part-8", FileSystemUtil.FsType.S3));
+    partitions.add(createMockHdfsPartition(
+        dummyTblPath + "/dummy-part-9", FileSystemUtil.FsType.HDFS));
+    partitions.add(createMockHdfsPartition(
+        dummyTblPath + "/dummy-part-10", FileSystemUtil.FsType.HDFS));
+
+    HdfsScanNode hdfsScanNode =
+        new HdfsScanNode(PlanNodeId.createGenerator().getNextId(), tupleDescriptor,
+            new ArrayList<>(), partitions, mockTableRef, null, new ArrayList<>());
+
+    Analyzer mockAnalyer = createMockAnalyzer();
+
+    hdfsScanNode.init(mockAnalyer);
+
+    List<String> explainString =
+        Lists.newArrayList(Splitter.on('\n').omitEmptyStrings().trimResults().split(
+            hdfsScanNode.getNodeExplainString("", "", TExplainLevel.STANDARD)));
+
+    Assert.assertEquals(
+        "Scan node explain string not of expected size", 4, explainString.size());
+    Assert.assertTrue("Scan node explain string does not contain correct base table "
+            + "scheme",
+        explainString.get(0).contains("SCAN HDFS"));
+    Assert.assertTrue("Scan node explain string does not contain correct ADLS metadata",
+        explainString.get(1).contains("ADLS partitions=6/10 files=6 size=6B"));
+    Assert.assertTrue("Scan node explain string does not contain correct HDFS metadata",
+        explainString.get(2).contains("HDFS partitions=2/10 files=2 size=2B"));
+    Assert.assertTrue("Scan node explain string does not contain correct S3 metadata",
+        explainString.get(3).contains("S3 partitions=2/10 files=2 size=2B"));
+  }
+
+  private TupleDescriptor createMockTupleDescriptor(FeFsTable mockFeFsTable) {
+    TupleDescriptor tupleDescriptor = mock(TupleDescriptor.class);
+    when(tupleDescriptor.getTable()).thenReturn(mockFeFsTable);
+    when(tupleDescriptor.getId()).thenReturn(TupleId.createGenerator().getNextId());
+    when(tupleDescriptor.getPath()).thenReturn(mock(Path.class));
+    return tupleDescriptor;
+  }
+
+  private FeFsTable createMockFeFsTable(
+      List<HdfsPartition> partitions, String dummyTblName, FeDb mockDb) {
+    FeFsTable mockFeFsTable = mock(FeFsTable.class);
+    when(mockFeFsTable.getFsType()).thenReturn(FileSystemUtil.FsType.HDFS);
+    when(mockFeFsTable.getMetaStoreTable()).thenReturn(mock(Table.class));
+    doReturn(partitions).when(mockFeFsTable).getPartitions();
+    when(mockFeFsTable.getDb()).thenReturn(mockDb);
+    when(mockFeFsTable.getName()).thenReturn(dummyTblName);
+    return mockFeFsTable;
+  }
+
+  private HdfsPartition createMockHdfsPartition(
+      String path, FileSystemUtil.FsType fsType) {
+    HdfsPartition mockHdfsPartition = mock(HdfsPartition.class);
+
+    List<HdfsPartition.FileDescriptor> mockFilesDescs = new ArrayList<>();
+    HdfsPartition.FileDescriptor mockFileDesc = mock(HdfsPartition.FileDescriptor.class);
+    when(mockFileDesc.getFileLength()).thenReturn(1L);
+    when(mockFileDesc.getFileName()).thenReturn("");
+    mockFilesDescs.add(mockFileDesc);
+
+    when(mockHdfsPartition.getLocationPath())
+        .thenReturn(new org.apache.hadoop.fs.Path(path));
+    when(mockHdfsPartition.getFileDescriptors()).thenReturn(mockFilesDescs);
+    when(mockHdfsPartition.getFileFormat()).thenReturn(HdfsFileFormat.PARQUET);
+    when(mockHdfsPartition.getFsType()).thenReturn(fsType);
+    return mockHdfsPartition;
+  }
+
+  private Analyzer createMockAnalyzer() {
+    Analyzer mockAnalyer = mock(Analyzer.class);
+
+    TQueryCtx mockQueryCtx = mock(TQueryCtx.class);
+    TClientRequest tClientRequest = mock(TClientRequest.class);
+    when(tClientRequest.getQuery_options()).thenReturn(mock(TQueryOptions.class));
+    mockQueryCtx.client_request = tClientRequest;
+
+    DescriptorTable mockDescriptorTable = mock(DescriptorTable.class);
+    when(mockDescriptorTable.getTupleDesc(any())).thenReturn(mock(TupleDescriptor.class));
+
+    when(mockAnalyer.getQueryCtx()).thenReturn(mockQueryCtx);
+    when(mockAnalyer.getDescTbl()).thenReturn(mock(DescriptorTable.class));
+    when(mockAnalyer.getQueryOptions()).thenReturn(mock(TQueryOptions.class));
+    when(mockAnalyer.getDescTbl()).thenReturn(mockDescriptorTable);
+    when(mockAnalyer.getTupleDesc(any())).thenReturn(mock(TupleDescriptor.class));
+    return mockAnalyer;
+  }
+}
diff --git a/fe/src/test/java/org/apache/impala/planner/PlannerTest.java b/fe/src/test/java/org/apache/impala/planner/PlannerTest.java
index cf1b0d8..4223e6c 100644
--- a/fe/src/test/java/org/apache/impala/planner/PlannerTest.java
+++ b/fe/src/test/java/org/apache/impala/planner/PlannerTest.java
@@ -792,4 +792,18 @@ public class PlannerTest extends PlannerTestBase {
     assertEquals(" row-size= cardinality=10.3K",
         filter.transform(" row-size=10B cardinality=10.3K"));
   }
+
+  @Test
+  public void testScanNodeFsScheme() {
+    addTestTable("CREATE TABLE abfs_tbl (col int) LOCATION "
+        + "'abfs://dummy-fs@dummy-account.dfs.core.windows.net/abfs_tbl'");
+    addTestTable("CREATE TABLE abfss_tbl (col int) LOCATION "
+        + "'abfss://dummy-fs@dummy-account.dfs.core.windows.net/abfs_tbl'");
+    addTestTable("CREATE TABLE adl_tbl (col int) LOCATION "
+        + "'adl://dummy-account.azuredatalakestore.net/adl_tbl'");
+    addTestTable("CREATE TABLE s3a_tbl (col int) LOCATION "
+        + "'s3a://dummy-bucket/s3_tbl'");
+    runPlannerTestFile(
+        "scan-node-fs-scheme", ImmutableSet.of(PlannerTestOption.VALIDATE_SCAN_FS));
+  }
 }
diff --git a/fe/src/test/java/org/apache/impala/planner/PlannerTestBase.java b/fe/src/test/java/org/apache/impala/planner/PlannerTestBase.java
index 80afd92..2aa4069 100644
--- a/fe/src/test/java/org/apache/impala/planner/PlannerTestBase.java
+++ b/fe/src/test/java/org/apache/impala/planner/PlannerTestBase.java
@@ -541,6 +541,10 @@ public class PlannerTestBase extends FrontendTestBase {
         resultFilters.add(TestUtils.ROW_SIZE_FILTER);
         resultFilters.add(TestUtils.CARDINALITY_FILTER);
       }
+      if (!testOptions.contains(PlannerTestOption.VALIDATE_SCAN_FS)) {
+        resultFilters.add(TestUtils.SCAN_NODE_SCHEME_FILTER);
+      }
+
       String planDiff = TestUtils.compareOutput(
           Lists.newArrayList(explainStr.split("\n")), expectedPlan, true, resultFilters);
       if (!planDiff.isEmpty()) {
@@ -819,7 +823,18 @@ public class PlannerTestBase extends FrontendTestBase {
     // HMS table stats and key predicate selectivity. Enable this to test
     // the case when HBase key stats are unavailable (such as due to overly
     // restrictive key predicates).
-    DISABLE_HBASE_KEY_ESTIMATE
+    DISABLE_HBASE_KEY_ESTIMATE,
+    // Validate the filesystem schemes shown in the scan node plan. An example of a scan
+    // node profile that contains fs-specific information is:
+    //
+    //   00:SCAN HDFS [functional.testtbl]
+    //     HDFS partitions=1/1 files=0 size=0B
+    //     S3 partitions=1/0 files=0 size=0B
+    //     ADLS partitions=1/0 files=0 size=0B
+    //
+    // By default, this flag is disabled. So tests will ignore the values of 'HDFS',
+    // 'S3', and 'ADLS' in the above explain plan.
+    VALIDATE_SCAN_FS
   }
 
   protected void runPlannerTestFile(String testFile, TQueryOptions options) {
diff --git a/fe/src/test/java/org/apache/impala/testutil/TestUtils.java b/fe/src/test/java/org/apache/impala/testutil/TestUtils.java
index de6c27c..25ab2d7 100644
--- a/fe/src/test/java/org/apache/impala/testutil/TestUtils.java
+++ b/fe/src/test/java/org/apache/impala/testutil/TestUtils.java
@@ -29,6 +29,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Scanner;
 import java.util.TimeZone;
+import java.util.regex.Pattern;
 
 import javax.json.Json;
 import javax.json.JsonObject;
@@ -93,7 +94,6 @@ public class TestUtils {
     }
   }
 
-
   /**
    * Filter to ignore the value from elements in the format key=value.
    */
@@ -121,6 +121,32 @@ public class TestUtils {
       return input.replaceAll(keyPrefix + valueRegex, keyPrefix);
     }
   }
+
+  /**
+   * Filter to ignore the filesystem schemes in the scan node explain output. See
+   * {@link org.apache.impala.planner.PlannerTestBase.PlannerTestOption#VALIDATE_SCAN_FS}
+   * for more details.
+   */
+  public static final ResultFilter SCAN_NODE_SCHEME_FILTER = new ResultFilter() {
+
+    private final String fsSchemes = "(HDFS|S3|LOCAL|ADLS)";
+    private final Pattern scanNodeFsScheme = Pattern.compile("SCAN " + fsSchemes);
+    // We don't match the size because the FILE_SIZE_FILTER could remove it
+    private final Pattern scanNodeInputMetadata =
+        Pattern.compile(fsSchemes + " partitions=\\d+/\\d+ files=\\d+ size=");
+
+    @Override
+    public boolean matches(String input) {
+      return scanNodeInputMetadata.matcher(input).find()
+          || scanNodeFsScheme.matcher(input).find();
+    }
+
+    @Override
+    public String transform(String input) {
+      return input.replaceAll(fsSchemes, "");
+    }
+  };
+
   // File size could vary from run to run. For example, the parquet file header size
   // or column metadata size could change if the Impala version changes. That doesn't
   // mean anything is wrong with the plan, so we want to filter the file size out.
@@ -166,9 +192,8 @@ public class TestUtils {
    *
    * @return an error message if actual does not match expected, "" otherwise.
    */
-  public static String compareOutput(
-      ArrayList<String> actual, ArrayList<String> expected, boolean orderMatters,
-      List<ResultFilter> lineFilters) {
+  public static String compareOutput(ArrayList<String> actual, ArrayList<String> expected,
+      boolean orderMatters, List<ResultFilter> lineFilters) {
     if (!orderMatters) {
       Collections.sort(actual);
       Collections.sort(expected);
@@ -177,14 +202,14 @@ public class TestUtils {
     int maxLen = Math.min(actual.size(), expected.size());
     outer:
     for (int i = 0; i < maxLen; ++i) {
-      String expectedStr = expected.get(i).trim();
+      String expectedStr = expected.get(i);
       String actualStr = actual.get(i);
       // Apply all default and caller-supplied filters to the expected and actual output.
       boolean containsPrefix = false;
       for (List<ResultFilter> filters:
           Arrays.<List<ResultFilter>>asList(DEFAULT_FILTERS, lineFilters)) {
         for (ResultFilter filter: filters) {
-          if (filter.matches(expectedStr)) {
+          if (filter.matches(expectedStr) || filter.matches(actualStr)) {
             containsPrefix = true;
             expectedStr = filter.transform(expectedStr);
             actualStr = filter.transform(actualStr);
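
As a small illustration (editorial, not part of the patch) of what
SCAN_NODE_SCHEME_FILTER does when VALIDATE_SCAN_FS is not set: the filesystem
token is stripped from both the expected and the actual plan line before they
are compared, so differing schemes no longer cause a plan diff.

public class SchemeFilterSketch {
  public static void main(String[] args) {
    String fsSchemes = "(HDFS|S3|LOCAL|ADLS)";
    String expected = "   HDFS partitions=10/24 files=10 size=478.45KB";
    String actual = "   S3 partitions=10/24 files=10 size=478.45KB";
    // Both lines match scanNodeInputMetadata, so both get transformed and the
    // remaining text compares equal.
    System.out.println(expected.replaceAll(fsSchemes, "")
        .equals(actual.replaceAll(fsSchemes, "")));  // prints true
  }
}
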
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/scan-node-fs-scheme.test b/testdata/workloads/functional-planner/queries/PlannerTest/scan-node-fs-scheme.test
new file mode 100644
index 0000000..1c8d58e
--- /dev/null
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/scan-node-fs-scheme.test
@@ -0,0 +1,80 @@
+select * from functional.testtbl
+---- PLAN
+PLAN-ROOT SINK
+|
+00:SCAN HDFS [functional.testtbl]
+   HDFS partitions=1/1 files=0 size=0B
+   row-size= cardinality=
+---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
+01:EXCHANGE [UNPARTITIONED]
+|
+00:SCAN HDFS [functional.testtbl]
+   HDFS partitions=1/1 files=0 size=0B
+   row-size= cardinality=
+====
+select * from s3a_tbl
+---- PLAN
+PLAN-ROOT SINK
+|
+00:SCAN S3 [default.s3a_tbl]
+   S3 partitions=1/0 files=0 size=0B
+   row-size= cardinality=
+---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
+01:EXCHANGE [UNPARTITIONED]
+|
+00:SCAN S3 [default.s3a_tbl]
+   S3 partitions=1/0 files=0 size=0B
+   row-size= cardinality=
+====
+select * from adl_tbl
+---- PLAN
+PLAN-ROOT SINK
+|
+00:SCAN ADLS [default.adl_tbl]
+   ADLS partitions=1/0 files=0 size=0B
+   row-size= cardinality=
+---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
+01:EXCHANGE [UNPARTITIONED]
+|
+00:SCAN ADLS [default.adl_tbl]
+   ADLS partitions=1/0 files=0 size=0B
+   row-size= cardinality=
+====
+select * from abfs_tbl
+---- PLAN
+PLAN-ROOT SINK
+|
+00:SCAN ADLS [default.abfs_tbl]
+   ADLS partitions=1/0 files=0 size=0B
+   row-size= cardinality=
+---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
+01:EXCHANGE [UNPARTITIONED]
+|
+00:SCAN ADLS [default.abfs_tbl]
+   ADLS partitions=1/0 files=0 size=0B
+   row-size= cardinality=
+====
+select * from abfss_tbl
+---- PLAN
+PLAN-ROOT SINK
+|
+00:SCAN ADLS [default.abfss_tbl]
+   ADLS partitions=1/0 files=0 size=0B
+   row-size= cardinality=
+---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
+01:EXCHANGE [UNPARTITIONED]
+|
+00:SCAN ADLS [default.abfss_tbl]
+   ADLS partitions=1/0 files=0 size=0B
+   row-size= cardinality=
+====
diff --git a/testdata/workloads/functional-query/queries/QueryTest/partition-col-types.test b/testdata/workloads/functional-query/queries/QueryTest/partition-col-types.test
index cd98972..c9fc6d3 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/partition-col-types.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/partition-col-types.test
@@ -83,8 +83,8 @@ string_col in ('1', '2', '3')
 ---- RESULTS: VERIFY_IS_SUBSET
 '01:EXCHANGE [UNPARTITIONED]'
 '|'
-'00:SCAN HDFS [$DATABASE.all_insert_partition_col_types]'
-'   partitions=3/11 files=3 size=6B'
+'00:SCAN $FILESYSTEM_NAME [$DATABASE.all_insert_partition_col_types]'
+'   $FILESYSTEM_NAME partitions=3/11 files=3 size=6B'
 ====
 ---- QUERY
 EXPLAIN
@@ -93,8 +93,8 @@ WHERE tinyint_col < 7
 ---- RESULTS: VERIFY_IS_SUBSET
 '01:EXCHANGE [UNPARTITIONED]'
 '|'
-'00:SCAN HDFS [$DATABASE.all_insert_partition_col_types]'
-'   partitions=7/11 files=7 size=14B'
+'00:SCAN $FILESYSTEM_NAME [$DATABASE.all_insert_partition_col_types]'
+'   $FILESYSTEM_NAME partitions=7/11 files=7 size=14B'
 ====
 ---- QUERY
 EXPLAIN
@@ -103,8 +103,8 @@ WHERE smallint_col < 6
 ---- RESULTS: VERIFY_IS_SUBSET
 '01:EXCHANGE [UNPARTITIONED]'
 '|'
-'00:SCAN HDFS [$DATABASE.all_insert_partition_col_types]'
-'   partitions=6/11 files=6 size=12B'
+'00:SCAN $FILESYSTEM_NAME [$DATABASE.all_insert_partition_col_types]'
+'   $FILESYSTEM_NAME partitions=6/11 files=6 size=12B'
 ====
 ---- QUERY
 EXPLAIN
@@ -113,8 +113,8 @@ WHERE int_col < 5
 ---- RESULTS: VERIFY_IS_SUBSET
 '01:EXCHANGE [UNPARTITIONED]'
 '|'
-'00:SCAN HDFS [$DATABASE.all_insert_partition_col_types]'
-'   partitions=5/11 files=5 size=10B'
+'00:SCAN $FILESYSTEM_NAME [$DATABASE.all_insert_partition_col_types]'
+'   $FILESYSTEM_NAME partitions=5/11 files=5 size=10B'
 ====
 ---- QUERY
 EXPLAIN
@@ -123,8 +123,8 @@ WHERE bigint_col < 40
 ---- RESULTS: VERIFY_IS_SUBSET
 '01:EXCHANGE [UNPARTITIONED]'
 '|'
-'00:SCAN HDFS [$DATABASE.all_insert_partition_col_types]'
-'   partitions=4/11 files=4 size=8B'
+'00:SCAN $FILESYSTEM_NAME [$DATABASE.all_insert_partition_col_types]'
+'   $FILESYSTEM_NAME partitions=4/11 files=4 size=8B'
 ====
 ---- QUERY
 EXPLAIN
@@ -133,8 +133,8 @@ WHERE string_col in ('1', '2', '3')
 ---- RESULTS: VERIFY_IS_SUBSET
 '01:EXCHANGE [UNPARTITIONED]'
 '|'
-'00:SCAN HDFS [$DATABASE.all_insert_partition_col_types]'
-'   partitions=3/11 files=3 size=6B'
+'00:SCAN $FILESYSTEM_NAME [$DATABASE.all_insert_partition_col_types]'
+'   $FILESYSTEM_NAME partitions=3/11 files=3 size=6B'
 ====
 ---- QUERY
 EXPLAIN
@@ -143,8 +143,8 @@ WHERE double_col = 1.1
 ---- RESULTS: VERIFY_IS_SUBSET
 '01:EXCHANGE [UNPARTITIONED]'
 '|'
-'00:SCAN HDFS [$DATABASE.all_insert_partition_col_types]'
-'   partitions=10/11 files=10 size=20B'
+'00:SCAN $FILESYSTEM_NAME [$DATABASE.all_insert_partition_col_types]'
+'   $FILESYSTEM_NAME partitions=10/11 files=10 size=20B'
 ====
 ---- QUERY
 EXPLAIN
@@ -153,8 +153,8 @@ WHERE float_col = 2
 ---- RESULTS: VERIFY_IS_SUBSET
 '01:EXCHANGE [UNPARTITIONED]'
 '|'
-'00:SCAN HDFS [$DATABASE.all_insert_partition_col_types]'
-'   partitions=1/11 files=1 size=3B'
+'00:SCAN $FILESYSTEM_NAME [$DATABASE.all_insert_partition_col_types]'
+'   $FILESYSTEM_NAME partitions=1/11 files=1 size=3B'
 ====
 ---- QUERY
 # Create a table with all supported partition key column types. TIMESTAMP is not
@@ -208,8 +208,8 @@ WHERE bool_col=false
 ---- RESULTS: VERIFY_IS_SUBSET
 '01:EXCHANGE [UNPARTITIONED]'
 '|'
-'00:SCAN HDFS [$DATABASE.all_partition_col_types]'
-'   partitions=1/2 files=1 size=2B'
+'00:SCAN $FILESYSTEM_NAME [$DATABASE.all_partition_col_types]'
+'   $FILESYSTEM_NAME partitions=1/2 files=1 size=2B'
 ====
 ---- QUERY
 DROP TABLE all_partition_col_types;
@@ -237,8 +237,8 @@ WHERE decimal_col = 4.34
 ---- RESULTS: VERIFY_IS_SUBSET
 '01:EXCHANGE [UNPARTITIONED]'
 '|'
-'00:SCAN HDFS [$DATABASE.test_dec_partition]'
-'   partitions=1/1 files=1 size=9B'
+'00:SCAN $FILESYSTEM_NAME [$DATABASE.test_dec_partition]'
+'   $FILESYSTEM_NAME partitions=1/1 files=1 size=9B'
 ====
 ---- QUERY
 EXPLAIN
@@ -247,8 +247,8 @@ WHERE decimal_col = 04.340
 ---- RESULTS: VERIFY_IS_SUBSET
 '01:EXCHANGE [UNPARTITIONED]'
 '|'
-'00:SCAN HDFS [$DATABASE.test_dec_partition]'
-'   partitions=1/1 files=1 size=9B'
+'00:SCAN $FILESYSTEM_NAME [$DATABASE.test_dec_partition]'
+'   $FILESYSTEM_NAME partitions=1/1 files=1 size=9B'
 ====
 ---- QUERY
 EXPLAIN
@@ -257,7 +257,7 @@ WHERE decimal_col = 4.35
 ---- RESULTS: VERIFY_IS_SUBSET
 '01:EXCHANGE [UNPARTITIONED]'
 '|'
-'00:SCAN HDFS [$DATABASE.test_dec_partition]'
+'00:SCAN $FILESYSTEM_NAME [$DATABASE.test_dec_partition]'
 '   partitions=0/1 files=0 size=0B'
 ====
 ---- QUERY
@@ -280,6 +280,6 @@ WHERE decimal_col = 8.68
 ---- RESULTS: VERIFY_IS_SUBSET
 '01:EXCHANGE [UNPARTITIONED]'
 '|'
-'00:SCAN HDFS [$DATABASE.test_dec_partition]'
-'   partitions=1/3 files=1 size=18B'
+'00:SCAN $FILESYSTEM_NAME [$DATABASE.test_dec_partition]'
+'   $FILESYSTEM_NAME partitions=1/3 files=1 size=18B'
 ====
diff --git a/tests/common/impala_test_suite.py b/tests/common/impala_test_suite.py
index 9600820..1a2fff6 100644
--- a/tests/common/impala_test_suite.py
+++ b/tests/common/impala_test_suite.py
@@ -47,7 +47,6 @@ from tests.common.test_dimensions import (
     get_dataset_from_workload,
     load_table_info_dimension)
 from tests.common.test_result_verifier import (
-    apply_error_match_filter,
     try_compile_regex,
     verify_raw_results,
     verify_runtime_profile)
@@ -61,7 +60,8 @@ from tests.util.filesystem_utils import (
     IS_ADLS,
     S3_BUCKET_NAME,
     ADLS_STORE_NAME,
-    FILESYSTEM_PREFIX)
+    FILESYSTEM_PREFIX,
+    FILESYSTEM_NAME)
 
 from tests.util.hdfs_util import (
   HdfsConfig,
@@ -111,6 +111,7 @@ COMMENT_LINES_REGEX = r'(?:\s*--.*\n)*'
 SET_PATTERN = re.compile(
     COMMENT_LINES_REGEX + r'\s*set\s*([a-zA-Z_][a-zA-Z0-9_]*)\s*=*', re.I)
 
+
 # Base class for Impala tests. All impala test cases should inherit from this class
 class ImpalaTestSuite(BaseTestSuite):
   @classmethod
@@ -156,7 +157,6 @@ class ImpalaTestSuite(BaseTestSuite):
       # HS2 connection can fail for benign reasons, e.g. running with unsupported auth.
       LOG.info("HS2 connection setup failed, continuing...: {0}", e)
 
-
     # Default query options are populated on demand.
     cls.default_query_options = {}
 
@@ -226,7 +226,7 @@ class ImpalaTestSuite(BaseTestSuite):
       hdfs_client = get_hdfs_client_from_conf(HDFS_CONF)
     else:
       host, port = pytest.config.option.namenode_http_address.split(":")
-      hdfs_client =  get_hdfs_client(host, port)
+      hdfs_client = get_hdfs_client(host, port)
     return hdfs_client
 
   @classmethod
@@ -309,6 +309,7 @@ class ImpalaTestSuite(BaseTestSuite):
       # So, allow both $NAMENODE and $FILESYSTEM_PREFIX to be used in CATCH.
       expected_str = expected_str.strip() \
           .replace('$FILESYSTEM_PREFIX', FILESYSTEM_PREFIX) \
+          .replace('$FILESYSTEM_NAME', FILESYSTEM_NAME) \
           .replace('$NAMENODE', NAMENODE) \
           .replace('$IMPALA_HOME', IMPALA_HOME)
       if use_db: expected_str = expected_str.replace('$DATABASE', use_db)
@@ -337,7 +338,8 @@ class ImpalaTestSuite(BaseTestSuite):
         test_section[section_name] = test_section[section_name] \
                                      .replace('$NAMENODE', NAMENODE) \
                                      .replace('$IMPALA_HOME', IMPALA_HOME) \
-                                     .replace('$USER', getuser())
+                                     .replace('$USER', getuser()) \
+                                     .replace('$FILESYSTEM_NAME', FILESYSTEM_NAME)
         if use_db:
           test_section[section_name] = test_section[section_name].replace('$DATABASE', use_db)
     result_section, type_section = 'RESULTS', 'TYPES'
@@ -413,6 +415,7 @@ class ImpalaTestSuite(BaseTestSuite):
           "SHELL test sections can't contain other sections"
         cmd = test_section['SHELL']\
           .replace('$FILESYSTEM_PREFIX', FILESYSTEM_PREFIX)\
+          .replace('$FILESYSTEM_NAME', FILESYSTEM_NAME)\
           .replace('$IMPALA_HOME', IMPALA_HOME)
         if use_db: cmd = cmd.replace('$DATABASE', use_db)
         LOG.info("Shell command: " + cmd)
@@ -431,13 +434,14 @@ class ImpalaTestSuite(BaseTestSuite):
           .replace('$GROUP_NAME', group_name)
           .replace('$IMPALA_HOME', IMPALA_HOME)
           .replace('$FILESYSTEM_PREFIX', FILESYSTEM_PREFIX)
+          .replace('$FILESYSTEM_NAME', FILESYSTEM_NAME)
           .replace('$SECONDARY_FILESYSTEM', os.getenv("SECONDARY_FILESYSTEM") or str())
           .replace('$USER', getuser()))
       if use_db: query = query.replace('$DATABASE', use_db)
 
-      reserved_keywords = ["$DATABASE", "$FILESYSTEM_PREFIX", "$GROUP_NAME",
-                           "$IMPALA_HOME", "$NAMENODE", "$QUERY", "$SECONDARY_FILESYSTEM",
-                           "$USER"]
+      reserved_keywords = ["$DATABASE", "$FILESYSTEM_PREFIX", "$FILESYSTEM_NAME",
+                           "$GROUP_NAME", "$IMPALA_HOME", "$NAMENODE", "$QUERY",
+                           "$SECONDARY_FILESYSTEM", "$USER"]
 
       if test_file_vars:
         for key, value in test_file_vars.iteritems():
@@ -482,6 +486,7 @@ class ImpalaTestSuite(BaseTestSuite):
       if 'CATCH' in test_section and '__NO_ERROR__' not in test_section['CATCH']:
         expected_str = " or ".join(test_section['CATCH']).strip() \
           .replace('$FILESYSTEM_PREFIX', FILESYSTEM_PREFIX) \
+          .replace('$FILESYSTEM_NAME', FILESYSTEM_NAME) \
           .replace('$NAMENODE', NAMENODE) \
           .replace('$IMPALA_HOME', IMPALA_HOME)
         assert False, "Expected exception: %s" % expected_str
diff --git a/tests/util/filesystem_utils.py b/tests/util/filesystem_utils.py
index a7b0809..76212f6 100644
--- a/tests/util/filesystem_utils.py
+++ b/tests/util/filesystem_utils.py
@@ -53,14 +53,28 @@ ADLS_CLIENT_ID = os.getenv("azure_client_id")
 ADLS_TENANT_ID = os.getenv("azure_tenant_id")
 ADLS_CLIENT_SECRET = os.getenv("azure_client_secret")
 
+# A map of FILESYSTEM values to the storage layer names shown in Scan Nodes
+fs_to_name = {'s3': 'S3', 'hdfs': 'HDFS', 'local': 'LOCAL', 'adls': 'ADLS',
+              'abfs': 'ADLS'}
+
+
+def get_fs_name(fs):
+  """Given the target filesystem, return the name of the associated storage layer"""
+  return fs_to_name[fs]
+
+
 def prepend_with_fs(fs, path):
   """Prepend 'path' with 'fs' if it's not already the prefix."""
   return path if path.startswith(fs) else "%s%s" % (fs, path)
 
+
 def get_fs_path(path):
   return prepend_with_fs(FILESYSTEM_PREFIX, path)
 
+
 def get_secondary_fs_path(path):
   return prepend_with_fs(SECONDARY_FILESYSTEM, path)
 
+
 WAREHOUSE = get_fs_path('/test-warehouse')
+FILESYSTEM_NAME = get_fs_name(FILESYSTEM)