You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by to...@apache.org on 2018/08/07 18:10:07 UTC
[4/5] impala git commit: IMPALA-7307 (part 2). Support TABLESAMPLE in
LocalCatalog
IMPALA-7307 (part 2). Support TABLESAMPLE in LocalCatalog
Tested with 'run-tests.py -k tablesample' and the tests passed.
Change-Id: I2f7baf05f16c6389ed900e0459708005ab44491e
Reviewed-on: http://gerrit.cloudera.org:8080/10972
Tested-by: Impala Public Jenkins <im...@cloudera.com>
Reviewed-by: Todd Lipcon <to...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/4a049c61
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/4a049c61
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/4a049c61
Branch: refs/heads/master
Commit: 4a049c61562e6fc5562e3cb41c78161ec32e74f0
Parents: aa26087
Author: Todd Lipcon <to...@cloudera.com>
Authored: Mon Jul 16 17:50:00 2018 -0700
Committer: Todd Lipcon <to...@apache.org>
Committed: Tue Aug 7 17:38:04 2018 +0000
----------------------------------------------------------------------
.../impala/analysis/ComputeStatsStmt.java | 2 +-
.../org/apache/impala/catalog/FeFsTable.java | 113 ++++++++++++++++---
.../org/apache/impala/catalog/HdfsTable.java | 82 --------------
.../impala/catalog/local/LocalFsTable.java | 9 --
.../org/apache/impala/planner/HdfsScanNode.java | 3 +-
5 files changed, 101 insertions(+), 108 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/impala/blob/4a049c61/fe/src/main/java/org/apache/impala/analysis/ComputeStatsStmt.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/ComputeStatsStmt.java b/fe/src/main/java/org/apache/impala/analysis/ComputeStatsStmt.java
index 098c862..3836e4b 100644
--- a/fe/src/main/java/org/apache/impala/analysis/ComputeStatsStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/ComputeStatsStmt.java
@@ -616,7 +616,7 @@ public class ComputeStatsStmt extends StatementBase {
// TODO(todd): can we avoid loading all the partitions for this?
Collection<? extends FeFsPartition> partitions =
FeCatalogUtils.loadAllPartitions(hdfsTable);
- Map<Long, List<FileDescriptor>> sample = hdfsTable.getFilesSample(
+ Map<Long, List<FileDescriptor>> sample = FeFsTable.Utils.getFilesSample(hdfsTable,
partitions, samplePerc, minSampleBytes, sampleSeed);
long sampleFileBytes = 0;
for (List<FileDescriptor> fds: sample.values()) {
http://git-wip-us.apache.org/repos/asf/impala/blob/4a049c61/fe/src/main/java/org/apache/impala/catalog/FeFsTable.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/FeFsTable.java b/fe/src/main/java/org/apache/impala/catalog/FeFsTable.java
index faaf5bd..19bebfc 100644
--- a/fe/src/main/java/org/apache/impala/catalog/FeFsTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/FeFsTable.java
@@ -17,9 +17,11 @@
package org.apache.impala.catalog;
import java.util.Collection;
+import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Random;
import java.util.Set;
import java.util.TreeMap;
@@ -32,6 +34,10 @@ import org.apache.impala.thrift.TResultSet;
import org.apache.impala.thrift.TTableStats;
import org.apache.impala.util.ListMap;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
/**
* Frontend interface for interacting with a filesystem-backed table.
*
@@ -170,21 +176,6 @@ public interface FeFsTable extends FeTable {
int parseSkipHeaderLineCount(StringBuilder error);
/**
- * Selects a random sample of files from the given list of partitions such that the sum
- * of file sizes is at least 'percentBytes' percent of the total number of bytes in
- * those partitions and at least 'minSampleBytes'. The sample is returned as a map from
- * partition id to a list of file descriptors selected from that partition.
- * This function allocates memory proportional to the number of files in 'inputParts'.
- * Its implementation tries to minimize the constant factor and object generation.
- * The given 'randomSeed' is used for random number generation.
- * The 'percentBytes' parameter must be between 0 and 100.
- */
- Map<Long, List<FileDescriptor>> getFilesSample(
- Collection<? extends FeFsPartition> inputParts,
- long percentBytes, long minSampleBytes,
- long randomSeed);
-
- /**
* @return the index of hosts that store replicas of blocks of this table.
*/
ListMap<TNetworkAddress> getHostIndex();
@@ -231,5 +222,97 @@ public interface FeFsTable extends FeTable {
double extrapolatedNumRows = fileBytes * rowsPerByte;
return (long) Math.max(1, Math.round(extrapolatedNumRows));
}
+
+    /**
+     * Selects a random sample of files from the given list of partitions such that the
+     * sum of file sizes is at least 'percentBytes' percent of the total number of bytes
+     * in those partitions and at least 'minSampleBytes'. The sample is returned as a map
+     * from partition id to a list of file descriptors selected from that partition.
+     *
+     * This function allocates memory proportional to the number of files in 'inputParts'.
+     * Its implementation tries to minimize the constant factor and object generation.
+     * The given 'randomSeed' is used for random number generation.
+     * The 'percentBytes' parameter must be between 0 and 100.
+     * Throws IllegalStateException on invalid arguments or too many files to sample.
+     */
+    public static Map<Long, List<FileDescriptor>> getFilesSample(
+        FeFsTable table, Collection<? extends FeFsPartition> inputParts,
+        long percentBytes, long minSampleBytes, long randomSeed) {
+      Preconditions.checkState(percentBytes >= 0 && percentBytes <= 100);
+      Preconditions.checkState(minSampleBytes >= 0);
+
+      long totalNumFiles = 0;
+      for (FeFsPartition part : inputParts) {
+        totalNumFiles += part.getNumFileDescriptors();
+      }
+
+      // Conservative max size for Java arrays. The actual maximum varies
+      // between JVM versions and sometimes between configurations.
+      final long JVM_MAX_ARRAY_SIZE = Integer.MAX_VALUE - 10;
+      if (totalNumFiles > JVM_MAX_ARRAY_SIZE) {
+        throw new IllegalStateException(String.format(
+            "Too many files to generate a table sample of table %s. " +
+            "Sample requested over %s files, but a maximum of %s files are supported.",
+            table.getTableName().toString(), totalNumFiles, JVM_MAX_ARRAY_SIZE));
+      }
+
+      // Ensure a consistent ordering of files for repeatable runs. The files within a
+      // partition are already ordered based on how they are loaded in the catalog.
+      List<FeFsPartition> orderedParts = Lists.newArrayList(inputParts);
+      Collections.sort(orderedParts, HdfsPartition.KV_COMPARATOR);
+
+      // parts[i] is the partition owning the i-th candidate file and fileIdxs[i] is the
+      // index of that file in the file descriptor list of parts[i].
+      // The purpose of these arrays is to efficiently avoid selecting the same file
+      // multiple times during the sampling, regardless of the sample percent.
+      // We purposely avoid generating objects proportional to the number of files.
+      int[] fileIdxs = new int[(int)totalNumFiles];
+      FeFsPartition[] parts = new FeFsPartition[(int)totalNumFiles];
+      int idx = 0;
+      long totalBytes = 0;
+      for (FeFsPartition part: orderedParts) {
+        totalBytes += part.getSize();
+        int numFds = part.getNumFileDescriptors();
+        for (int fileIdx = 0; fileIdx < numFds; ++fileIdx) {
+          fileIdxs[idx] = fileIdx;
+          parts[idx] = part;
+          ++idx;
+        }
+      }
+      if (idx != totalNumFiles) {
+        throw new AssertionError("partition file counts changed during iteration");
+      }
+
+      int numFilesRemaining = idx;
+      double fracPercentBytes = (double) percentBytes / 100;
+      long targetBytes = (long) Math.round(totalBytes * fracPercentBytes);
+      targetBytes = Math.max(targetBytes, minSampleBytes);
+
+      // Randomly select files until targetBytes has been reached or all files have been
+      // selected.
+      Random rnd = new Random(randomSeed);
+      long selectedBytes = 0;
+      Map<Long, List<FileDescriptor>> result = Maps.newHashMap();
+      while (selectedBytes < targetBytes && numFilesRemaining > 0) {
+        // Take the remainder before Math.abs(): Math.abs(Integer.MIN_VALUE) is negative
+        // and would give a negative index. Every other draw maps to the same index.
+        int selectedIdx = Math.abs(rnd.nextInt() % numFilesRemaining);
+        FeFsPartition part = parts[selectedIdx];
+        Long partId = Long.valueOf(part.getId());
+        List<FileDescriptor> sampleFileIdxs = result.get(partId);
+        if (sampleFileIdxs == null) {
+          sampleFileIdxs = Lists.newArrayList();
+          result.put(partId, sampleFileIdxs);
+        }
+        FileDescriptor fd = part.getFileDescriptors().get(fileIdxs[selectedIdx]);
+        sampleFileIdxs.add(fd);
+        selectedBytes += fd.getFileLength();
+        // Avoid selecting the same file multiple times.
+        fileIdxs[selectedIdx] = fileIdxs[numFilesRemaining - 1];
+        parts[selectedIdx] = parts[numFilesRemaining - 1];
+        --numFilesRemaining;
+      }
+      return result;
+    }
}
}
http://git-wip-us.apache.org/repos/asf/impala/blob/4a049c61/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
index b704a81..0b66f25 100644
--- a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
@@ -29,7 +29,6 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
-import java.util.Random;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.Callable;
@@ -2131,87 +2130,6 @@ public class HdfsTable extends Table implements FeFsTable {
addPartition(refreshedPartition);
}
- @Override // FeFsTable
- public Map<Long, List<FileDescriptor>> getFilesSample(
- Collection<? extends FeFsPartition> inputParts,
- long percentBytes, long minSampleBytes,
- long randomSeed) {
- Preconditions.checkState(percentBytes >= 0 && percentBytes <= 100);
- Preconditions.checkState(minSampleBytes >= 0);
-
- long totalNumFiles = 0;
- for (FeFsPartition part : inputParts) {
- totalNumFiles += part.getNumFileDescriptors();
- }
-
- // Conservative max size for Java arrays. The actual maximum varies
- // from JVM version and sometimes between configurations.
- final long JVM_MAX_ARRAY_SIZE = Integer.MAX_VALUE - 10;
- if (totalNumFiles > JVM_MAX_ARRAY_SIZE) {
- throw new IllegalStateException(String.format(
- "Too many files to generate a table sample of table %s. " +
- "Sample requested over %s files, but a maximum of %s files are supported.",
- getTableName().toString(), totalNumFiles, JVM_MAX_ARRAY_SIZE));
- }
-
- // Ensure a consistent ordering of files for repeatable runs. The files within a
- // partition are already ordered based on how they are loaded in the catalog.
- List<FeFsPartition> orderedParts = Lists.newArrayList(inputParts);
- Collections.sort(orderedParts, HdfsPartition.KV_COMPARATOR);
-
- // fileIdxs contains indexes into the file descriptor lists of all inputParts
- // parts[i] contains the partition corresponding to fileIdxs[i]
- // fileIdxs[i] is an index into the file descriptor list of the partition parts[i]
- // The purpose of these arrays is to efficiently avoid selecting the same file
- // multiple times during the sampling, regardless of the sample percent. We purposely
- // avoid generating objects proportional to the number of files.
- int[] fileIdxs = new int[(int)totalNumFiles];
- FeFsPartition[] parts = new FeFsPartition[(int)totalNumFiles];
- int idx = 0;
- long totalBytes = 0;
- for (FeFsPartition part: orderedParts) {
- totalBytes += part.getSize();
- int numFds = part.getNumFileDescriptors();
- for (int fileIdx = 0; fileIdx < numFds; ++fileIdx) {
- fileIdxs[idx] = fileIdx;
- parts[idx] = part;
- ++idx;
- }
- }
- if (idx != totalNumFiles) {
- throw new AssertionError("partition file counts changed during iteration");
- }
-
- int numFilesRemaining = idx;
- double fracPercentBytes = (double) percentBytes / 100;
- long targetBytes = (long) Math.round(totalBytes * fracPercentBytes);
- targetBytes = Math.max(targetBytes, minSampleBytes);
-
- // Randomly select files until targetBytes has been reached or all files have been
- // selected.
- Random rnd = new Random(randomSeed);
- long selectedBytes = 0;
- Map<Long, List<FileDescriptor>> result = Maps.newHashMap();
- while (selectedBytes < targetBytes && numFilesRemaining > 0) {
- int selectedIdx = Math.abs(rnd.nextInt()) % numFilesRemaining;
- FeFsPartition part = parts[selectedIdx];
- Long partId = Long.valueOf(part.getId());
- List<FileDescriptor> sampleFileIdxs = result.get(partId);
- if (sampleFileIdxs == null) {
- sampleFileIdxs = Lists.newArrayList();
- result.put(partId, sampleFileIdxs);
- }
- FileDescriptor fd = part.getFileDescriptors().get(fileIdxs[selectedIdx]);
- sampleFileIdxs.add(fd);
- selectedBytes += fd.getFileLength();
- // Avoid selecting the same file multiple times.
- fileIdxs[selectedIdx] = fileIdxs[numFilesRemaining - 1];
- parts[selectedIdx] = parts[numFilesRemaining - 1];
- --numFilesRemaining;
- }
- return result;
- }
-
/**
* Registers table metrics.
*/
http://git-wip-us.apache.org/repos/asf/impala/blob/4a049c61/fe/src/main/java/org/apache/impala/catalog/local/LocalFsTable.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/local/LocalFsTable.java b/fe/src/main/java/org/apache/impala/catalog/local/LocalFsTable.java
index a47497a..7f016b5 100644
--- a/fe/src/main/java/org/apache/impala/catalog/local/LocalFsTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/local/LocalFsTable.java
@@ -43,7 +43,6 @@ import org.apache.impala.catalog.FeFsPartition;
import org.apache.impala.catalog.FeFsTable;
import org.apache.impala.catalog.HdfsFileFormat;
import org.apache.impala.catalog.HdfsTable;
-import org.apache.impala.catalog.HdfsPartition.FileDescriptor;
import org.apache.impala.catalog.PrunablePartition;
import org.apache.impala.common.AnalysisException;
import org.apache.impala.thrift.CatalogObjectsConstants;
@@ -544,14 +543,6 @@ public class LocalFsTable extends LocalTable implements FeFsTable {
}
@Override
- public Map<Long, List<FileDescriptor>> getFilesSample(
- Collection<? extends FeFsPartition> inputParts,
- long percentBytes, long minSampleBytes, long randomSeed) {
- // TODO(todd): implement me
- return Collections.emptyMap();
- }
-
- @Override
public ListMap<TNetworkAddress> getHostIndex() {
return hostIndex_;
}
http://git-wip-us.apache.org/repos/asf/impala/blob/4a049c61/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java b/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
index 706093c..cb33556 100644
--- a/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
@@ -757,7 +757,8 @@ public class HdfsScanNode extends ScanNode {
// Pass a minimum sample size of 0 because users cannot set a minimum sample size
// for scans directly. For compute stats, a minimum sample size can be set, and
// the sampling percent is adjusted to reflect it.
- sampledFiles = tbl_.getFilesSample(partitions_, percentBytes, 0, randomSeed);
+ sampledFiles = FeFsTable.Utils.getFilesSample(tbl_, partitions_, percentBytes, 0,
+ randomSeed);
}
long scanRangeBytesLimit = analyzer.getQueryCtx().client_request.getQuery_options()