Posted to commits@iceberg.apache.org by ja...@apache.org on 2022/05/25 06:06:35 UTC

[iceberg] branch master updated: Core: Use shared worker thread pool for abort and clean-up (#4799)

This is an automated email from the ASF dual-hosted git repository.

jackye pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new bf6242fe5 Core: Use shared worker thread pool for abort and clean-up (#4799)
bf6242fe5 is described below

commit bf6242fe57605a7b38b9d01ee33ae325687fb3a5
Author: Prashant Singh <35...@users.noreply.github.com>
AuthorDate: Wed May 25 11:36:27 2022 +0530

    Core: Use shared worker thread pool for abort and clean-up (#4799)
---
 core/src/main/java/org/apache/iceberg/CatalogUtil.java                 | 3 +++
 .../src/main/java/org/apache/iceberg/hadoop/HadoopTableOperations.java | 2 ++
 core/src/main/java/org/apache/iceberg/io/BaseTaskWriter.java           | 2 ++
 .../main/java/org/apache/iceberg/mr/hive/HiveIcebergRecordWriter.java  | 2 ++
 .../spark/src/main/java/org/apache/iceberg/spark/SparkTableUtil.java   | 2 ++
 .../apache/iceberg/spark/actions/BaseRewriteManifestsSparkAction.java  | 2 ++
 .../java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java  | 2 ++
 .../src/main/java/org/apache/iceberg/spark/source/SparkWrite.java      | 1 +
 8 files changed, 16 insertions(+)
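
Every change in this push is the same one-line pattern: abort and clean-up
paths that previously deleted files one at a time on the calling thread now
submit the deletes to Iceberg's shared worker pool via Tasks.executeWith().
A minimal sketch of the pattern as a standalone helper (the class and method
names here are illustrative, not from this commit):

    import java.util.List;
    import org.apache.iceberg.io.FileIO;
    import org.apache.iceberg.util.Tasks;
    import org.apache.iceberg.util.ThreadPools;

    class ParallelDeleteSketch {
      // Each file in this commit gains the executeWith(...) line below;
      // without it, Tasks runs every delete serially on the caller's thread.
      static void deleteAll(FileIO io, List<String> pathsToDelete) {
        Tasks.foreach(pathsToDelete)
            .executeWith(ThreadPools.getWorkerPool())  // the new line
            .noRetry().suppressFailureWhenFinished()
            .run(io::deleteFile);
      }
    }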

diff --git a/core/src/main/java/org/apache/iceberg/CatalogUtil.java b/core/src/main/java/org/apache/iceberg/CatalogUtil.java
index 07b69c17f..1a948de99 100644
--- a/core/src/main/java/org/apache/iceberg/CatalogUtil.java
+++ b/core/src/main/java/org/apache/iceberg/CatalogUtil.java
@@ -102,16 +102,19 @@ public class CatalogUtil {
     }
 
     Tasks.foreach(Iterables.transform(manifestsToDelete, ManifestFile::path))
+        .executeWith(ThreadPools.getWorkerPool())
         .noRetry().suppressFailureWhenFinished()
         .onFailure((manifest, exc) -> LOG.warn("Delete failed for manifest: {}", manifest, exc))
         .run(io::deleteFile);
 
     Tasks.foreach(manifestListsToDelete)
+        .executeWith(ThreadPools.getWorkerPool())
         .noRetry().suppressFailureWhenFinished()
         .onFailure((list, exc) -> LOG.warn("Delete failed for manifest list: {}", list, exc))
         .run(io::deleteFile);
 
     Tasks.foreach(Iterables.transform(metadata.previousFiles(), TableMetadata.MetadataLogEntry::file))
+        .executeWith(ThreadPools.getWorkerPool())
         .noRetry().suppressFailureWhenFinished()
         .onFailure((metadataFile, exc) -> LOG.warn("Delete failed for previous metadata file: {}", metadataFile, exc))
         .run(io::deleteFile);
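
For context on ThreadPools.getWorkerPool(): it returns one JVM-wide
ExecutorService shared by all callers, created lazily and sized, as far as I
can tell from the util.ThreadPools class, by the iceberg.worker.num-threads
system property (defaulting to the number of available processors). Because
the pool is built on first use, the property only takes effect if it is set
before anything touches the pool. A hedged sketch:

    import java.util.concurrent.ExecutorService;
    import org.apache.iceberg.util.ThreadPools;

    class WorkerPoolSizingSketch {
      public static void main(String[] args) {
        // Assumption: this property name matches your Iceberg version, and it
        // is read only once, when the shared pool is first created.
        System.setProperty("iceberg.worker.num-threads", "8");

        ExecutorService pool = ThreadPools.getWorkerPool();
        pool.submit(() -> System.out.println(Thread.currentThread().getName()));
        // Do not shut this pool down: it is shared across the whole JVM.
      }
    }
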
diff --git a/core/src/main/java/org/apache/iceberg/hadoop/HadoopTableOperations.java b/core/src/main/java/org/apache/iceberg/hadoop/HadoopTableOperations.java
index cccdb8330..7a3c8cb02 100644
--- a/core/src/main/java/org/apache/iceberg/hadoop/HadoopTableOperations.java
+++ b/core/src/main/java/org/apache/iceberg/hadoop/HadoopTableOperations.java
@@ -49,6 +49,7 @@ import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.Sets;
 import org.apache.iceberg.util.Pair;
 import org.apache.iceberg.util.Tasks;
+import org.apache.iceberg.util.ThreadPools;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -415,6 +416,7 @@ public class HadoopTableOperations implements TableOperations {
       Set<TableMetadata.MetadataLogEntry> removedPreviousMetadataFiles = Sets.newHashSet(base.previousFiles());
       removedPreviousMetadataFiles.removeAll(metadata.previousFiles());
       Tasks.foreach(removedPreviousMetadataFiles)
+          .executeWith(ThreadPools.getWorkerPool())
           .noRetry().suppressFailureWhenFinished()
           .onFailure((previousMetadataFile, exc) ->
               LOG.warn("Delete failed for previous metadata file: {}", previousMetadataFile, exc))
diff --git a/core/src/main/java/org/apache/iceberg/io/BaseTaskWriter.java b/core/src/main/java/org/apache/iceberg/io/BaseTaskWriter.java
index 386405afc..c80084f3d 100644
--- a/core/src/main/java/org/apache/iceberg/io/BaseTaskWriter.java
+++ b/core/src/main/java/org/apache/iceberg/io/BaseTaskWriter.java
@@ -40,6 +40,7 @@ import org.apache.iceberg.util.CharSequenceSet;
 import org.apache.iceberg.util.StructLikeMap;
 import org.apache.iceberg.util.StructProjection;
 import org.apache.iceberg.util.Tasks;
+import org.apache.iceberg.util.ThreadPools;
 
 public abstract class BaseTaskWriter<T> implements TaskWriter<T> {
   private final List<DataFile> completedDataFiles = Lists.newArrayList();
@@ -73,6 +74,7 @@ public abstract class BaseTaskWriter<T> implements TaskWriter<T> {
 
     // clean up files created by this writer
     Tasks.foreach(Iterables.concat(completedDataFiles, completedDeleteFiles))
+        .executeWith(ThreadPools.getWorkerPool())
         .throwFailureWhenFinished()
         .noRetry()
         .run(file -> io.deleteFile(file.path().toString()));
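
Note the two failure policies in this commit: the CatalogUtil and Hadoop
clean-up paths suppress failures (log and keep deleting), while writer abort
paths like this one throw once all tasks have finished, so a failed delete
surfaces instead of being silently dropped. Both kinds now run on the shared
pool. A sketch of the contrast, with an illustrative list of paths:

    import java.util.List;
    import org.apache.iceberg.io.FileIO;
    import org.apache.iceberg.util.Tasks;
    import org.apache.iceberg.util.ThreadPools;

    class FailurePolicySketch {
      // Best-effort clean-up: onFailure logs, nothing is rethrown.
      static void bestEffort(FileIO io, List<String> paths) {
        Tasks.foreach(paths)
            .executeWith(ThreadPools.getWorkerPool())
            .noRetry().suppressFailureWhenFinished()
            .onFailure((path, exc) -> System.err.println("Delete failed: " + path))
            .run(io::deleteFile);
      }

      // Strict abort: remaining deletes still run, then the failure is rethrown.
      static void strict(FileIO io, List<String> paths) {
        Tasks.foreach(paths)
            .executeWith(ThreadPools.getWorkerPool())
            .throwFailureWhenFinished()
            .noRetry()
            .run(io::deleteFile);
      }
    }
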
diff --git a/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergRecordWriter.java b/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergRecordWriter.java
index 2adf43d12..18cdc24c8 100644
--- a/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergRecordWriter.java
+++ b/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergRecordWriter.java
@@ -39,6 +39,7 @@ import org.apache.iceberg.io.PartitionedFanoutWriter;
 import org.apache.iceberg.mr.mapred.Container;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.util.Tasks;
+import org.apache.iceberg.util.ThreadPools;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -95,6 +96,7 @@ class HiveIcebergRecordWriter extends PartitionedFanoutWriter<Record>
     // If abort then remove the unnecessary files
     if (abort) {
       Tasks.foreach(dataFiles)
+          .executeWith(ThreadPools.getWorkerPool())
           .retry(3)
           .suppressFailureWhenFinished()
           .onFailure((file, exception) -> LOG.debug("Failed to remove file {} on abort", file, exception))
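
Unlike the no-retry paths above, the Hive abort retries each failed delete:
if I read Tasks.Builder.retry(int) correctly, retry(3) allows three retries
on top of the initial attempt before suppressFailureWhenFinished() lets the
abort continue, but treat the exact attempt count as an assumption. Sketch:

    import org.apache.iceberg.DataFile;
    import org.apache.iceberg.io.FileIO;
    import org.apache.iceberg.util.Tasks;
    import org.apache.iceberg.util.ThreadPools;

    class AbortRetrySketch {
      static void removeOnAbort(FileIO io, Iterable<DataFile> dataFiles) {
        Tasks.foreach(dataFiles)
            .executeWith(ThreadPools.getWorkerPool())
            .retry(3)                       // re-attempt a failed delete
            .suppressFailureWhenFinished()  // abort proceeds even if deletes keep failing
            .run(file -> io.deleteFile(file.path().toString()));
      }
    }
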
diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkTableUtil.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkTableUtil.java
index 401787d1a..2fedb4c74 100644
--- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkTableUtil.java
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkTableUtil.java
@@ -60,6 +60,7 @@ import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.spark.source.SparkTable;
 import org.apache.iceberg.util.PropertyUtil;
 import org.apache.iceberg.util.Tasks;
+import org.apache.iceberg.util.ThreadPools;
 import org.apache.spark.TaskContext;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
@@ -587,6 +588,7 @@ public class SparkTableUtil {
 
   private static void deleteManifests(FileIO io, List<ManifestFile> manifests) {
     Tasks.foreach(manifests)
+        .executeWith(ThreadPools.getWorkerPool())
         .noRetry()
         .suppressFailureWhenFinished()
         .run(item -> io.deleteFile(item.path()));
diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseRewriteManifestsSparkAction.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseRewriteManifestsSparkAction.java
index c446d42ca..209838bc8 100644
--- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseRewriteManifestsSparkAction.java
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseRewriteManifestsSparkAction.java
@@ -54,6 +54,7 @@ import org.apache.iceberg.spark.SparkUtil;
 import org.apache.iceberg.types.Types;
 import org.apache.iceberg.util.PropertyUtil;
 import org.apache.iceberg.util.Tasks;
+import org.apache.iceberg.util.ThreadPools;
 import org.apache.spark.api.java.function.MapFunction;
 import org.apache.spark.api.java.function.MapPartitionsFunction;
 import org.apache.spark.broadcast.Broadcast;
@@ -308,6 +309,7 @@ public class BaseRewriteManifestsSparkAction
 
   private void deleteFiles(Iterable<String> locations) {
     Tasks.foreach(locations)
+        .executeWith(ThreadPools.getWorkerPool())
         .noRetry()
         .suppressFailureWhenFinished()
         .onFailure((location, exc) -> LOG.warn("Failed to delete: {}", location, exc))
diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java
index b57defa69..a777af3b7 100644
--- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java
@@ -59,6 +59,7 @@ import org.apache.iceberg.types.Types;
 import org.apache.iceberg.util.CharSequenceSet;
 import org.apache.iceberg.util.StructProjection;
 import org.apache.iceberg.util.Tasks;
+import org.apache.iceberg.util.ThreadPools;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.broadcast.Broadcast;
 import org.apache.spark.sql.SparkSession;
@@ -135,6 +136,7 @@ class SparkPositionDeltaWrite implements DeltaWrite, RequiresDistributionAndOrde
 
   private static <T extends ContentFile<T>> void cleanFiles(FileIO io, Iterable<T> files) {
     Tasks.foreach(files)
+        .executeWith(ThreadPools.getWorkerPool())
         .throwFailureWhenFinished()
         .noRetry()
         .run(file -> io.deleteFile(file.path().toString()));
diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java
index 10493efff..e919a83ab 100644
--- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java
@@ -637,6 +637,7 @@ abstract class SparkWrite implements Write, RequiresDistributionAndOrdering {
 
   private static <T extends ContentFile<T>> void deleteFiles(FileIO io, List<T> files) {
     Tasks.foreach(files)
+        .executeWith(ThreadPools.getWorkerPool())
         .throwFailureWhenFinished()
         .noRetry()
         .run(file -> io.deleteFile(file.path().toString()));
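
A closing design note: these abort and clean-up paths are short-lived and can
fire concurrently from many commits, so a per-operation executor would be
created and torn down constantly; the shared pool amortizes that cost, and its
threads are daemon threads (again per my reading of util.ThreadPools), so they
do not block JVM shutdown. A caller that needs isolation can still build a
private pool and own its lifecycle, e.g.:

    import java.util.concurrent.ExecutorService;
    import org.apache.iceberg.util.ThreadPools;

    class PrivatePoolSketch {
      public static void main(String[] args) {
        // Assumption: ThreadPools.newWorkerPool(prefix, size) exists in this version.
        ExecutorService privatePool = ThreadPools.newWorkerPool("cleanup-sketch", 4);
        try {
          privatePool.submit(() -> System.out.println("isolated clean-up work"));
        } finally {
          privatePool.shutdown();  // unlike getWorkerPool(), the caller owns this pool
        }
      }
    }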