You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@iceberg.apache.org by GitBox <gi...@apache.org> on 2022/07/14 16:32:52 UTC

[GitHub] [iceberg] aokolnychyi commented on a diff in pull request #4652: ICEBERG-4346: Better handling of Orphan files

aokolnychyi commented on code in PR #4652:
URL: https://github.com/apache/iceberg/pull/4652#discussion_r921340638


##########
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseDeleteOrphanFilesSparkAction.java:
##########
@@ -232,6 +259,75 @@ private DeleteOrphanFiles.Result doExecute() {
     return new BaseDeleteOrphanFilesActionResult(orphanFiles);
   }
 
+  private static Map<String, String> populateEqualSchemeAndAuthoritiesMap(List<String> equalSchemes,
+                                                                          List<String> equalAuthorities) {
+    Map<String, String> equalSchemeAndAuthoritiesMap = Maps.newHashMap();
+    if (equalSchemes != null && !equalSchemes.isEmpty()) {
+      String firstScheme = equalSchemes.get(0);
+      equalSchemes.forEach(value -> equalSchemeAndAuthoritiesMap.put(value, firstScheme));
+    }
+
+    if (equalAuthorities != null && !equalAuthorities.isEmpty()) {
+      String firstAuthority = equalAuthorities.get(0);
+      equalAuthorities.forEach(value -> equalSchemeAndAuthoritiesMap.put(value, firstAuthority));
+    }
+    return equalSchemeAndAuthoritiesMap;
+  }
+
+  @VisibleForTesting
+  static List<String> findOrphanFiles(Dataset<Row> actualFileDF,
+                                      Dataset<Row> validFileDF,
+                                      Map<String, String> equalSchemes,
+                                      Map<String, String> equalAuthorities,
+                                      SetAccumulator<Pair<String, String>> setAccumulator,
+                                      PrefixMismatchMode prefixMismatchMode) {
+    Map<String, String> equalSchemesAndAuthoritiesMap = Maps.newHashMap();
+    equalSchemesAndAuthoritiesMap.putAll(flattenMap(equalSchemes));
+    equalSchemesAndAuthoritiesMap.putAll(flattenMap(equalAuthorities));
+
+    Dataset<Row> normalizedActualFileDF = actualFileDF.mapPartitions(toFileMetadata(equalSchemesAndAuthoritiesMap),
+        RowEncoder.apply(FILE_METADATA_SCHEMA)).as("actual");
+    Dataset<Row> normalizedValidFileDF = validFileDF.mapPartitions(toFileMetadata(equalSchemesAndAuthoritiesMap),
+        RowEncoder.apply(FILE_METADATA_SCHEMA)).as("valid");
+
+    Column actualFileName = normalizedActualFileDF.col("path");
+    Column validFileName = normalizedValidFileDF.col("path");
+
+    Dataset<FileDescriptor> mayBeOrphanFilesDF = normalizedActualFileDF.join(normalizedValidFileDF,
+        actualFileName.equalTo(validFileName),
+        "leftouter")
+        .selectExpr("actual.scheme as actualScheme",
+            "actual.authority as actualAuthority",
+            "actual.path as actualPath",
+            "actual.file_path as actualFilePath",
+            "valid.scheme as validScheme",
+            "valid.authority as validAuthority",
+            "valid.path as validPath",
+            "valid.file_path as validFilePath")
+        .as(Encoders.bean(FileDescriptor.class));
+
+    List<String> orphanFiles = mayBeOrphanFilesDF.mapPartitions(
+        new MapOrphanFilesFunction(setAccumulator), Encoders.STRING()).collectAsList();
+    if (prefixMismatchMode == PrefixMismatchMode.ERROR && !setAccumulator.value().isEmpty()) {
+      throw new ValidationException("Unable to deterministically find all orphan files." +
+          " Found file paths that have same file path but different authorities/schemes. Conflicting" +
+          " authorities/schemes found: %s", setAccumulator.value().toString());
+    }
+    return orphanFiles;
+  }
+
+  private static Map<String, String> flattenMap(Map<String, String> toBeFlattenedMap) {

Review Comment:
   To be honest, I am not sure supporting comma separated values will be easier for the user. I'd either skip this logic and just support a plain map or at least support commas in the key vs in the value (`s3a, s3n` -> `s3`).



##########
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseDeleteOrphanFilesSparkAction.java:
##########
@@ -232,6 +259,75 @@ private DeleteOrphanFiles.Result doExecute() {
     return new BaseDeleteOrphanFilesActionResult(orphanFiles);
   }
 
+  private static Map<String, String> populateEqualSchemeAndAuthoritiesMap(List<String> equalSchemes,
+                                                                          List<String> equalAuthorities) {
+    Map<String, String> equalSchemeAndAuthoritiesMap = Maps.newHashMap();
+    if (equalSchemes != null && !equalSchemes.isEmpty()) {
+      String firstScheme = equalSchemes.get(0);
+      equalSchemes.forEach(value -> equalSchemeAndAuthoritiesMap.put(value, firstScheme));
+    }
+
+    if (equalAuthorities != null && !equalAuthorities.isEmpty()) {
+      String firstAuthority = equalAuthorities.get(0);
+      equalAuthorities.forEach(value -> equalSchemeAndAuthoritiesMap.put(value, firstAuthority));
+    }
+    return equalSchemeAndAuthoritiesMap;
+  }
+
+  @VisibleForTesting
+  static List<String> findOrphanFiles(Dataset<Row> actualFileDF,
+                                      Dataset<Row> validFileDF,
+                                      Map<String, String> equalSchemes,
+                                      Map<String, String> equalAuthorities,
+                                      SetAccumulator<Pair<String, String>> setAccumulator,
+                                      PrefixMismatchMode prefixMismatchMode) {
+    Map<String, String> equalSchemesAndAuthoritiesMap = Maps.newHashMap();
+    equalSchemesAndAuthoritiesMap.putAll(flattenMap(equalSchemes));
+    equalSchemesAndAuthoritiesMap.putAll(flattenMap(equalAuthorities));
+
+    Dataset<Row> normalizedActualFileDF = actualFileDF.mapPartitions(toFileMetadata(equalSchemesAndAuthoritiesMap),
+        RowEncoder.apply(FILE_METADATA_SCHEMA)).as("actual");
+    Dataset<Row> normalizedValidFileDF = validFileDF.mapPartitions(toFileMetadata(equalSchemesAndAuthoritiesMap),
+        RowEncoder.apply(FILE_METADATA_SCHEMA)).as("valid");
+
+    Column actualFileName = normalizedActualFileDF.col("path");
+    Column validFileName = normalizedValidFileDF.col("path");
+
+    Dataset<FileDescriptor> mayBeOrphanFilesDF = normalizedActualFileDF.join(normalizedValidFileDF,

Review Comment:
   I think we should be able to use typed `joinWith`.
   
   ```
   Dataset<FileMetadata> actualFileMetadataDS = actualFileDF
       .mapPartitions(toFileMetadata(equalSchemes, equalAuthorities), Encoders.bean(FileMetadata.class));
   
   Dataset<FileMetadata> validFileMetadataDS = validFileDF
       .mapPartitions(toFileMetadata(equalSchemes, equalAuthorities), Encoders.bean(FileMetadata.class));
   
   Column joinCod = actualFileMetadataDS.col("path").equalTo(validFileMetadataDS.col("path"));
   
   SetAccumulator<Pair<String, String>> conflicts = new SetAccumulator<>();
   spark.sparkContext().register(conflicts);
   
   List<String> orphanFiles = actualFileMetadataDS
       .joinWith(validFileMetadataDS, joinCod, "leftouter")
       .mapPartitions(findOrphanFiles(prefixMismatchMode, conflicts), Encoders.STRING())
       .collectAsList();
   ```
   
   ```
   private static MapPartitionsFunction<Row, FileMetadata> toFileMetadata(...)
   ```
   



##########
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseDeleteOrphanFilesSparkAction.java:
##########
@@ -232,6 +259,75 @@ private DeleteOrphanFiles.Result doExecute() {
     return new BaseDeleteOrphanFilesActionResult(orphanFiles);
   }
 
+  private static Map<String, String> populateEqualSchemeAndAuthoritiesMap(List<String> equalSchemes,
+                                                                          List<String> equalAuthorities) {
+    Map<String, String> equalSchemeAndAuthoritiesMap = Maps.newHashMap();
+    if (equalSchemes != null && !equalSchemes.isEmpty()) {
+      String firstScheme = equalSchemes.get(0);
+      equalSchemes.forEach(value -> equalSchemeAndAuthoritiesMap.put(value, firstScheme));
+    }
+
+    if (equalAuthorities != null && !equalAuthorities.isEmpty()) {
+      String firstAuthority = equalAuthorities.get(0);
+      equalAuthorities.forEach(value -> equalSchemeAndAuthoritiesMap.put(value, firstAuthority));
+    }
+    return equalSchemeAndAuthoritiesMap;
+  }
+
+  @VisibleForTesting
+  static List<String> findOrphanFiles(Dataset<Row> actualFileDF,
+                                      Dataset<Row> validFileDF,
+                                      Map<String, String> equalSchemes,
+                                      Map<String, String> equalAuthorities,
+                                      SetAccumulator<Pair<String, String>> setAccumulator,
+                                      PrefixMismatchMode prefixMismatchMode) {
+    Map<String, String> equalSchemesAndAuthoritiesMap = Maps.newHashMap();

Review Comment:
   nit: I think it will be cleaner to keep these maps separate.



##########
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseDeleteOrphanFilesSparkAction.java:
##########
@@ -94,14 +101,12 @@
     extends BaseSparkAction<DeleteOrphanFiles, DeleteOrphanFiles.Result> implements DeleteOrphanFiles {
 
   private static final Logger LOG = LoggerFactory.getLogger(BaseDeleteOrphanFilesSparkAction.class);
-  private static final UserDefinedFunction filenameUDF = functions.udf((String path) -> {
-    int lastIndex = path.lastIndexOf(File.separator);
-    if (lastIndex == -1) {
-      return path;
-    } else {
-      return path.substring(lastIndex + 1);
-    }
-  }, DataTypes.StringType);
+  private static final StructType FILE_METADATA_SCHEMA = new StructType(new StructField[]{

Review Comment:
   This won't be needed if we switch to `joinWith`.



##########
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/FileDescriptor.java:
##########
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.actions;
+
+public class FileDescriptor {

Review Comment:
   If we switch to `joinWith` in the action, this class can be simplified.
   
   ```
   public static class FileMetadata {
     private String scheme;
     private String authority;
     private String path;
     private String location;
   
     ...
   }
   ```
   



##########
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseDeleteOrphanFilesSparkAction.java:
##########
@@ -213,14 +239,15 @@ private DeleteOrphanFiles.Result doExecute() {
     Dataset<Row> validFileDF = validContentFileDF.union(validMetadataFileDF);
     Dataset<Row> actualFileDF = compareToFileList == null ? buildActualFileDF() : filteredCompareToFileList();
 
-    Column actualFileName = filenameUDF.apply(actualFileDF.col(FILE_PATH));
-    Column validFileName = filenameUDF.apply(validFileDF.col(FILE_PATH));
-    Column nameEqual = actualFileName.equalTo(validFileName);
-    Column actualContains = actualFileDF.col(FILE_PATH).contains(validFileDF.col(FILE_PATH));
-    Column joinCond = nameEqual.and(actualContains);
-    List<String> orphanFiles = actualFileDF.join(validFileDF, joinCond, "leftanti")
-        .as(Encoders.STRING())
-        .collectAsList();
+    SetAccumulator<Pair<String, String>> setAccumulator = new SetAccumulator<>();

Review Comment:
   What about passing `spark()` to `findOrphanFiles` and creating the accumulator there? That's the method that actually uses it.



##########
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/FileDescriptor.java:
##########
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.actions;
+
+public class FileDescriptor {

Review Comment:
   I'd also make it static nested class in the action as it is only used in the action.



##########
api/src/main/java/org/apache/iceberg/actions/DeleteOrphanFiles.java:
##########
@@ -89,4 +125,13 @@ interface Result {
      */
     Iterable<String> orphanFileLocations();
   }
+
+  /**
+   * Defines the Delete Orphan files behaviour when there is mismatch in prefix(scheme/authority)
+   * ERROR - Throws an exception when prefix mismatch
+   * IGNORE - No action when prefix mismatch
+   */
+  enum PrefixMismatchMode {
+    ERROR, IGNORE

Review Comment:
   I thought a bit more about this and maybe we do need `DELETE` mode, even though I was reluctant to support it initially. The behavior will be you get an exception with all conflicting schemes/authorities by default and are asked to resolve conflicts. If that's not enough, then you can use either IGNORE or DELETE to handle the remaining conflicts.



##########
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseDeleteOrphanFilesSparkAction.java:
##########
@@ -361,4 +457,51 @@ static PathFilter forSpecs(Map<Integer, PartitionSpec> specs) {
       return partitionNames.isEmpty() ? HiddenPathFilter.get() : new PartitionAwareHiddenPathFilter(partitionNames);
     }
   }
+
+  private static MapPartitionsFunction<Row, Row> toFileMetadata(Map<String, String> equalSchemeAndAuthoritiesMap) {
+    return rows -> Iterators.transform(rows, row -> {
+      String filePathAsString = row.getString(0);
+      URI uri = new Path(filePathAsString).toUri();
+      String scheme = equalSchemeAndAuthoritiesMap.getOrDefault(uri.getScheme(), uri.getScheme());
+      String authority = equalSchemeAndAuthoritiesMap.getOrDefault(uri.getAuthority(), uri.getAuthority());
+      return RowFactory.create(scheme, authority, uri.getPath(), filePathAsString);

Review Comment:
   This will become:
   
   ```
   return new FileMetadata(scheme, authority, uri.getPath(), location);
   ```



##########
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseDeleteOrphanFilesSparkAction.java:
##########
@@ -361,4 +457,51 @@ static PathFilter forSpecs(Map<Integer, PartitionSpec> specs) {
       return partitionNames.isEmpty() ? HiddenPathFilter.get() : new PartitionAwareHiddenPathFilter(partitionNames);
     }
   }
+
+  private static MapPartitionsFunction<Row, Row> toFileMetadata(Map<String, String> equalSchemeAndAuthoritiesMap) {
+    return rows -> Iterators.transform(rows, row -> {
+      String filePathAsString = row.getString(0);
+      URI uri = new Path(filePathAsString).toUri();
+      String scheme = equalSchemeAndAuthoritiesMap.getOrDefault(uri.getScheme(), uri.getScheme());
+      String authority = equalSchemeAndAuthoritiesMap.getOrDefault(uri.getAuthority(), uri.getAuthority());
+      return RowFactory.create(scheme, authority, uri.getPath(), filePathAsString);
+    });
+  }
+
+  static class MapOrphanFilesFunction implements MapPartitionsFunction<FileDescriptor, String> {
+
+    private final SetAccumulator<Pair<String, String>> setAcc;
+
+    MapOrphanFilesFunction(SetAccumulator<Pair<String, String>> accumulator) {
+      this.setAcc = accumulator;
+    }
+
+    @Override
+    public Iterator<String> call(Iterator<FileDescriptor> iter) throws Exception {
+      Iterator<String> orphanFilesIterator = Iterators.transform(iter,
+          row -> isOrphan(row) ? row.getActualFilePath() : null);
+      return Iterators.filter(orphanFilesIterator, str -> !Strings.isNullOrEmpty(str));
+    }
+
+    boolean isOrphan(FileDescriptor desc) {
+      if (desc.getValidPath() == null) {
+        return true;
+      }
+      if (!isEqual(desc.getActualScheme(), desc.getValidScheme())) {
+        setAcc.add(Pair.of(desc.getActualScheme(), desc.getValidScheme()));
+      }
+      if (!isEqual(desc.getActualAuthority(), desc.getValidAuthority())) {
+        setAcc.add(Pair.of(desc.getActualAuthority(), desc.getValidAuthority()));
+      }
+      return false;
+    }
+
+    private boolean isEqual(String actual, String valid) {

Review Comment:
   It is a little bit hard to wrap the head around this logic. Would something like this be easier to grasp?
   
   ```
     private static MapPartitionsFunction<Tuple2<FileMetadata, FileMetadata>, String> findOrphanFiles(
         PrefixMismatchMode mode,
         SetAccumulator<Pair<String, String>> conflicts) {
   
       return rows -> {
         Iterator<String> transformed = Iterators.transform(rows, row -> {
           FileMetadata actual = row._1;
           FileMetadata valid = row._2;
   
           if (valid.path == null) {
             return actual.location;
           }
   
           boolean schemeMatch = Strings.isNullOrEmpty(valid.scheme) || valid.scheme.equalsIgnoreCase(actual.scheme);
           boolean authorityMatch = Strings.isNullOrEmpty(valid.authority) || valid.authority.equalsIgnoreCase(actual.authority);
   
           switch (mode) {
             ...
           }
         });
   
         return Iterators.filter(transformed, Objects::nonNull);
       };
     }
   ```



##########
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseDeleteOrphanFilesSparkAction.java:
##########
@@ -213,14 +239,15 @@ private DeleteOrphanFiles.Result doExecute() {
     Dataset<Row> validFileDF = validContentFileDF.union(validMetadataFileDF);
     Dataset<Row> actualFileDF = compareToFileList == null ? buildActualFileDF() : filteredCompareToFileList();
 
-    Column actualFileName = filenameUDF.apply(actualFileDF.col(FILE_PATH));
-    Column validFileName = filenameUDF.apply(validFileDF.col(FILE_PATH));
-    Column nameEqual = actualFileName.equalTo(validFileName);
-    Column actualContains = actualFileDF.col(FILE_PATH).contains(validFileDF.col(FILE_PATH));
-    Column joinCond = nameEqual.and(actualContains);
-    List<String> orphanFiles = actualFileDF.join(validFileDF, joinCond, "leftanti")
-        .as(Encoders.STRING())
-        .collectAsList();
+    SetAccumulator<Pair<String, String>> setAccumulator = new SetAccumulator<>();
+    spark().sparkContext().register(setAccumulator);
+    List<String> orphanFiles =

Review Comment:
   What about one of the following formatting variants?
   
   ```
   List<String> orphanFiles = findOrphanFiles(
       spark(), actualFileDF, validFileDF, equalSchemes, equalAuthorities, prefixMismatchMode);
   ```
   
   ```
   List<String> orphanFiles = findOrphanFiles(
       spark(), actualFileDF, validFileDF,
       equalSchemes, equalAuthorities, prefixMismatchMode);
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org