Posted to commits@iceberg.apache.org by ru...@apache.org on 2022/01/24 13:41:17 UTC
[iceberg] branch master updated: Data: Read metrics in parallel during TableMigration (#3876)
This is an automated email from the ASF dual-hosted git repository.
russellspitzer pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new 599b402 Data: Read metrics in parallel during TableMigration (#3876)
599b402 is described below
commit 599b402a1149b01f3b6a9f759ffc05f3145cdaec
Author: kingeasternsun <ki...@gmail.com>
AuthorDate: Mon Jan 24 21:40:52 2022 +0800
Data: Read metrics in parallel during TableMigration (#3876)
Adds a parallelism parameter to TableMigrationUtil so that file metrics are read in parallel rather than one file at a time.
Co-authored-by: King <wa...@deepexi.com>
---
.../apache/iceberg/data/TableMigrationUtil.java | 182 ++++++++++-----------
1 file changed, 90 insertions(+), 92 deletions(-)
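Before the patch itself, a minimal sketch of how the new overload can be invoked. The directory path, the unpartitioned spec, the null NameMapping, and the parallelism of 8 are illustrative assumptions, not part of this commit; a real migration would pass the table's actual PartitionSpec, partition values, and name mapping:

    import java.util.Collections;
    import java.util.List;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.iceberg.DataFile;
    import org.apache.iceberg.MetricsConfig;
    import org.apache.iceberg.PartitionSpec;
    import org.apache.iceberg.data.TableMigrationUtil;

    public class ListPartitionExample {
      public static void main(String[] args) {
        List<DataFile> files = TableMigrationUtil.listPartition(
            Collections.emptyMap(),         // partition values; empty for this unpartitioned sketch
            "file:///tmp/tbl/data",         // hypothetical directory containing Parquet files
            "parquet",
            PartitionSpec.unpartitioned(),  // a real migration passes the table's spec
            new Configuration(),
            MetricsConfig.getDefault(),
            null,                           // no NameMapping in this sketch
            8);                             // new parameter: read file metrics with 8 threads
        System.out.println("Found " + files.size() + " data files");
      }
    }

With parallelism set to 1, the call behaves the same as the pre-existing single-threaded overload.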
diff --git a/data/src/main/java/org/apache/iceberg/data/TableMigrationUtil.java b/data/src/main/java/org/apache/iceberg/data/TableMigrationUtil.java
index c534c70..277cc71 100644
--- a/data/src/main/java/org/apache/iceberg/data/TableMigrationUtil.java
+++ b/data/src/main/java/org/apache/iceberg/data/TableMigrationUtil.java
@@ -20,11 +20,14 @@
package org.apache.iceberg.data;
import java.io.IOException;
+import java.io.UncheckedIOException;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadPoolExecutor;
import java.util.stream.Collectors;
-import java.util.stream.Stream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
@@ -42,11 +45,11 @@ import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.mapping.NameMapping;
import org.apache.iceberg.orc.OrcMetrics;
import org.apache.iceberg.parquet.ParquetUtil;
-import org.apache.parquet.hadoop.ParquetFileReader;
-import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.apache.iceberg.relocated.com.google.common.util.concurrent.MoreExecutors;
+import org.apache.iceberg.relocated.com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.iceberg.util.Tasks;
public class TableMigrationUtil {
-
private static final PathFilter HIDDEN_PATH_FILTER =
p -> !p.getName().startsWith("_") && !p.getName().startsWith(".");
@@ -74,111 +77,106 @@ public class TableMigrationUtil {
public static List<DataFile> listPartition(Map<String, String> partition, String uri, String format,
PartitionSpec spec, Configuration conf, MetricsConfig metricsConfig,
NameMapping mapping) {
- if (format.contains("avro")) {
- return listAvroPartition(partition, uri, spec, conf);
- } else if (format.contains("parquet")) {
- return listParquetPartition(partition, uri, spec, conf, metricsConfig, mapping);
- } else if (format.contains("orc")) {
- return listOrcPartition(partition, uri, spec, conf, metricsConfig, mapping);
- } else {
- throw new UnsupportedOperationException("Unknown partition format: " + format);
- }
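+ // Keeps the original behavior: delegate to the new overload with parallelism = 1.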
+ return listPartition(partition, uri, format, spec, conf, metricsConfig, mapping, 1);
}
- private static List<DataFile> listAvroPartition(Map<String, String> partitionPath, String partitionUri,
- PartitionSpec spec, Configuration conf) {
+ public static List<DataFile> listPartition(Map<String, String> partitionPath, String partitionUri, String format,
+ PartitionSpec spec, Configuration conf, MetricsConfig metricsSpec,
+ NameMapping mapping, int parallelism) {
try {
+ String partitionKey = spec.fields().stream()
+ .map(PartitionField::name)
+ .map(name -> String.format("%s=%s", name, partitionPath.get(name)))
+ .collect(Collectors.joining("/"));
+
Path partition = new Path(partitionUri);
FileSystem fs = partition.getFileSystem(conf);
- return Arrays.stream(fs.listStatus(partition, HIDDEN_PATH_FILTER))
- .filter(FileStatus::isFile)
- .map(stat -> {
- InputFile file = HadoopInputFile.fromLocation(stat.getPath().toString(), conf);
- long rowCount = Avro.rowCount(file);
- Metrics metrics = new Metrics(rowCount, null, null, null, null);
- String partitionKey = spec.fields().stream()
- .map(PartitionField::name)
- .map(name -> String.format("%s=%s", name, partitionPath.get(name)))
- .collect(Collectors.joining("/"));
-
- return DataFiles.builder(spec)
- .withPath(stat.getPath().toString())
- .withFormat("avro")
- .withFileSizeInBytes(stat.getLen())
- .withMetrics(metrics)
- .withPartitionPath(partitionKey)
- .build();
-
- }).collect(Collectors.toList());
+ List<FileStatus> fileStatus = Arrays.stream(fs.listStatus(partition, HIDDEN_PATH_FILTER))
+ .filter(FileStatus::isFile)
+ .collect(Collectors.toList());
+ DataFile[] dataFiles = new DataFile[fileStatus.size()];
+ Tasks.Builder<Integer> task = Tasks.range(fileStatus.size())
+ .stopOnFailure()
+ .throwFailureWhenFinished();
+
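+ // Tasks runs each item on the caller's thread by default; only build a pool when extra parallelism is requested.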
+ if (parallelism > 1) {
+ task.executeWith(migrationService(parallelism));
+ }
+
+ if (format.contains("avro")) {
+ task.run(index -> {
+ Metrics metrics = getAvroMetrics(fileStatus.get(index).getPath(), conf);
+ dataFiles[index] = buildDataFile(fileStatus.get(index), partitionKey, spec, metrics, "avro");
+ });
+ } else if (format.contains("parquet")) {
+ task.run(index -> {
+ Metrics metrics = getParquetMetrics(fileStatus.get(index).getPath(), conf, metricsSpec, mapping);
+ dataFiles[index] = buildDataFile(fileStatus.get(index), partitionKey, spec, metrics, "parquet");
+ });
+ } else if (format.contains("orc")) {
+ task.run(index -> {
+ Metrics metrics = getOrcMetrics(fileStatus.get(index).getPath(), conf, metricsSpec, mapping);
+ dataFiles[index] = buildDataFile(fileStatus.get(index), partitionKey, spec, metrics, "orc");
+ });
+ } else {
+ throw new UnsupportedOperationException("Unknown partition format: " + format);
+ }
+ return Arrays.asList(dataFiles);
} catch (IOException e) {
throw new RuntimeException("Unable to list files in partition: " + partitionUri, e);
}
}
- private static List<DataFile> listParquetPartition(Map<String, String> partitionPath, String partitionUri,
- PartitionSpec spec, Configuration conf,
- MetricsConfig metricsSpec, NameMapping mapping) {
+ private static Metrics getAvroMetrics(Path path, Configuration conf) {
try {
- Path partition = new Path(partitionUri);
- FileSystem fs = partition.getFileSystem(conf);
-
- return Arrays.stream(fs.listStatus(partition, HIDDEN_PATH_FILTER))
- .filter(FileStatus::isFile)
- .map(stat -> {
- Metrics metrics;
- try {
- ParquetMetadata metadata = ParquetFileReader.readFooter(conf, stat);
- metrics = ParquetUtil.footerMetrics(metadata, Stream.empty(), metricsSpec, mapping);
- } catch (IOException e) {
- throw new RuntimeException("Unable to read the footer of the parquet file: " +
- stat.getPath(), e);
- }
- String partitionKey = spec.fields().stream()
- .map(PartitionField::name)
- .map(name -> String.format("%s=%s", name, partitionPath.get(name)))
- .collect(Collectors.joining("/"));
-
- return DataFiles.builder(spec)
- .withPath(stat.getPath().toString())
- .withFormat("parquet")
- .withFileSizeInBytes(stat.getLen())
- .withMetrics(metrics)
- .withPartitionPath(partitionKey)
- .build();
- }).collect(Collectors.toList());
- } catch (IOException e) {
- throw new RuntimeException("Unable to list files in partition: " + partitionUri, e);
+ InputFile file = HadoopInputFile.fromPath(path, conf);
+ long rowCount = Avro.rowCount(file);
+ return new Metrics(rowCount, null, null, null, null);
+ } catch (UncheckedIOException e) {
+ throw new RuntimeException("Unable to read Avro file: " +
+ path, e);
}
}
- private static List<DataFile> listOrcPartition(Map<String, String> partitionPath, String partitionUri,
- PartitionSpec spec, Configuration conf,
- MetricsConfig metricsSpec, NameMapping mapping) {
+ private static Metrics getParquetMetrics(Path path, Configuration conf,
+ MetricsConfig metricsSpec, NameMapping mapping) {
try {
- Path partition = new Path(partitionUri);
- FileSystem fs = partition.getFileSystem(conf);
+ InputFile file = HadoopInputFile.fromPath(path, conf);
+ return ParquetUtil.fileMetrics(file, metricsSpec, mapping);
+ } catch (UncheckedIOException e) {
+ throw new RuntimeException("Unable to read the metrics of the Parquet file: " +
+ path, e);
+ }
+ }
- return Arrays.stream(fs.listStatus(partition, HIDDEN_PATH_FILTER))
- .filter(FileStatus::isFile)
- .map(stat -> {
- Metrics metrics = OrcMetrics.fromInputFile(HadoopInputFile.fromPath(stat.getPath(), conf),
- metricsSpec, mapping);
- String partitionKey = spec.fields().stream()
- .map(PartitionField::name)
- .map(name -> String.format("%s=%s", name, partitionPath.get(name)))
- .collect(Collectors.joining("/"));
+ private static Metrics getOrcMetrics(Path path, Configuration conf,
+ MetricsConfig metricsSpec, NameMapping mapping) {
+ try {
+ return OrcMetrics.fromInputFile(HadoopInputFile.fromPath(path, conf),
+ metricsSpec, mapping);
+ } catch (UncheckedIOException e) {
+ throw new RuntimeException("Unable to read the metrics of the Orc file: " +
+ path, e);
+ }
+ }
- return DataFiles.builder(spec)
- .withPath(stat.getPath().toString())
- .withFormat("orc")
- .withFileSizeInBytes(stat.getLen())
- .withMetrics(metrics)
- .withPartitionPath(partitionKey)
- .build();
+ private static DataFile buildDataFile(FileStatus stat, String partitionKey,
+ PartitionSpec spec, Metrics metrics, String format) {
+ return DataFiles.builder(spec)
+ .withPath(stat.getPath().toString())
+ .withFormat(format)
+ .withFileSizeInBytes(stat.getLen())
+ .withMetrics(metrics)
+ .withPartitionPath(partitionKey)
+ .build();
+ }
- }).collect(Collectors.toList());
- } catch (IOException e) {
- throw new RuntimeException("Unable to list files in partition: " + partitionUri, e);
- }
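+ // An exiting executor lets migration threads shut down with the JVM instead of blocking exit.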
+ private static ExecutorService migrationService(int parallelism) {
+ return MoreExecutors.getExitingExecutorService(
+ (ThreadPoolExecutor) Executors.newFixedThreadPool(
+ parallelism,
+ new ThreadFactoryBuilder()
+ .setNameFormat("table-migration-%d")
+ .build()));
}
}
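A closing note on the pattern: the parallel loop above is built on Iceberg's org.apache.iceberg.util.Tasks helper, with each task writing into its own slot of a pre-sized array so no synchronization on the results is needed. A stripped-down, self-contained sketch of the same pattern, where the item count, pool size, and work function are invented for illustration:

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import org.apache.iceberg.util.Tasks;

    public class TasksRangeExample {
      public static void main(String[] args) {
        int items = 16;                     // e.g. the number of files in a partition
        long[] results = new long[items];   // one slot per task, like the DataFile[] above
        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
          Tasks.range(items)                // one task per index 0..items-1
              .stopOnFailure()              // stop scheduling new tasks after the first error
              .throwFailureWhenFinished()   // rethrow that error to the caller
              .executeWith(pool)            // run on the pool instead of the calling thread
              .run(i -> results[i] = work(i));
        } finally {
          pool.shutdown();
        }
      }

      private static long work(int i) {
        return (long) i * i;                // stand-in for reading one file's metrics
      }
    }

Writing each result to its own array slot, as the patch does with DataFile[], avoids the contention a shared synchronized collection would introduce.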