You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@iceberg.apache.org by GitBox <gi...@apache.org> on 2022/02/16 10:09:09 UTC

[GitHub] [iceberg] kingeasternsun commented on a change in pull request #3876: ⚡ Speed up TableMigration by collecting the DataFiles in parallel

kingeasternsun commented on a change in pull request #3876:
URL: https://github.com/apache/iceberg/pull/3876#discussion_r807755365



##########
File path: data/src/main/java/org/apache/iceberg/data/TableMigrationUtil.java
##########
@@ -74,111 +77,106 @@ private TableMigrationUtil() {
   public static List<DataFile> listPartition(Map<String, String> partition, String uri, String format,
                                              PartitionSpec spec, Configuration conf, MetricsConfig metricsConfig,
                                              NameMapping mapping) {
-    if (format.contains("avro")) {
-      return listAvroPartition(partition, uri, spec, conf);
-    } else if (format.contains("parquet")) {
-      return listParquetPartition(partition, uri, spec, conf, metricsConfig, mapping);
-    } else if (format.contains("orc")) {
-      return listOrcPartition(partition, uri, spec, conf, metricsConfig, mapping);
-    } else {
-      throw new UnsupportedOperationException("Unknown partition format: " + format);
-    }
+    return listPartition(partition, uri, format, spec, conf, metricsConfig, mapping, 1);
   }
 
-  private static List<DataFile> listAvroPartition(Map<String, String> partitionPath, String partitionUri,
-                                                  PartitionSpec spec, Configuration conf) {
+  public static List<DataFile> listPartition(Map<String, String> partitionPath, String partitionUri, String format,
+                                             PartitionSpec spec, Configuration conf, MetricsConfig metricsSpec,
+                                             NameMapping mapping, int parallelism) {
     try {
+      String partitionKey = spec.fields().stream()
+              .map(PartitionField::name)
+              .map(name -> String.format("%s=%s", name, partitionPath.get(name)))
+              .collect(Collectors.joining("/"));
+
       Path partition = new Path(partitionUri);
       FileSystem fs = partition.getFileSystem(conf);
-      return Arrays.stream(fs.listStatus(partition, HIDDEN_PATH_FILTER))
-          .filter(FileStatus::isFile)
-          .map(stat -> {
-            InputFile file = HadoopInputFile.fromLocation(stat.getPath().toString(), conf);
-            long rowCount = Avro.rowCount(file);
-            Metrics metrics = new Metrics(rowCount, null, null, null, null);
-            String partitionKey = spec.fields().stream()
-                .map(PartitionField::name)
-                .map(name -> String.format("%s=%s", name, partitionPath.get(name)))
-                .collect(Collectors.joining("/"));
-
-            return DataFiles.builder(spec)
-                .withPath(stat.getPath().toString())
-                .withFormat("avro")
-                .withFileSizeInBytes(stat.getLen())
-                .withMetrics(metrics)
-                .withPartitionPath(partitionKey)
-                .build();
-
-          }).collect(Collectors.toList());
+      List<FileStatus> fileStatus = Arrays.stream(fs.listStatus(partition, HIDDEN_PATH_FILTER))
+              .filter(FileStatus::isFile)
+              .collect(Collectors.toList());
+      DataFile[] datafiles = new DataFile[fileStatus.size()];
+      Tasks.Builder<Integer> task = Tasks.range(fileStatus.size())
+              .stopOnFailure()
+              .throwFailureWhenFinished();
+
+      if (parallelism > 1) {
+        task.executeWith(migrationService(parallelism));
+      }
+
+      if (format.contains("avro")) {
+        task.run(index -> {
+          Metrics metrics = getAvroMerics(fileStatus.get(index).getPath(), conf);
+          datafiles[index] = buildDataFile(fileStatus.get(index), partitionKey, spec, metrics, "avro");
+        });
+      } else if (format.contains("parquet")) {
+        task.run(index -> {
+          Metrics metrics = getParquetMerics(fileStatus.get(index).getPath(), conf, metricsSpec, mapping);
+          datafiles[index] = buildDataFile(fileStatus.get(index), partitionKey, spec, metrics, "parquet");
+        });
+      } else if (format.contains("orc")) {
+        task.run(index -> {
+          Metrics metrics = getOrcMerics(fileStatus.get(index).getPath(), conf, metricsSpec, mapping);
+          datafiles[index] = buildDataFile(fileStatus.get(index), partitionKey, spec, metrics, "orc");
+        });
+      } else {
+        throw new UnsupportedOperationException("Unknown partition format: " + format);
+      }
+      return Arrays.asList(datafiles);
     } catch (IOException e) {
       throw new RuntimeException("Unable to list files in partition: " + partitionUri, e);
     }
   }
 
-  private static List<DataFile> listParquetPartition(Map<String, String> partitionPath, String partitionUri,
-                                                     PartitionSpec spec, Configuration conf,
-                                                     MetricsConfig metricsSpec, NameMapping mapping) {
+  private static Metrics getAvroMerics(Path path,  Configuration conf) {
     try {
-      Path partition = new Path(partitionUri);
-      FileSystem fs = partition.getFileSystem(conf);
-
-      return Arrays.stream(fs.listStatus(partition, HIDDEN_PATH_FILTER))
-          .filter(FileStatus::isFile)
-          .map(stat -> {
-            Metrics metrics;
-            try {
-              ParquetMetadata metadata = ParquetFileReader.readFooter(conf, stat);
-              metrics = ParquetUtil.footerMetrics(metadata, Stream.empty(), metricsSpec, mapping);
-            } catch (IOException e) {
-              throw new RuntimeException("Unable to read the footer of the parquet file: " +
-                  stat.getPath(), e);
-            }
-            String partitionKey = spec.fields().stream()
-                .map(PartitionField::name)
-                .map(name -> String.format("%s=%s", name, partitionPath.get(name)))
-                .collect(Collectors.joining("/"));
-
-            return DataFiles.builder(spec)
-                .withPath(stat.getPath().toString())
-                .withFormat("parquet")
-                .withFileSizeInBytes(stat.getLen())
-                .withMetrics(metrics)
-                .withPartitionPath(partitionKey)
-                .build();
-          }).collect(Collectors.toList());
-    } catch (IOException e) {
-      throw new RuntimeException("Unable to list files in partition: " + partitionUri, e);
+      InputFile file = HadoopInputFile.fromPath(path, conf);
+      long rowCount = Avro.rowCount(file);
+      return new Metrics(rowCount, null, null, null, null);
+    } catch (UncheckedIOException e) {
+      throw new RuntimeException("Unable to read Avro file: " +
+              path, e);
     }
   }
 
-  private static List<DataFile> listOrcPartition(Map<String, String> partitionPath, String partitionUri,
-                                                 PartitionSpec spec, Configuration conf,
-                                                 MetricsConfig metricsSpec, NameMapping mapping) {
+  private static Metrics getParquetMerics(Path path,  Configuration conf,
+                                          MetricsConfig metricsSpec, NameMapping mapping) {
     try {
-      Path partition = new Path(partitionUri);
-      FileSystem fs = partition.getFileSystem(conf);
+      InputFile file = HadoopInputFile.fromPath(path, conf);
+      return ParquetUtil.fileMetrics(file, metricsSpec, mapping);
+    } catch (UncheckedIOException e) {
+      throw new RuntimeException("Unable to read the metrics of the Parquet file: " +
+              path, e);
+    }
+  }
 
-      return Arrays.stream(fs.listStatus(partition, HIDDEN_PATH_FILTER))
-          .filter(FileStatus::isFile)
-          .map(stat -> {
-            Metrics metrics = OrcMetrics.fromInputFile(HadoopInputFile.fromPath(stat.getPath(), conf),
-                metricsSpec, mapping);
-            String partitionKey = spec.fields().stream()
-                .map(PartitionField::name)
-                .map(name -> String.format("%s=%s", name, partitionPath.get(name)))
-                .collect(Collectors.joining("/"));
+  private static Metrics getOrcMerics(Path path,  Configuration conf,
+                                          MetricsConfig metricsSpec, NameMapping mapping) {
+    try {
+      return OrcMetrics.fromInputFile(HadoopInputFile.fromPath(path, conf),
+              metricsSpec, mapping);
+    } catch (UncheckedIOException e) {
+      throw new RuntimeException("Unable to read the metrics of the Orc file: " +
+              path, e);
+    }
+  }
 
-            return DataFiles.builder(spec)
-                .withPath(stat.getPath().toString())
-                .withFormat("orc")
-                .withFileSizeInBytes(stat.getLen())
-                .withMetrics(metrics)
-                .withPartitionPath(partitionKey)
-                .build();
+  private static DataFile buildDataFile(FileStatus stat, String partitionKey,
+                                      PartitionSpec spec, Metrics metrics, String format) {
+    return  DataFiles.builder(spec)
+            .withPath(stat.getPath().toString())
+            .withFormat(format)
+            .withFileSizeInBytes(stat.getLen())
+            .withMetrics(metrics)
+            .withPartitionPath(partitionKey)
+            .build();
+  }
 
-          }).collect(Collectors.toList());
-    } catch (IOException e) {
-      throw new RuntimeException("Unable to list files in partition: " + partitionUri, e);
-    }
+  private static ExecutorService migrationService(int concurrentDeletes) {

Review comment:
       > #4125
   
   Thanks




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org