Posted to commits@iceberg.apache.org by ru...@apache.org on 2022/01/24 13:41:17 UTC

[iceberg] branch master updated: Data: Read metrics in parallel during TableMigration (#3876)

This is an automated email from the ASF dual-hosted git repository.

russellspitzer pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 599b402  Data: Read metrics in parallel during TableMigration (#3876)
599b402 is described below

commit 599b402a1149b01f3b6a9f759ffc05f3145cdaec
Author: kingeasternsun <ki...@gmail.com>
AuthorDate: Mon Jan 24 21:40:52 2022 +0800

    Data: Read metrics in parallel during TableMigration (#3876)
    
    Adds a parameter for reading the metrics of files in parallel, rather than one at a time, in TableMigrationUtil.
    
    Co-authored-by: King <wa...@deepexi.com>
---
 .../apache/iceberg/data/TableMigrationUtil.java    | 182 ++++++++++-----------
 1 file changed, 90 insertions(+), 92 deletions(-)
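
For context, here is a minimal caller-side sketch of the new overload added by this
commit. The schema, partition layout, paths, and parallelism value are illustrative
assumptions, not part of the patch; passing null for the NameMapping assumes the data
files already carry Iceberg field IDs.

    import java.util.Collections;
    import java.util.List;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.iceberg.DataFile;
    import org.apache.iceberg.MetricsConfig;
    import org.apache.iceberg.PartitionSpec;
    import org.apache.iceberg.Schema;
    import org.apache.iceberg.data.TableMigrationUtil;
    import org.apache.iceberg.types.Types;

    public class ListPartitionExample {
      public static void main(String[] args) {
        // Table schema and identity partition spec for a ds=... directory layout (illustrative).
        Schema schema = new Schema(
            Types.NestedField.required(1, "id", Types.LongType.get()),
            Types.NestedField.required(2, "ds", Types.StringType.get()));
        PartitionSpec spec = PartitionSpec.builderFor(schema).identity("ds").build();

        // Read metrics for every Parquet file in one partition directory,
        // using 8 worker threads instead of the previous one-file-at-a-time path.
        List<DataFile> dataFiles = TableMigrationUtil.listPartition(
            Collections.singletonMap("ds", "2022-01-24"),    // partition values
            "hdfs://nn:8020/warehouse/db/tbl/ds=2022-01-24", // partition location (made up)
            "parquet",                                       // file format
            spec,
            new Configuration(),
            MetricsConfig.getDefault(),
            null,                                            // NameMapping (none needed here)
            8);                                              // new parallelism parameter
        System.out.println("Found " + dataFiles.size() + " data files");
      }
    }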

diff --git a/data/src/main/java/org/apache/iceberg/data/TableMigrationUtil.java b/data/src/main/java/org/apache/iceberg/data/TableMigrationUtil.java
index c534c70..277cc71 100644
--- a/data/src/main/java/org/apache/iceberg/data/TableMigrationUtil.java
+++ b/data/src/main/java/org/apache/iceberg/data/TableMigrationUtil.java
@@ -20,11 +20,14 @@
 package org.apache.iceberg.data;
 
 import java.io.IOException;
+import java.io.UncheckedIOException;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadPoolExecutor;
 import java.util.stream.Collectors;
-import java.util.stream.Stream;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -42,11 +45,11 @@ import org.apache.iceberg.io.InputFile;
 import org.apache.iceberg.mapping.NameMapping;
 import org.apache.iceberg.orc.OrcMetrics;
 import org.apache.iceberg.parquet.ParquetUtil;
-import org.apache.parquet.hadoop.ParquetFileReader;
-import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.apache.iceberg.relocated.com.google.common.util.concurrent.MoreExecutors;
+import org.apache.iceberg.relocated.com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.iceberg.util.Tasks;
 
 public class TableMigrationUtil {
-
   private static final PathFilter HIDDEN_PATH_FILTER =
       p -> !p.getName().startsWith("_") && !p.getName().startsWith(".");
 
@@ -74,111 +77,106 @@ public class TableMigrationUtil {
   public static List<DataFile> listPartition(Map<String, String> partition, String uri, String format,
                                              PartitionSpec spec, Configuration conf, MetricsConfig metricsConfig,
                                              NameMapping mapping) {
-    if (format.contains("avro")) {
-      return listAvroPartition(partition, uri, spec, conf);
-    } else if (format.contains("parquet")) {
-      return listParquetPartition(partition, uri, spec, conf, metricsConfig, mapping);
-    } else if (format.contains("orc")) {
-      return listOrcPartition(partition, uri, spec, conf, metricsConfig, mapping);
-    } else {
-      throw new UnsupportedOperationException("Unknown partition format: " + format);
-    }
+    return listPartition(partition, uri, format, spec, conf, metricsConfig, mapping, 1);
   }
 
-  private static List<DataFile> listAvroPartition(Map<String, String> partitionPath, String partitionUri,
-                                                  PartitionSpec spec, Configuration conf) {
+  public static List<DataFile> listPartition(Map<String, String> partitionPath, String partitionUri, String format,
+                                             PartitionSpec spec, Configuration conf, MetricsConfig metricsSpec,
+                                             NameMapping mapping, int parallelism) {
     try {
+      String partitionKey = spec.fields().stream()
+              .map(PartitionField::name)
+              .map(name -> String.format("%s=%s", name, partitionPath.get(name)))
+              .collect(Collectors.joining("/"));
+
       Path partition = new Path(partitionUri);
       FileSystem fs = partition.getFileSystem(conf);
-      return Arrays.stream(fs.listStatus(partition, HIDDEN_PATH_FILTER))
-          .filter(FileStatus::isFile)
-          .map(stat -> {
-            InputFile file = HadoopInputFile.fromLocation(stat.getPath().toString(), conf);
-            long rowCount = Avro.rowCount(file);
-            Metrics metrics = new Metrics(rowCount, null, null, null, null);
-            String partitionKey = spec.fields().stream()
-                .map(PartitionField::name)
-                .map(name -> String.format("%s=%s", name, partitionPath.get(name)))
-                .collect(Collectors.joining("/"));
-
-            return DataFiles.builder(spec)
-                .withPath(stat.getPath().toString())
-                .withFormat("avro")
-                .withFileSizeInBytes(stat.getLen())
-                .withMetrics(metrics)
-                .withPartitionPath(partitionKey)
-                .build();
-
-          }).collect(Collectors.toList());
+      List<FileStatus> fileStatus = Arrays.stream(fs.listStatus(partition, HIDDEN_PATH_FILTER))
+              .filter(FileStatus::isFile)
+              .collect(Collectors.toList());
+      DataFile[] datafiles = new DataFile[fileStatus.size()];
+      Tasks.Builder<Integer> task = Tasks.range(fileStatus.size())
+              .stopOnFailure()
+              .throwFailureWhenFinished();
+
+      if (parallelism > 1) {
+        task.executeWith(migrationService(parallelism));
+      }
+
+      if (format.contains("avro")) {
+        task.run(index -> {
+          Metrics metrics = getAvroMetrics(fileStatus.get(index).getPath(), conf);
+          datafiles[index] = buildDataFile(fileStatus.get(index), partitionKey, spec, metrics, "avro");
+        });
+      } else if (format.contains("parquet")) {
+        task.run(index -> {
+          Metrics metrics = getParquetMetrics(fileStatus.get(index).getPath(), conf, metricsSpec, mapping);
+          datafiles[index] = buildDataFile(fileStatus.get(index), partitionKey, spec, metrics, "parquet");
+        });
+      } else if (format.contains("orc")) {
+        task.run(index -> {
+          Metrics metrics = getOrcMetrics(fileStatus.get(index).getPath(), conf, metricsSpec, mapping);
+          datafiles[index] = buildDataFile(fileStatus.get(index), partitionKey, spec, metrics, "orc");
+        });
+      } else {
+        throw new UnsupportedOperationException("Unknown partition format: " + format);
+      }
+      return Arrays.asList(datafiles);
     } catch (IOException e) {
       throw new RuntimeException("Unable to list files in partition: " + partitionUri, e);
     }
   }
 
-  private static List<DataFile> listParquetPartition(Map<String, String> partitionPath, String partitionUri,
-                                                     PartitionSpec spec, Configuration conf,
-                                                     MetricsConfig metricsSpec, NameMapping mapping) {
+  private static Metrics getAvroMetrics(Path path, Configuration conf) {
     try {
-      Path partition = new Path(partitionUri);
-      FileSystem fs = partition.getFileSystem(conf);
-
-      return Arrays.stream(fs.listStatus(partition, HIDDEN_PATH_FILTER))
-          .filter(FileStatus::isFile)
-          .map(stat -> {
-            Metrics metrics;
-            try {
-              ParquetMetadata metadata = ParquetFileReader.readFooter(conf, stat);
-              metrics = ParquetUtil.footerMetrics(metadata, Stream.empty(), metricsSpec, mapping);
-            } catch (IOException e) {
-              throw new RuntimeException("Unable to read the footer of the parquet file: " +
-                  stat.getPath(), e);
-            }
-            String partitionKey = spec.fields().stream()
-                .map(PartitionField::name)
-                .map(name -> String.format("%s=%s", name, partitionPath.get(name)))
-                .collect(Collectors.joining("/"));
-
-            return DataFiles.builder(spec)
-                .withPath(stat.getPath().toString())
-                .withFormat("parquet")
-                .withFileSizeInBytes(stat.getLen())
-                .withMetrics(metrics)
-                .withPartitionPath(partitionKey)
-                .build();
-          }).collect(Collectors.toList());
-    } catch (IOException e) {
-      throw new RuntimeException("Unable to list files in partition: " + partitionUri, e);
+      InputFile file = HadoopInputFile.fromPath(path, conf);
+      long rowCount = Avro.rowCount(file);
+      return new Metrics(rowCount, null, null, null, null);
+    } catch (UncheckedIOException e) {
+      throw new RuntimeException("Unable to read Avro file: " +
+              path, e);
     }
   }
 
-  private static List<DataFile> listOrcPartition(Map<String, String> partitionPath, String partitionUri,
-                                                 PartitionSpec spec, Configuration conf,
-                                                 MetricsConfig metricsSpec, NameMapping mapping) {
+  private static Metrics getParquetMetrics(Path path, Configuration conf,
+                                           MetricsConfig metricsSpec, NameMapping mapping) {
     try {
-      Path partition = new Path(partitionUri);
-      FileSystem fs = partition.getFileSystem(conf);
+      InputFile file = HadoopInputFile.fromPath(path, conf);
+      return ParquetUtil.fileMetrics(file, metricsSpec, mapping);
+    } catch (UncheckedIOException e) {
+      throw new RuntimeException("Unable to read the metrics of the Parquet file: " +
+              path, e);
+    }
+  }
 
-      return Arrays.stream(fs.listStatus(partition, HIDDEN_PATH_FILTER))
-          .filter(FileStatus::isFile)
-          .map(stat -> {
-            Metrics metrics = OrcMetrics.fromInputFile(HadoopInputFile.fromPath(stat.getPath(), conf),
-                metricsSpec, mapping);
-            String partitionKey = spec.fields().stream()
-                .map(PartitionField::name)
-                .map(name -> String.format("%s=%s", name, partitionPath.get(name)))
-                .collect(Collectors.joining("/"));
+  private static Metrics getOrcMetrics(Path path, Configuration conf,
+                                       MetricsConfig metricsSpec, NameMapping mapping) {
+    try {
+      return OrcMetrics.fromInputFile(HadoopInputFile.fromPath(path, conf),
+              metricsSpec, mapping);
+    } catch (UncheckedIOException e) {
+      throw new RuntimeException("Unable to read the metrics of the Orc file: " +
+              path, e);
+    }
+  }
 
-            return DataFiles.builder(spec)
-                .withPath(stat.getPath().toString())
-                .withFormat("orc")
-                .withFileSizeInBytes(stat.getLen())
-                .withMetrics(metrics)
-                .withPartitionPath(partitionKey)
-                .build();
+  private static DataFile buildDataFile(FileStatus stat, String partitionKey,
+                                      PartitionSpec spec, Metrics metrics, String format) {
+    return DataFiles.builder(spec)
+            .withPath(stat.getPath().toString())
+            .withFormat(format)
+            .withFileSizeInBytes(stat.getLen())
+            .withMetrics(metrics)
+            .withPartitionPath(partitionKey)
+            .build();
+  }
 
-          }).collect(Collectors.toList());
-    } catch (IOException e) {
-      throw new RuntimeException("Unable to list files in partition: " + partitionUri, e);
-    }
+  private static ExecutorService migrationService(int parallelism) {
+    return MoreExecutors.getExitingExecutorService(
+            (ThreadPoolExecutor) Executors.newFixedThreadPool(
+                    parallelism,
+                    new ThreadFactoryBuilder()
+                            .setNameFormat("table-migration-%d")
+                            .build()));
   }
 }
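
For readers who have not used org.apache.iceberg.util.Tasks before, below is a
self-contained sketch of the fan-out pattern the new listPartition code relies on:
Tasks.range(n) hands each index to a fixed thread pool and each worker writes its
result into a pre-sized array, so output order matches input order. The class name
and the stand-in work inside run() are illustrative only, not part of the patch.

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.ThreadPoolExecutor;
    import org.apache.iceberg.relocated.com.google.common.util.concurrent.MoreExecutors;
    import org.apache.iceberg.relocated.com.google.common.util.concurrent.ThreadFactoryBuilder;
    import org.apache.iceberg.util.Tasks;

    public class ParallelFanOutSketch {
      public static void main(String[] args) {
        String[] paths = {"part-0.parquet", "part-1.parquet", "part-2.parquet"};
        long[] results = new long[paths.length];

        // Exiting pool so idle worker threads do not keep the JVM alive,
        // mirroring migrationService() in the patch above.
        ExecutorService pool = MoreExecutors.getExitingExecutorService(
            (ThreadPoolExecutor) Executors.newFixedThreadPool(
                4,
                new ThreadFactoryBuilder().setNameFormat("table-migration-%d").build()));

        Tasks.range(paths.length)
            .stopOnFailure()             // stop scheduling new work after the first failure
            .throwFailureWhenFinished()  // rethrow that failure to the caller
            .executeWith(pool)
            .run(index -> {
              // Stand-in for reading a file footer; each result lands at its input index.
              results[index] = paths[index].length();
            });
      }
    }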