You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@iceberg.apache.org by GitBox <gi...@apache.org> on 2022/01/12 01:24:59 UTC

[GitHub] [iceberg] kingeasternsun commented on a change in pull request #3876: ⚡ Speed up TableMigration By collect the DafaFile In parallel

kingeasternsun commented on a change in pull request #3876:
URL: https://github.com/apache/iceberg/pull/3876#discussion_r782643727



##########
File path: data/src/main/java/org/apache/iceberg/data/TableMigrationUtil.java
##########
@@ -181,4 +188,169 @@ private TableMigrationUtil() {
       throw new RuntimeException("Unable to list files in partition: " + partitionUri, e);
     }
   }
+
+  private static List<DataFile> listAvroPartition(Map<String, String> partitionPath, String partitionUri,
+                                                  PartitionSpec spec, Configuration conf, int parallelism) {
+    // not use parallelism
+    if (parallelism == 0 || parallelism == 1) {
+      return listAvroPartition(partitionPath, partitionUri, spec, conf);
+    }
+
+    List<Callable<DataFile>> footers = new ArrayList<>();
+    try {
+      Path partition = new Path(partitionUri);
+      FileSystem fs = partition.getFileSystem(conf);
+      FileStatus[] fileStatus = fs.listStatus(partition, HIDDEN_PATH_FILTER);
+      for (final FileStatus stat : fileStatus) {
+        if (!stat.isFile()) {
+          continue;
+        }
+        footers.add(() -> {
+          InputFile file = HadoopInputFile.fromLocation(stat.getPath().toString(), conf);
+          long rowCount = Avro.rowCount(file);
+          Metrics metrics = new Metrics(rowCount, null, null, null, null);
+          String partitionKey = spec.fields().stream()
+                  .map(PartitionField::name)
+                  .map(name -> String.format("%s=%s", name, partitionPath.get(name)))
+                  .collect(Collectors.joining("/"));
+
+          return DataFiles.builder(spec)
+                  .withPath(stat.getPath().toString())
+                  .withFormat("avro")
+                  .withFileSizeInBytes(stat.getLen())
+                  .withMetrics(metrics)
+                  .withPartitionPath(partitionKey)
+                  .build();
+        });
+      }
+    } catch (IOException e) {
+      throw new RuntimeException("Unable to list files in partition: " + partitionUri, e);
+    }
+
+    try {
+      return runAllInParallel(parallelism, footers);
+    } catch (ExecutionException e) {
+      throw new RuntimeException("Could not read footer: " + e.getMessage(), e.getCause());
+    }
+
+  }
+
+  private static List<DataFile> listParquetPartition(Map<String, String> partitionPath, String partitionUri,
+                                                     PartitionSpec spec, Configuration conf,
+                                                     MetricsConfig metricsSpec, NameMapping mapping, int parallelism) {
+    // not use parallelism
+    if (parallelism == 0 || parallelism == 1) {
+      return listParquetPartition(partitionPath, partitionUri, spec, conf, metricsSpec, mapping);
+    }
+
+    List<Callable<DataFile>> footers = new ArrayList<>();
+    try {
+      Path partition = new Path(partitionUri);
+      FileSystem fs = partition.getFileSystem(conf);
+      FileStatus[] fileStatus = fs.listStatus(partition, HIDDEN_PATH_FILTER);
+      for (final FileStatus stat : fileStatus) {
+        if (!stat.isFile()) {
+          continue;
+        }
+        footers.add(() -> {
+          Metrics metrics;
+          try {
+            ParquetMetadata metadata = ParquetFileReader.readFooter(conf, stat);
+            metrics = ParquetUtil.footerMetrics(metadata, Stream.empty(), metricsSpec, mapping);
+          } catch (IOException e) {
+            throw new RuntimeException("Unable to read the footer of the parquet file: " +
+                    stat.getPath(), e);
+          }
+          String partitionKey = spec.fields().stream()
+                  .map(PartitionField::name)
+                  .map(name -> String.format("%s=%s", name, partitionPath.get(name)))
+                  .collect(Collectors.joining("/"));
+
+          return DataFiles.builder(spec)
+                  .withPath(stat.getPath().toString())
+                  .withFormat("parquet")
+                  .withFileSizeInBytes(stat.getLen())
+                  .withMetrics(metrics)
+                  .withPartitionPath(partitionKey)
+                  .build();
+        });
+      }
+    } catch (IOException e) {
+      throw new RuntimeException("Unable to list files in partition: " + partitionUri, e);
+    }
+
+    try {
+      return runAllInParallel(parallelism, footers);
+    } catch (ExecutionException e) {
+      throw new RuntimeException("Could not read footer: " + e.getMessage(), e.getCause());
+    }
+
+  }
+
+  private static List<DataFile> listOrcPartition(Map<String, String> partitionPath, String partitionUri,
+                                                     PartitionSpec spec, Configuration conf,
+                                                     MetricsConfig metricsSpec, NameMapping mapping, int parallelism) {
+    // not use parallelism
+    if (parallelism == 0 || parallelism == 1) {
+      return listOrcPartition(partitionPath, partitionUri, spec, conf, metricsSpec, mapping);
+    }
+
+    List<Callable<DataFile>> footers = new ArrayList<>();
+    try {
+      Path partition = new Path(partitionUri);
+      FileSystem fs = partition.getFileSystem(conf);
+      FileStatus[] fileStatus = fs.listStatus(partition, HIDDEN_PATH_FILTER);
+      for (final FileStatus stat : fileStatus) {
+        if (!stat.isFile()) {
+          continue;
+        }
+        footers.add(() -> {
+          Metrics metrics = OrcMetrics.fromInputFile(HadoopInputFile.fromPath(stat.getPath(), conf),
+                  metricsSpec, mapping);
+          String partitionKey = spec.fields().stream()
+                  .map(PartitionField::name)
+                  .map(name -> String.format("%s=%s", name, partitionPath.get(name)))
+                  .collect(Collectors.joining("/"));
+
+          return DataFiles.builder(spec)
+                  .withPath(stat.getPath().toString())
+                  .withFormat("orc")
+                  .withFileSizeInBytes(stat.getLen())
+                  .withMetrics(metrics)
+                  .withPartitionPath(partitionKey)
+                  .build();
+        });
+      }
+    } catch (IOException e) {
+      throw new RuntimeException("Unable to list files in partition: " + partitionUri, e);
+    }
+
+    try {
+      return runAllInParallel(parallelism, footers);
+    } catch (ExecutionException e) {
+      throw new RuntimeException("Could not read footer: " + e.getMessage(), e.getCause());
+    }
+
+  }
+
+  private static <T> List<T> runAllInParallel(int parallelism, List<Callable<T>> toRun) throws ExecutionException {

Review comment:
       Thansk for your  review,  because I' m a newer to java, Can u give me a link about the `Tasks` utility , so I can learn how to replace it with `Tasks` .




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org