You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@iceberg.apache.org by GitBox <gi...@apache.org> on 2022/05/05 23:20:37 UTC

[GitHub] [iceberg] aokolnychyi commented on a diff in pull request #4652: ICEBERG-4346: Better handling of Orphan files

aokolnychyi commented on code in PR #4652:
URL: https://github.com/apache/iceberg/pull/4652#discussion_r866297319


##########
api/src/main/java/org/apache/iceberg/actions/DeleteOrphanFiles.java:
##########
@@ -80,6 +81,28 @@ public interface DeleteOrphanFiles extends Action<DeleteOrphanFiles, DeleteOrpha
    */
   DeleteOrphanFiles executeDeleteWith(ExecutorService executorService);
 
+

Review Comment:
   nit: extra empty line



##########
spark/v3.0/spark/src/main/java/org/apache/iceberg/spark/actions/BaseDeleteOrphanFilesSparkAction.java:
##########
@@ -78,22 +84,16 @@
     extends BaseSparkAction<DeleteOrphanFiles, DeleteOrphanFiles.Result> implements DeleteOrphanFiles {
 
   private static final Logger LOG = LoggerFactory.getLogger(BaseDeleteOrphanFilesSparkAction.class);
-  private static final UserDefinedFunction filenameUDF = functions.udf((String path) -> {
-    int lastIndex = path.lastIndexOf(File.separator);
-    if (lastIndex == -1) {
-      return path;
-    } else {
-      return path.substring(lastIndex + 1);
-    }
-  }, DataTypes.StringType);
-
   private static final ExecutorService DEFAULT_DELETE_EXECUTOR_SERVICE = null;
 
   private final SerializableConfiguration hadoopConf;
   private final int partitionDiscoveryParallelism;
   private final Table table;
 
   private String location = null;
+  private PrefixMisMatchMode prefixMismatchMode = PrefixMisMatchMode.IGNORE;

Review Comment:
   I think the default mode should be error to bring the attention to the problem.



##########
spark/v2.4/spark/src/main/java/org/apache/iceberg/spark/actions/BaseDeleteOrphanFilesSparkAction.java:
##########
@@ -128,6 +128,21 @@ public BaseDeleteOrphanFilesSparkAction executeDeleteWith(ExecutorService execut
     return this;
   }
 
+  @Override
+  public DeleteOrphanFiles prefixMismatchMode(String mode) {

Review Comment:
   If you add default implementations, you may avoid touching all Spark versions.



##########
api/src/main/java/org/apache/iceberg/actions/DeleteOrphanFiles.java:
##########
@@ -80,6 +81,28 @@ public interface DeleteOrphanFiles extends Action<DeleteOrphanFiles, DeleteOrpha
    */
   DeleteOrphanFiles executeDeleteWith(ExecutorService executorService);
 
+
+  /**
+   * Pass a mode for handling the files that cannot be determined if they are orphan
+   * @param mode mode for handling files that cannot be determined if they are orphan
+   * @return this for method chaining
+   */
+  DeleteOrphanFiles prefixMismatchMode(String mode);

Review Comment:
   I think you can move the enum from the Spark module here and make it nested within this action interface (e.g. right above the result interface). Then we can refer to it in this method instead of using a string.



##########
api/src/main/java/org/apache/iceberg/actions/DeleteOrphanFiles.java:
##########
@@ -80,6 +81,28 @@ public interface DeleteOrphanFiles extends Action<DeleteOrphanFiles, DeleteOrpha
    */
   DeleteOrphanFiles executeDeleteWith(ExecutorService executorService);
 
+
+  /**
+   * Pass a mode for handling the files that cannot be determined if they are orphan
+   * @param mode mode for handling files that cannot be determined if they are orphan
+   * @return this for method chaining
+   */
+  DeleteOrphanFiles prefixMismatchMode(String mode);
+
+  /**
+   * Pass a list of schemes to be considered equivalent when finding orphan files
+   * @param equivalentSchemes list of equivalent schemes
+   * @return this for method chaining
+   */
+  DeleteOrphanFiles equivalentSchemes(List<String> equivalentSchemes);

Review Comment:
   All newly added methods should have default implementations throwing an exception.
   
   ```
   throw new UnsupportedOperationException(this.getClass().getName() + " does not implement XXX");
   ```



##########
api/src/main/java/org/apache/iceberg/actions/DeleteOrphanFiles.java:
##########
@@ -80,6 +81,28 @@ public interface DeleteOrphanFiles extends Action<DeleteOrphanFiles, DeleteOrpha
    */
   DeleteOrphanFiles executeDeleteWith(ExecutorService executorService);
 
+
+  /**
+   * Pass a mode for handling the files that cannot be determined if they are orphan
+   * @param mode mode for handling files that cannot be determined if they are orphan
+   * @return this for method chaining
+   */
+  DeleteOrphanFiles prefixMismatchMode(String mode);
+
+  /**
+   * Pass a list of schemes to be considered equivalent when finding orphan files
+   * @param equivalentSchemes list of equivalent schemes
+   * @return this for method chaining
+   */
+  DeleteOrphanFiles equivalentSchemes(List<String> equivalentSchemes);
+
+  /**
+   * Pass a list of authorities to be considered equivalent when finding orphan files

Review Comment:
   Question: what if I have the following use case where `bucket1` and `bucket2` are indeed different?
   
   ```
   s3://bucket1/path/to/file.parquet - actual
   s3://bucket2/path/to/file.parquet - referenced by metadata
   ```
   
   The first file should be deleted. How can we support this case with this API? The only way I can think of right now is to add another method of unequal authorities. That seems to complicate the action quite a bit.
   
   Any better ideas?



##########
api/src/main/java/org/apache/iceberg/actions/DeleteOrphanFiles.java:
##########
@@ -80,6 +81,28 @@ public interface DeleteOrphanFiles extends Action<DeleteOrphanFiles, DeleteOrpha
    */
   DeleteOrphanFiles executeDeleteWith(ExecutorService executorService);
 
+
+  /**
+   * Pass a mode for handling the files that cannot be determined if they are orphan
+   * @param mode mode for handling files that cannot be determined if they are orphan
+   * @return this for method chaining
+   */
+  DeleteOrphanFiles prefixMismatchMode(String mode);
+
+  /**
+   * Pass a list of schemes to be considered equivalent when finding orphan files
+   * @param equivalentSchemes list of equivalent schemes
+   * @return this for method chaining
+   */
+  DeleteOrphanFiles equivalentSchemes(List<String> equivalentSchemes);
+
+  /**
+   * Pass a list of authorities to be considered equivalent when finding orphan files
+   * @param equivalentAuthorities list of equivalent schemes
+   * @return this for method chaining
+   */
+  DeleteOrphanFiles equivalentAuthorities(List<String> equivalentAuthorities);

Review Comment:
   Question: will it be better to use a map? We recently added a way to specify a list of actual files so it may contain entries for multiple schemes/authorities.



##########
spark/v3.0/spark/src/main/java/org/apache/iceberg/spark/actions/BaseDeleteOrphanFilesSparkAction.java:
##########
@@ -78,22 +84,16 @@
     extends BaseSparkAction<DeleteOrphanFiles, DeleteOrphanFiles.Result> implements DeleteOrphanFiles {
 
   private static final Logger LOG = LoggerFactory.getLogger(BaseDeleteOrphanFilesSparkAction.class);
-  private static final UserDefinedFunction filenameUDF = functions.udf((String path) -> {
-    int lastIndex = path.lastIndexOf(File.separator);
-    if (lastIndex == -1) {
-      return path;
-    } else {
-      return path.substring(lastIndex + 1);
-    }
-  }, DataTypes.StringType);
-
   private static final ExecutorService DEFAULT_DELETE_EXECUTOR_SERVICE = null;
 
   private final SerializableConfiguration hadoopConf;
   private final int partitionDiscoveryParallelism;
   private final Table table;
 
   private String location = null;
+  private PrefixMisMatchMode prefixMismatchMode = PrefixMisMatchMode.IGNORE;
+  private List<String> equivalentSchemes;

Review Comment:
   We also need to define some default values like s3a, s3, s3n



##########
spark/v3.0/spark/src/main/java/org/apache/iceberg/spark/actions/BaseDeleteOrphanFilesSparkAction.java:
##########
@@ -273,4 +310,102 @@ private static FlatMapFunction<Iterator<String>, String> listDirsRecursively(
       return files.iterator();
     };
   }
+
+  private static Dataset<Row> sanitizeAndSplitPaths(Dataset<Row> filesDF) {
+    final StructType schema = new StructType(new StructField[]{

Review Comment:
   Seems like this can be a static constant?



##########
spark/v3.0/spark/src/main/java/org/apache/iceberg/spark/actions/BaseDeleteOrphanFilesSparkAction.java:
##########
@@ -186,6 +202,27 @@ private DeleteOrphanFiles.Result doExecute() {
     return new BaseDeleteOrphanFilesActionResult(orphanFiles);
   }
 
+  static Dataset<String> getOrphanFilesDF(Dataset<Row> actualFileDF,

Review Comment:
   nit: annotate with `@VisibleForTesting`?



##########
spark/v3.0/spark/src/main/java/org/apache/iceberg/spark/actions/BaseDeleteOrphanFilesSparkAction.java:
##########
@@ -186,6 +202,27 @@ private DeleteOrphanFiles.Result doExecute() {
     return new BaseDeleteOrphanFilesActionResult(orphanFiles);
   }
 
+  static Dataset<String> getOrphanFilesDF(Dataset<Row> actualFileDF,

Review Comment:
   We usually try to avoid using `get` as it has almost to meaning. What about `findOrphanFiles` and returning a list of strings?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org