You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by to...@apache.org on 2018/08/07 18:10:07 UTC

[4/5] impala git commit: IMPALA-7307 (part 2). Support TABLESAMPLE in LocalCatalog

IMPALA-7307 (part 2). Support TABLESAMPLE in LocalCatalog

Tested with 'run-tests.py -k tablesample' and the tests passed.

Change-Id: I2f7baf05f16c6389ed900e0459708005ab44491e
Reviewed-on: http://gerrit.cloudera.org:8080/10972
Tested-by: Impala Public Jenkins <im...@cloudera.com>
Reviewed-by: Todd Lipcon <to...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/4a049c61
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/4a049c61
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/4a049c61

Branch: refs/heads/master
Commit: 4a049c61562e6fc5562e3cb41c78161ec32e74f0
Parents: aa26087
Author: Todd Lipcon <to...@cloudera.com>
Authored: Mon Jul 16 17:50:00 2018 -0700
Committer: Todd Lipcon <to...@apache.org>
Committed: Tue Aug 7 17:38:04 2018 +0000

----------------------------------------------------------------------
 .../impala/analysis/ComputeStatsStmt.java       |   2 +-
 .../org/apache/impala/catalog/FeFsTable.java    | 113 ++++++++++++++++---
 .../org/apache/impala/catalog/HdfsTable.java    |  82 --------------
 .../impala/catalog/local/LocalFsTable.java      |   9 --
 .../org/apache/impala/planner/HdfsScanNode.java |   3 +-
 5 files changed, 101 insertions(+), 108 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/4a049c61/fe/src/main/java/org/apache/impala/analysis/ComputeStatsStmt.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/ComputeStatsStmt.java b/fe/src/main/java/org/apache/impala/analysis/ComputeStatsStmt.java
index 098c862..3836e4b 100644
--- a/fe/src/main/java/org/apache/impala/analysis/ComputeStatsStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/ComputeStatsStmt.java
@@ -616,7 +616,7 @@ public class ComputeStatsStmt extends StatementBase {
     // TODO(todd): can we avoid loading all the partitions for this?
     Collection<? extends FeFsPartition> partitions =
         FeCatalogUtils.loadAllPartitions(hdfsTable);
-    Map<Long, List<FileDescriptor>> sample = hdfsTable.getFilesSample(
+    Map<Long, List<FileDescriptor>> sample = FeFsTable.Utils.getFilesSample(hdfsTable,
         partitions, samplePerc, minSampleBytes, sampleSeed);
     long sampleFileBytes = 0;
     for (List<FileDescriptor> fds: sample.values()) {

http://git-wip-us.apache.org/repos/asf/impala/blob/4a049c61/fe/src/main/java/org/apache/impala/catalog/FeFsTable.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/FeFsTable.java b/fe/src/main/java/org/apache/impala/catalog/FeFsTable.java
index faaf5bd..19bebfc 100644
--- a/fe/src/main/java/org/apache/impala/catalog/FeFsTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/FeFsTable.java
@@ -17,9 +17,11 @@
 package org.apache.impala.catalog;
 
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Random;
 import java.util.Set;
 import java.util.TreeMap;
 
@@ -32,6 +34,10 @@ import org.apache.impala.thrift.TResultSet;
 import org.apache.impala.thrift.TTableStats;
 import org.apache.impala.util.ListMap;
 
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
 /**
  * Frontend interface for interacting with a filesystem-backed table.
  *
@@ -170,21 +176,6 @@ public interface FeFsTable extends FeTable {
   int parseSkipHeaderLineCount(StringBuilder error);
 
   /**
-   * Selects a random sample of files from the given list of partitions such that the sum
-   * of file sizes is at least 'percentBytes' percent of the total number of bytes in
-   * those partitions and at least 'minSampleBytes'. The sample is returned as a map from
-   * partition id to a list of file descriptors selected from that partition.
-   * This function allocates memory proportional to the number of files in 'inputParts'.
-   * Its implementation tries to minimize the constant factor and object generation.
-   * The given 'randomSeed' is used for random number generation.
-   * The 'percentBytes' parameter must be between 0 and 100.
-   */
-  Map<Long, List<FileDescriptor>> getFilesSample(
-      Collection<? extends FeFsPartition> inputParts,
-      long percentBytes, long minSampleBytes,
-      long randomSeed);
-
-  /**
    * @return the index of hosts that store replicas of blocks of this table.
    */
   ListMap<TNetworkAddress> getHostIndex();
@@ -231,5 +222,97 @@ public interface FeFsTable extends FeTable {
       double extrapolatedNumRows = fileBytes * rowsPerByte;
       return (long) Math.max(1, Math.round(extrapolatedNumRows));
     }
+
+    /**
+     * Selects a random sample of files from the given list of partitions such that the
+     * sum of file sizes is at least 'percentBytes' percent of the total number of bytes
+     * in those partitions and at least 'minSampleBytes'. The sample is returned as a map
+     * from partition id to a list of file descriptors selected from that partition. If
+     * that byte target cannot be reached, every file in 'inputParts' is selected.
+     *
+     * This function allocates memory proportional to the number of files in 'inputParts'.
+     * Its implementation tries to minimize the constant factor and object generation.
+     * The given 'randomSeed' is used for random number generation.
+     * @throws IllegalStateException if 'percentBytes' is outside [0, 100], if
+     *     'minSampleBytes' is negative, or if there are too many files for a Java array.
+     */
+    public static Map<Long, List<FileDescriptor>> getFilesSample(
+        FeFsTable table, Collection<? extends FeFsPartition> inputParts,
+        long percentBytes, long minSampleBytes, long randomSeed) {
+      Preconditions.checkState(percentBytes >= 0 && percentBytes <= 100);
+      Preconditions.checkState(minSampleBytes >= 0);
+
+      long totalNumFiles = 0;
+      for (FeFsPartition part : inputParts) {
+        totalNumFiles += part.getNumFileDescriptors();
+      }
+
+      // Conservative max size for Java arrays. The actual maximum varies
+      // between JVM versions and sometimes between configurations.
+      final long JVM_MAX_ARRAY_SIZE = Integer.MAX_VALUE - 10;
+      if (totalNumFiles > JVM_MAX_ARRAY_SIZE) {
+        throw new IllegalStateException(String.format(
+            "Too many files to generate a table sample of table %s. " +
+            "Sample requested over %s files, but a maximum of %s files are supported.",
+            table.getTableName().toString(), totalNumFiles, JVM_MAX_ARRAY_SIZE));
+      }
+
+      // Ensure a consistent ordering of files for repeatable runs. The files within a
+      // partition are already ordered based on how they are loaded in the catalog.
+      List<FeFsPartition> orderedParts = Lists.newArrayList(inputParts);
+      Collections.sort(orderedParts, HdfsPartition.KV_COMPARATOR);
+
+      // fileIdxs[i] is an index into the file descriptor list of the partition parts[i],
+      // so each (parts[i], fileIdxs[i]) pair identifies one file of 'inputParts'.
+      // The purpose of these arrays is to efficiently avoid selecting the same file
+      // multiple times during the sampling, regardless of the sample percent.
+      // We purposely avoid generating objects proportional to the number of files.
+      int[] fileIdxs = new int[(int)totalNumFiles];
+      FeFsPartition[] parts = new FeFsPartition[(int)totalNumFiles];
+      int idx = 0;
+      long totalBytes = 0;
+      for (FeFsPartition part: orderedParts) {
+        totalBytes += part.getSize();
+        int numFds = part.getNumFileDescriptors();
+        for (int fileIdx = 0; fileIdx < numFds; ++fileIdx) {
+          fileIdxs[idx] = fileIdx;
+          parts[idx] = part;
+          ++idx;
+        }
+      }
+      if (idx != totalNumFiles) {
+        throw new AssertionError("partition file counts changed during iteration");
+      }
+
+      int numFilesRemaining = idx;
+      double fracPercentBytes = (double) percentBytes / 100;
+      long targetBytes = (long) Math.round(totalBytes * fracPercentBytes);
+      targetBytes = Math.max(targetBytes, minSampleBytes);
+
+      // Randomly select files until targetBytes is reached or all files are selected.
+      Random rnd = new Random(randomSeed);
+      long selectedBytes = 0;
+      Map<Long, List<FileDescriptor>> result = Maps.newHashMap();
+      while (selectedBytes < targetBytes && numFilesRemaining > 0) {
+        // Math.abs(Integer.MIN_VALUE) is negative as an int and would yield a negative
+        // index, so take the absolute value as a long. Other draws are unaffected.
+        int selectedIdx = (int) (Math.abs((long) rnd.nextInt()) % numFilesRemaining);
+        FeFsPartition part = parts[selectedIdx];
+        Long partId = Long.valueOf(part.getId());
+        List<FileDescriptor> sampleFileIdxs = result.get(partId);
+        if (sampleFileIdxs == null) {
+          sampleFileIdxs = Lists.newArrayList();
+          result.put(partId, sampleFileIdxs);
+        }
+        FileDescriptor fd = part.getFileDescriptors().get(fileIdxs[selectedIdx]);
+        sampleFileIdxs.add(fd);
+        selectedBytes += fd.getFileLength();
+        // Avoid selecting the same file multiple times.
+        fileIdxs[selectedIdx] = fileIdxs[numFilesRemaining - 1];
+        parts[selectedIdx] = parts[numFilesRemaining - 1];
+        --numFilesRemaining;
+      }
+      return result;
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/impala/blob/4a049c61/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
index b704a81..0b66f25 100644
--- a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
@@ -29,7 +29,6 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.Random;
 import java.util.Set;
 import java.util.TreeMap;
 import java.util.concurrent.Callable;
@@ -2131,87 +2130,6 @@ public class HdfsTable extends Table implements FeFsTable {
     addPartition(refreshedPartition);
   }
 
-  @Override // FeFsTable
-  public Map<Long, List<FileDescriptor>> getFilesSample(
-      Collection<? extends FeFsPartition> inputParts,
-      long percentBytes, long minSampleBytes,
-      long randomSeed) {
-    Preconditions.checkState(percentBytes >= 0 && percentBytes <= 100);
-    Preconditions.checkState(minSampleBytes >= 0);
-
-    long totalNumFiles = 0;
-    for (FeFsPartition part : inputParts) {
-      totalNumFiles += part.getNumFileDescriptors();
-    }
-
-    // Conservative max size for Java arrays. The actual maximum varies
-    // from JVM version and sometimes between configurations.
-    final long JVM_MAX_ARRAY_SIZE = Integer.MAX_VALUE - 10;
-    if (totalNumFiles > JVM_MAX_ARRAY_SIZE) {
-      throw new IllegalStateException(String.format(
-          "Too many files to generate a table sample of table %s. " +
-          "Sample requested over %s files, but a maximum of %s files are supported.",
-          getTableName().toString(), totalNumFiles, JVM_MAX_ARRAY_SIZE));
-    }
-
-    // Ensure a consistent ordering of files for repeatable runs. The files within a
-    // partition are already ordered based on how they are loaded in the catalog.
-    List<FeFsPartition> orderedParts = Lists.newArrayList(inputParts);
-    Collections.sort(orderedParts, HdfsPartition.KV_COMPARATOR);
-
-    // fileIdxs contains indexes into the file descriptor lists of all inputParts
-    // parts[i] contains the partition corresponding to fileIdxs[i]
-    // fileIdxs[i] is an index into the file descriptor list of the partition parts[i]
-    // The purpose of these arrays is to efficiently avoid selecting the same file
-    // multiple times during the sampling, regardless of the sample percent. We purposely
-    // avoid generating objects proportional to the number of files.
-    int[] fileIdxs = new int[(int)totalNumFiles];
-    FeFsPartition[] parts = new FeFsPartition[(int)totalNumFiles];
-    int idx = 0;
-    long totalBytes = 0;
-    for (FeFsPartition part: orderedParts) {
-      totalBytes += part.getSize();
-      int numFds = part.getNumFileDescriptors();
-      for (int fileIdx = 0; fileIdx < numFds; ++fileIdx) {
-        fileIdxs[idx] = fileIdx;
-        parts[idx] = part;
-        ++idx;
-      }
-    }
-    if (idx != totalNumFiles) {
-      throw new AssertionError("partition file counts changed during iteration");
-    }
-
-    int numFilesRemaining = idx;
-    double fracPercentBytes = (double) percentBytes / 100;
-    long targetBytes = (long) Math.round(totalBytes * fracPercentBytes);
-    targetBytes = Math.max(targetBytes, minSampleBytes);
-
-    // Randomly select files until targetBytes has been reached or all files have been
-    // selected.
-    Random rnd = new Random(randomSeed);
-    long selectedBytes = 0;
-    Map<Long, List<FileDescriptor>> result = Maps.newHashMap();
-    while (selectedBytes < targetBytes && numFilesRemaining > 0) {
-      int selectedIdx = Math.abs(rnd.nextInt()) % numFilesRemaining;
-      FeFsPartition part = parts[selectedIdx];
-      Long partId = Long.valueOf(part.getId());
-      List<FileDescriptor> sampleFileIdxs = result.get(partId);
-      if (sampleFileIdxs == null) {
-        sampleFileIdxs = Lists.newArrayList();
-        result.put(partId, sampleFileIdxs);
-      }
-      FileDescriptor fd = part.getFileDescriptors().get(fileIdxs[selectedIdx]);
-      sampleFileIdxs.add(fd);
-      selectedBytes += fd.getFileLength();
-      // Avoid selecting the same file multiple times.
-      fileIdxs[selectedIdx] = fileIdxs[numFilesRemaining - 1];
-      parts[selectedIdx] = parts[numFilesRemaining - 1];
-      --numFilesRemaining;
-    }
-    return result;
-  }
-
   /**
    * Registers table metrics.
    */

http://git-wip-us.apache.org/repos/asf/impala/blob/4a049c61/fe/src/main/java/org/apache/impala/catalog/local/LocalFsTable.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/local/LocalFsTable.java b/fe/src/main/java/org/apache/impala/catalog/local/LocalFsTable.java
index a47497a..7f016b5 100644
--- a/fe/src/main/java/org/apache/impala/catalog/local/LocalFsTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/local/LocalFsTable.java
@@ -43,7 +43,6 @@ import org.apache.impala.catalog.FeFsPartition;
 import org.apache.impala.catalog.FeFsTable;
 import org.apache.impala.catalog.HdfsFileFormat;
 import org.apache.impala.catalog.HdfsTable;
-import org.apache.impala.catalog.HdfsPartition.FileDescriptor;
 import org.apache.impala.catalog.PrunablePartition;
 import org.apache.impala.common.AnalysisException;
 import org.apache.impala.thrift.CatalogObjectsConstants;
@@ -544,14 +543,6 @@ public class LocalFsTable extends LocalTable implements FeFsTable {
   }
 
   @Override
-  public Map<Long, List<FileDescriptor>> getFilesSample(
-      Collection<? extends FeFsPartition> inputParts,
-      long percentBytes, long minSampleBytes, long randomSeed) {
-    // TODO(todd): implement me
-    return Collections.emptyMap();
-  }
-
-  @Override
   public ListMap<TNetworkAddress> getHostIndex() {
     return hostIndex_;
   }

http://git-wip-us.apache.org/repos/asf/impala/blob/4a049c61/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java b/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
index 706093c..cb33556 100644
--- a/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
@@ -757,7 +757,8 @@ public class HdfsScanNode extends ScanNode {
       // Pass a minimum sample size of 0 because users cannot set a minimum sample size
       // for scans directly. For compute stats, a minimum sample size can be set, and
       // the sampling percent is adjusted to reflect it.
-      sampledFiles = tbl_.getFilesSample(partitions_, percentBytes, 0, randomSeed);
+      sampledFiles = FeFsTable.Utils.getFilesSample(tbl_, partitions_, percentBytes, 0,
+          randomSeed);
     }
 
     long scanRangeBytesLimit = analyzer.getQueryCtx().client_request.getQuery_options()