You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by st...@apache.org on 2023/09/05 01:59:53 UTC

[impala] 01/03: IMPALA-12408: Optimize HdfsScanNode.computeScanRangeLocations()

This is an automated email from the ASF dual-hosted git repository.

stigahuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit c49f5d2778d10e988ab4d926e3326de043c20fe1
Author: Csaba Ringhofer <cs...@cloudera.com>
AuthorDate: Tue Aug 29 16:37:42 2023 +0200

    IMPALA-12408: Optimize HdfsScanNode.computeScanRangeLocations()
    
    computeScanRangeLocations() could be very slow for tables
    with a large number of partitions. This patch tries to minimize
    the use of two expensive function calls:
    1. HdfsPartition.getLocation()
      - This looks like a simple property but actually decompresses
        the location string.
      - Was often called indirectly through getFsType().
      - After the patch it is only called once per partition.
    2. hadoop.fs.FileSystem.getFileSystem()
      - Hadoop caches the FileSystem object but the key contains
        UserGroupInformation which is obtained with
        UserGroupInformation.getCurrentUser(), making the call costly.
      - As the user is always the same during Impala planning, we can cache
        the FileSystem keyed simply by the scheme + authority part of the
        location URI. After the patch, getFileSystem() is called only if the
        scheme/authority differs from that of the previous partition,
        leading to a single call for most tables.
    
    Note that caching these values in HdfsPartition could also help,
    but this patch avoids that so as not to increase the size of that class.
    
    The patch also changes how we count the number of partitions per
    file system (to avoid the extra calls to getFsType()). This makes the
    class SampledPartitionMetadata unnecessary, so it is removed, which
    reverts some of the changes in https://gerrit.cloudera.org/#/c/12282/
    
    Benchmarks:
    Measured using tpcds.store_sales (1824 partitions)
    union all'd 256 times:
    explain select * from tpcds_parquet.store_sales256;
    Before patch: 8.8s
    After patch: 1.1s
    
    The improvement is also visible on full tpcds benchmark:
    +----------+-----------------------+---------+------------+------------+----------------+
    | Workload | File Format           | Avg (s) | Delta(Avg) | GeoMean(s) | Delta(GeoMean) |
    +----------+-----------------------+---------+------------+------------+----------------+
    | TPCDS(2) | parquet / none / none | 0.53    | -8.99%     | 0.29       | -10.78%        |
    +----------+-----------------------+---------+------------+------------+----------------+
    The effect is less significant on higher scale factors.
    
    Testing:
    - ran core tests
    
    Change-Id: Icf3e9c169d65c15df6a6762cc68fbb477fe64a7c
    Reviewed-on: http://gerrit.cloudera.org:8080/20434
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 .../apache/impala/analysis/ComputeStatsStmt.java   |   5 +-
 .../java/org/apache/impala/catalog/FeFsTable.java  |   9 +-
 .../org/apache/impala/catalog/HdfsPartition.java   |   7 +-
 .../impala/catalog/local/LocalFsPartition.java     |   7 +-
 .../org/apache/impala/planner/HdfsScanNode.java    | 133 ++++++++++-----------
 .../org/apache/impala/planner/IcebergScanNode.java |   8 +-
 6 files changed, 83 insertions(+), 86 deletions(-)

diff --git a/fe/src/main/java/org/apache/impala/analysis/ComputeStatsStmt.java b/fe/src/main/java/org/apache/impala/analysis/ComputeStatsStmt.java
index fe2ada49e..721b5ec9d 100644
--- a/fe/src/main/java/org/apache/impala/analysis/ComputeStatsStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/ComputeStatsStmt.java
@@ -830,9 +830,8 @@ public class ComputeStatsStmt extends StatementBase {
     // TODO(todd): can we avoid loading all the partitions for this?
     Collection<? extends FeFsPartition> partitions =
         FeCatalogUtils.loadAllPartitions(hdfsTable);
-    Map<HdfsScanNode.SampledPartitionMetadata, List<FileDescriptor>> sample =
-            FeFsTable.Utils.getFilesSample(hdfsTable,
-        partitions, samplePerc, minSampleBytes, sampleSeed);
+    Map<Long, List<FileDescriptor>> sample = FeFsTable.Utils.getFilesSample(
+        hdfsTable, partitions, samplePerc, minSampleBytes, sampleSeed);
     long sampleFileBytes = 0;
     for (List<FileDescriptor> fds: sample.values()) {
       for (FileDescriptor fd: fds) sampleFileBytes += fd.getFileLength();
diff --git a/fe/src/main/java/org/apache/impala/catalog/FeFsTable.java b/fe/src/main/java/org/apache/impala/catalog/FeFsTable.java
index e2fa4e713..06e053258 100644
--- a/fe/src/main/java/org/apache/impala/catalog/FeFsTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/FeFsTable.java
@@ -482,7 +482,7 @@ public interface FeFsTable extends FeTable {
      *
      * TODO(IMPALA-9883): Fix this for full ACID tables.
      */
-    public static Map<HdfsScanNode.SampledPartitionMetadata, List<FileDescriptor>>
+    public static Map<Long, List<FileDescriptor>>
         getFilesSample(FeFsTable table, Collection<? extends FeFsPartition> inputParts,
             long percentBytes, long minSampleBytes, long randomSeed) {
       Preconditions.checkState(percentBytes >= 0 && percentBytes <= 100);
@@ -540,15 +540,14 @@ public interface FeFsTable extends FeTable {
       // selected.
       Random rnd = new Random(randomSeed);
       long selectedBytes = 0;
-      Map<HdfsScanNode.SampledPartitionMetadata, List<FileDescriptor>> result =
+      Map<Long, List<FileDescriptor>> result =
           new HashMap<>();
       while (selectedBytes < targetBytes && numFilesRemaining > 0) {
         int selectedIdx = Math.abs(rnd.nextInt()) % numFilesRemaining;
         FeFsPartition part = parts[selectedIdx];
-        HdfsScanNode.SampledPartitionMetadata sampledPartitionMetadata =
-            new HdfsScanNode.SampledPartitionMetadata(part.getId(), part.getFsType());
+        Long partId = Long.valueOf(part.getId());
         List<FileDescriptor> sampleFileIdxs = result.computeIfAbsent(
-            sampledPartitionMetadata, partMetadata -> Lists.newArrayList());
+            partId, id -> Lists.newArrayList());
         FileDescriptor fd = part.getFileDescriptors().get(fileIdxs[selectedIdx]);
         sampleFileIdxs.add(fd);
         selectedBytes += fd.getFileLength();
diff --git a/fe/src/main/java/org/apache/impala/catalog/HdfsPartition.java b/fe/src/main/java/org/apache/impala/catalog/HdfsPartition.java
index 8afd439cb..ca77e75cc 100644
--- a/fe/src/main/java/org/apache/impala/catalog/HdfsPartition.java
+++ b/fe/src/main/java/org/apache/impala/catalog/HdfsPartition.java
@@ -909,9 +909,10 @@ public class HdfsPartition extends CatalogObjectImpl
 
   @Override
   public FileSystemUtil.FsType getFsType() {
-    Preconditions.checkNotNull(getLocationPath().toUri().getScheme(),
-        "Cannot get scheme from path " + getLocationPath());
-    return FileSystemUtil.FsType.getFsType(getLocationPath().toUri().getScheme());
+    Path location = getLocationPath();
+    Preconditions.checkNotNull(location.toUri().getScheme(),
+        "Cannot get scheme from path " + location);
+    return FileSystemUtil.FsType.getFsType(location.toUri().getScheme());
   }
 
   @Override // FeFsPartition
diff --git a/fe/src/main/java/org/apache/impala/catalog/local/LocalFsPartition.java b/fe/src/main/java/org/apache/impala/catalog/local/LocalFsPartition.java
index c44345577..01827556b 100644
--- a/fe/src/main/java/org/apache/impala/catalog/local/LocalFsPartition.java
+++ b/fe/src/main/java/org/apache/impala/catalog/local/LocalFsPartition.java
@@ -122,9 +122,10 @@ public class LocalFsPartition implements FeFsPartition {
 
   @Override
   public FileSystemUtil.FsType getFsType() {
-    Preconditions.checkNotNull(getLocationPath().toUri().getScheme(),
-        "Cannot get scheme from path " + getLocationPath());
-    return FileSystemUtil.FsType.getFsType(getLocationPath().toUri().getScheme());
+    Path location = getLocationPath();
+    Preconditions.checkNotNull(location.toUri().getScheme(),
+        "Cannot get scheme from path " + location);
+    return FileSystemUtil.FsType.getFsType(location.toUri().getScheme());
   }
 
   @Override
diff --git a/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java b/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
index 41a230741..2da3b32e5 100644
--- a/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
@@ -26,12 +26,14 @@ import java.util.HashSet;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Set;
 import java.util.TreeMap;
 import java.util.stream.Collectors;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.impala.analysis.Analyzer;
 import org.apache.impala.analysis.BinaryPredicate;
 import org.apache.impala.analysis.DescriptorTable;
@@ -327,8 +329,8 @@ public class HdfsScanNode extends ScanNode {
   // this scan node has the count(*) optimization enabled.
   protected SlotDescriptor countStarSlot_ = null;
 
-  // Sampled file descriptors if table sampling is used.
-  Map<SampledPartitionMetadata, List<FileDescriptor>> sampledFiles_ = null;
+  // Sampled file descriptors if table sampling is used. Grouped by partition id.
+  Map<Long, List<FileDescriptor>> sampledFiles_ = null;
 
   // Conjuncts used to trim the set of partitions passed to this node.
   // Used only to display EXPLAIN information.
@@ -1092,37 +1094,6 @@ public class HdfsScanNode extends ScanNode {
     }
   }
 
-  /**
-   * A collection of metadata associated with a sampled partition. Unlike
-   * {@link FeFsPartition} this class is safe to use in hash-based data structures.
-   */
-  public static final class SampledPartitionMetadata {
-
-    private final long partitionId;
-    private final FileSystemUtil.FsType partitionFsType;
-
-    public SampledPartitionMetadata(
-        long partitionId, FileSystemUtil.FsType partitionFsType) {
-      this.partitionId = partitionId;
-      this.partitionFsType = partitionFsType;
-    }
-
-    @Override
-    public boolean equals(Object o) {
-      if (this == o) return true;
-      if (o == null || getClass() != o.getClass()) return false;
-      SampledPartitionMetadata that = (SampledPartitionMetadata) o;
-      return partitionId == that.partitionId && partitionFsType == that.partitionFsType;
-    }
-
-    @Override
-    public int hashCode() {
-      return java.util.Objects.hash(partitionId, partitionFsType);
-    }
-
-    private FileSystemUtil.FsType getPartitionFsType() { return partitionFsType; }
-  }
-
   /**
    * Computes scan ranges (i.e. hdfs splits) plus their storage locations, including
    * volume ids, based on the given maximum number of bytes each scan range should scan.
@@ -1152,14 +1123,7 @@ public class HdfsScanNode extends ScanNode {
         .getMax_scan_range_length();
     scanRangeSpecs_ = new TScanRangeSpec();
 
-    if (sampledFiles_ != null) {
-      numPartitionsPerFs_ = sampledFiles_.keySet().stream().collect(Collectors.groupingBy(
-          SampledPartitionMetadata::getPartitionFsType, Collectors.counting()));
-    } else {
-      numPartitionsPerFs_.putAll(partitions_.stream().collect(
-          Collectors.groupingBy(FeFsPartition::getFsType, Collectors.counting())));
-    }
-
+    numPartitionsPerFs_ = new TreeMap<>();
     totalFilesPerFs_ = new TreeMap<>();
     totalBytesPerFs_ = new TreeMap<>();
     totalFilesPerFsEC_ = new TreeMap<>();
@@ -1173,15 +1137,43 @@ public class HdfsScanNode extends ScanNode {
             .isOptimize_simple_limit()
         && analyzer.getSimpleLimitStatus() != null
         && analyzer.getSimpleLimitStatus().first);
+
+    // Save the last looked up FileSystem object. It is enough for the scheme and
+    // authority part of the URI to match to ensure that getFileSystem() would return the
+    // same file system. See Hadoop's filesystem caching implementation at
+    // https://github.com/apache/hadoop/blob/1046f9cf9888155c27923f3f56efa107d908ad5b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java#L3867
+    // Note that in the Hadoop code the slow part is UserGroupInformation.getCurrentUser()
+    // which is not important here as the user is always the same in Impala.
+    String lastFsScheme = null;
+    String lastFsAuthority = null;
+    FileSystem lastFileSytem = null;
     for (FeFsPartition partition: partitions_) {
-      // Missing disk id accounting is only done for file systems that support the notion
-      // of disk/storage ids.
+      // Save location to local variable because getLocation() can be slow as it needs to
+      // decompress the partition's location.
+      String partitionLocation = partition.getLocation();
+      Path partitionPath = new Path(partitionLocation);
+      String fsScheme = partitionPath.toUri().getScheme();
+      String fsAuthority = partitionPath.toUri().getAuthority();
+      FileSystemUtil.FsType fsType = FileSystemUtil.FsType.getFsType(fsScheme);
+
       FileSystem partitionFs;
-      try {
-        partitionFs = partition.getLocationPath().getFileSystem(CONF);
-      } catch (IOException e) {
-        throw new ImpalaRuntimeException("Error determining partition fs type", e);
+      if (lastFileSytem != null &&
+         Objects.equals(lastFsScheme, fsScheme) &&
+         Objects.equals(lastFsAuthority, fsAuthority)) {
+        partitionFs = lastFileSytem;
+      } else {
+        try {
+          partitionFs = partitionPath.getFileSystem(CONF);
+        } catch (IOException e) {
+          throw new ImpalaRuntimeException("Error determining partition fs type", e);
+        }
+        lastFsScheme = fsScheme;
+        lastFsAuthority = fsAuthority;
+        lastFileSytem = partitionFs;
       }
+
+      // Missing disk id accounting is only done for file systems that support the notion
+      // of disk/storage ids.
       boolean fsHasBlocks = FileSystemUtil.supportsStorageIds(partitionFs);
       List<FileDescriptor> fileDescs = getFileDescriptorsWithLimit(partition, fsHasBlocks,
           isSimpleLimit ? analyzer.getSimpleLimitStatus().second - simpleLimitNumRows
@@ -1191,10 +1183,10 @@ public class HdfsScanNode extends ScanNode {
 
       if (sampledFiles_ != null) {
         // If we are sampling, check whether this partition is included in the sample.
-        fileDescs = sampledFiles_.get(
-            new SampledPartitionMetadata(partition.getId(), partition.getFsType()));
+        fileDescs = sampledFiles_.get(partition.getId());
         if (fileDescs == null) continue;
       }
+
       long partitionNumRows = partition.getNumRows();
 
       analyzer.getDescTbl().addReferencedPartition(tbl_, partition.getId());
@@ -1211,7 +1203,7 @@ public class HdfsScanNode extends ScanNode {
         // short-circuiting the scan for a partition key scan).
         long defaultBlockSize = (partition.getFileFormat().isParquetBased()) ?
             analyzer.getQueryOptions().parquet_object_store_split_size :
-            partitionFs.getDefaultBlockSize(partition.getLocationPath());
+            partitionFs.getDefaultBlockSize(partitionPath);
         long maxBlockSize =
             Math.max(defaultBlockSize, FileDescriptor.MIN_SYNTHETIC_BLOCK_SIZE);
         if (scanRangeBytesLimit > 0) {
@@ -1223,32 +1215,34 @@ public class HdfsScanNode extends ScanNode {
       final long partitionBytes = FileDescriptor.computeTotalFileLength(fileDescs);
       long partitionMaxScanRangeBytes = 0;
       boolean partitionMissingDiskIds = false;
-      totalBytesPerFs_.merge(partition.getFsType(), partitionBytes, Long::sum);
-      totalFilesPerFs_.merge(partition.getFsType(), (long) fileDescs.size(), Long::sum);
+      totalBytesPerFs_.merge(fsType, partitionBytes, Long::sum);
+      totalFilesPerFs_.merge(fsType, (long) fileDescs.size(), Long::sum);
+      numPartitionsPerFs_.merge(fsType, 1L, Long::sum);
 
       for (FileDescriptor fileDesc: fileDescs) {
         if (!analyzer.getQueryOptions().isAllow_erasure_coded_files() &&
             fileDesc.getIsEc()) {
           throw new ImpalaRuntimeException(String
               .format("Scanning of HDFS erasure-coded file (%s) is not supported",
-                  fileDesc.getAbsolutePath(partition.getLocation())));
+                  fileDesc.getAbsolutePath(partitionLocation)));
         }
 
         // Accumulate on the number of EC files and the total size of such files.
         if (fileDesc.getIsEc()) {
-          totalFilesPerFsEC_.merge(partition.getFsType(), 1L, Long::sum);
-          totalBytesPerFsEC_.merge(
-              partition.getFsType(), fileDesc.getFileLength(), Long::sum);
+          totalFilesPerFsEC_.merge(fsType, 1L, Long::sum);
+          totalBytesPerFsEC_.merge(fsType, fileDesc.getFileLength(), Long::sum);
         }
 
         if (!fsHasBlocks) {
           Preconditions.checkState(fileDesc.getNumFileBlocks() == 0);
-          generateScanRangeSpecs(partition, fileDesc, scanRangeBytesLimit);
+          generateScanRangeSpecs(
+              partition, partitionLocation, fileDesc, scanRangeBytesLimit);
         } else {
           // Skips files that have no associated blocks.
           if (fileDesc.getNumFileBlocks() == 0) continue;
           Pair<Boolean, Long> result = transformBlocksToScanRanges(
-              partition, fileDesc, fsHasBlocks, scanRangeBytesLimit, analyzer);
+              partition, partitionLocation, fsType, fileDesc, fsHasBlocks,
+              scanRangeBytesLimit, analyzer);
           partitionMaxScanRangeBytes =
               Math.max(partitionMaxScanRangeBytes, result.second);
           if (result.first) partitionMissingDiskIds = true;
@@ -1310,7 +1304,7 @@ public class HdfsScanNode extends ScanNode {
    * @param minSampleBytes minimum number of bytes to read.
    * @param randomSeed used for random number generation.
    */
-  protected Map<SampledPartitionMetadata, List<FileDescriptor>> getFilesSample(
+  protected Map<Long, List<FileDescriptor>> getFilesSample(
       long percentBytes, long minSampleBytes, long randomSeed) {
     return FeFsTable.Utils.getFilesSample(tbl_, partitions_, percentBytes, minSampleBytes,
         randomSeed);
@@ -1342,17 +1336,21 @@ public class HdfsScanNode extends ScanNode {
    * Used for file systems that do not have any physical attributes associated with
    * blocks (e.g., replica locations, caching, etc.). 'maxBlock' size determines how large
    * the scan ranges can be (may be ignored if the file is not splittable).
+   * Expects partition's location string in partitionLocation as getting it from
+   * FeFsPartition can be expensive.
    */
-  private void generateScanRangeSpecs(
-      FeFsPartition partition, FileDescriptor fileDesc, long maxBlockSize) {
+  private void generateScanRangeSpecs(FeFsPartition partition, String partitionLocation,
+      FileDescriptor fileDesc, long maxBlockSize) {
     Preconditions.checkArgument(fileDesc.getNumFileBlocks() == 0);
     Preconditions.checkArgument(maxBlockSize > 0);
     if (fileDesc.getFileLength() <= 0) return;
     boolean splittable = partition.getFileFormat().isSplittable(
         HdfsCompression.fromFileName(fileDesc.getPath()));
+    // Hashing must use String.hashCode() for consistency.
+    int partitionHash = partitionLocation.hashCode();
     TFileSplitGeneratorSpec splitSpec = new TFileSplitGeneratorSpec(
         fileDesc.toThrift(), maxBlockSize, splittable, partition.getId(),
-        partition.getLocation().hashCode());
+        partitionHash);
     scanRangeSpecs_.addToSplit_specs(splitSpec);
     long scanRangeBytes = Math.min(maxBlockSize, fileDesc.getFileLength());
     if (splittable && !isPartitionKeyScan_) {
@@ -1371,14 +1369,15 @@ public class HdfsScanNode extends ScanNode {
    * coordinator can assign ranges to workers to avoid remote reads. These
    * TScanRangeLocationLists are added to scanRanges_. A pair is returned that indicates
    * whether the file has a missing disk id and the maximum scan range (in bytes) found.
+   * Expects partition's location string in partitionLocation and filesystem type in
+   * fsType as getting these from FeFsPartition can be expensive.
    */
   private Pair<Boolean, Long> transformBlocksToScanRanges(FeFsPartition partition,
-      FileDescriptor fileDesc, boolean fsHasBlocks,
-      long scanRangeBytesLimit, Analyzer analyzer) {
+      String partitionLocation, FileSystemUtil.FsType fsType, FileDescriptor fileDesc,
+      boolean fsHasBlocks, long scanRangeBytesLimit, Analyzer analyzer) {
     Preconditions.checkArgument(fileDesc.getNumFileBlocks() > 0);
     boolean fileDescMissingDiskIds = false;
     long fileMaxScanRangeBytes = 0;
-    FileSystemUtil.FsType fsType = partition.getFsType();
     int i = 0;
     if (isPartitionKeyScan_ && (partition.getFileFormat().isParquetBased()
         || partition.getFileFormat() == HdfsFileFormat.ORC)) {
@@ -1430,7 +1429,7 @@ public class HdfsScanNode extends ScanNode {
         THdfsFileSplit hdfsFileSplit = new THdfsFileSplit(fileDesc.getRelativePath(),
             currentOffset, currentLength, partition.getId(), fileDesc.getFileLength(),
             fileDesc.getFileCompression().toThrift(), fileDesc.getModificationTime(),
-            partition.getLocation().hashCode());
+            partitionLocation.hashCode());
         hdfsFileSplit.setAbsolute_path(fileDesc.getAbsolutePath());
         hdfsFileSplit.setIs_encrypted(fileDesc.getIsEncrypted());
         hdfsFileSplit.setIs_erasure_coded(fileDesc.getIsEc());
@@ -1454,7 +1453,7 @@ public class HdfsScanNode extends ScanNode {
       ++numFilesNoDiskIds_;
       if (LOG.isTraceEnabled()) {
         LOG.trace("File blocks mapping to unknown disk ids. Dir: "
-            + partition.getLocation() + " File:" + fileDesc.toString());
+            + partitionLocation + " File:" + fileDesc.toString());
       }
     }
 
diff --git a/fe/src/main/java/org/apache/impala/planner/IcebergScanNode.java b/fe/src/main/java/org/apache/impala/planner/IcebergScanNode.java
index c2a028aac..c798c4160 100644
--- a/fe/src/main/java/org/apache/impala/planner/IcebergScanNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/IcebergScanNode.java
@@ -180,7 +180,7 @@ public class IcebergScanNode extends HdfsScanNode {
    * The algorithm is based on FeFsTable.Utils.getFilesSample()
    */
   @Override
-  protected Map<SampledPartitionMetadata, List<FileDescriptor>> getFilesSample(
+  protected Map<Long, List<FileDescriptor>> getFilesSample(
       long percentBytes, long minSampleBytes, long randomSeed) {
     Preconditions.checkState(percentBytes >= 0 && percentBytes <= 100);
     Preconditions.checkState(minSampleBytes >= 0);
@@ -191,8 +191,6 @@ public class IcebergScanNode extends HdfsScanNode {
 
     Preconditions.checkState(partitions_.size() == 1);
     FeFsPartition part = partitions_.get(0);
-    SampledPartitionMetadata sampledPartitionMetadata =
-        new SampledPartitionMetadata(part.getId(), part.getFsType());
 
     long totalBytes = 0;
     for (FileDescriptor fd : orderedFds) {
@@ -218,8 +216,8 @@ public class IcebergScanNode extends HdfsScanNode {
       orderedFds.set(selectedIdx, orderedFds.get(numFilesRemaining - 1));
       --numFilesRemaining;
     }
-    Map<SampledPartitionMetadata, List<FileDescriptor>> result = new HashMap<>();
-    result.put(sampledPartitionMetadata, sampleFiles);
+    Map<Long, List<FileDescriptor>> result = new HashMap<>();
+    result.put(part.getId(), sampleFiles);
     return result;
   }