Posted to issues@iceberg.apache.org by GitBox <gi...@apache.org> on 2022/08/12 18:27:14 UTC

[GitHub] [iceberg] szehon-ho commented on a diff in pull request #5495: Spark 3.3: Reduce serialization in DeleteOrphanFilesSparkAction

szehon-ho commented on code in PR #5495:
URL: https://github.com/apache/iceberg/pull/5495#discussion_r944698719


##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/DeleteOrphanFilesSparkAction.java:
##########
@@ -426,65 +405,159 @@ private static Map<String, String> flattenMap(Map<String, String> map) {
     if (map != null) {
       for (String key : map.keySet()) {
         String value = map.get(key);
-        for (String splitKey : COMMA.split(key)) {
+        for (String splitKey : COMMA_SPLITTER.split(key)) {
           flattenedMap.put(splitKey.trim(), value.trim());
         }
       }
     }
     return flattenedMap;
   }
 
-  private static MapPartitionsFunction<Tuple2<FileMetadata, FileMetadata>, String> findOrphanFiles(
-      PrefixMismatchMode mode, SetAccumulator<Pair<String, String>> conflicts) {
-    return rows -> {
-      Iterator<String> transformed =
-          Iterators.transform(
-              rows,
-              row -> {
-                FileMetadata actual = row._1;
-                FileMetadata valid = row._2;
-
-                if (valid == null) {
-                  return actual.location;
-                }
-
-                boolean schemeMatch =
-                    Strings.isNullOrEmpty(valid.scheme)
-                        || valid.scheme.equalsIgnoreCase(actual.scheme);
-                boolean authorityMatch =
-                    Strings.isNullOrEmpty(valid.authority)
-                        || valid.authority.equalsIgnoreCase(actual.authority);
-
-                if ((!schemeMatch || !authorityMatch) && mode == PrefixMismatchMode.DELETE) {
-                  return actual.location;
-                } else {
-                  if (!schemeMatch) {
-                    conflicts.add(Pair.of(valid.scheme, actual.scheme));
-                  }
-                  if (!authorityMatch) {
-                    conflicts.add(Pair.of(valid.authority, actual.authority));
-                  }
-                }
-
-                return null;
-              });
-      return Iterators.filter(transformed, Objects::nonNull);
-    };
+  private static class ListDirsRecursively implements FlatMapFunction<Iterator<String>, String> {
+
+    private final Broadcast<SerializableConfiguration> hadoopConf;
+    private final long olderThanTimestamp;
+    private final PathFilter pathFilter;
+
+    ListDirsRecursively(
+        Broadcast<SerializableConfiguration> hadoopConf,
+        long olderThanTimestamp,
+        PathFilter pathFilter) {
+
+      this.hadoopConf = hadoopConf;
+      this.olderThanTimestamp = olderThanTimestamp;
+      this.pathFilter = pathFilter;
+    }
+
+    @Override
+    public Iterator<String> call(Iterator<String> dirs) throws Exception {
+      List<String> subDirs = Lists.newArrayList();
+      List<String> files = Lists.newArrayList();
+
+      Predicate<FileStatus> predicate = file -> file.getModificationTime() < olderThanTimestamp;
+
+      dirs.forEachRemaining(
+          dir -> {

Review Comment:
   From the previous PR, but it looks like the extra brackets around the lambda body can be removed.
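
   A rough sketch of what that might look like (just restating the call from this diff without the braces, assuming the lambda body stays a single statement):

       dirs.forEachRemaining(
           dir ->
               listDirRecursively(
                   dir,
                   predicate,
                   hadoopConf.value().value(),
                   MAX_EXECUTOR_LISTING_DEPTH,
                   MAX_EXECUTOR_LISTING_DIRECT_SUB_DIRS,
                   subDirs,
                   pathFilter,
                   files));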



##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/DeleteOrphanFilesSparkAction.java:
##########
@@ -426,65 +405,159 @@ private static Map<String, String> flattenMap(Map<String, String> map) {
     if (map != null) {
       for (String key : map.keySet()) {
         String value = map.get(key);
-        for (String splitKey : COMMA.split(key)) {
+        for (String splitKey : COMMA_SPLITTER.split(key)) {
           flattenedMap.put(splitKey.trim(), value.trim());
         }
       }
     }
     return flattenedMap;
   }
 
-  private static MapPartitionsFunction<Tuple2<FileMetadata, FileMetadata>, String> findOrphanFiles(
-      PrefixMismatchMode mode, SetAccumulator<Pair<String, String>> conflicts) {
-    return rows -> {
-      Iterator<String> transformed =
-          Iterators.transform(
-              rows,
-              row -> {
-                FileMetadata actual = row._1;
-                FileMetadata valid = row._2;
-
-                if (valid == null) {
-                  return actual.location;
-                }
-
-                boolean schemeMatch =
-                    Strings.isNullOrEmpty(valid.scheme)
-                        || valid.scheme.equalsIgnoreCase(actual.scheme);
-                boolean authorityMatch =
-                    Strings.isNullOrEmpty(valid.authority)
-                        || valid.authority.equalsIgnoreCase(actual.authority);
-
-                if ((!schemeMatch || !authorityMatch) && mode == PrefixMismatchMode.DELETE) {
-                  return actual.location;
-                } else {
-                  if (!schemeMatch) {
-                    conflicts.add(Pair.of(valid.scheme, actual.scheme));
-                  }
-                  if (!authorityMatch) {
-                    conflicts.add(Pair.of(valid.authority, actual.authority));
-                  }
-                }
-
-                return null;
-              });
-      return Iterators.filter(transformed, Objects::nonNull);
-    };
+  private static class ListDirsRecursively implements FlatMapFunction<Iterator<String>, String> {
+
+    private final Broadcast<SerializableConfiguration> hadoopConf;
+    private final long olderThanTimestamp;
+    private final PathFilter pathFilter;
+
+    ListDirsRecursively(
+        Broadcast<SerializableConfiguration> hadoopConf,
+        long olderThanTimestamp,
+        PathFilter pathFilter) {
+
+      this.hadoopConf = hadoopConf;
+      this.olderThanTimestamp = olderThanTimestamp;
+      this.pathFilter = pathFilter;
+    }
+
+    @Override
+    public Iterator<String> call(Iterator<String> dirs) throws Exception {
+      List<String> subDirs = Lists.newArrayList();
+      List<String> files = Lists.newArrayList();
+
+      Predicate<FileStatus> predicate = file -> file.getModificationTime() < olderThanTimestamp;
+
+      dirs.forEachRemaining(
+          dir -> {
+            listDirRecursively(
+                dir,
+                predicate,
+                hadoopConf.value().value(),
+                MAX_EXECUTOR_LISTING_DEPTH,
+                MAX_EXECUTOR_LISTING_DIRECT_SUB_DIRS,
+                subDirs,
+                pathFilter,
+                files);
+          });
+
+      if (!subDirs.isEmpty()) {
+        throw new RuntimeException(
+            "Could not list sub directories, reached maximum depth: " + MAX_EXECUTOR_LISTING_DEPTH);
+      }
+
+      return files.iterator();
+    }
   }
 
-  private static MapPartitionsFunction<Row, FileMetadata> toFileMetadata(
-      Map<String, String> equalSchemesMap, Map<String, String> equalAuthoritiesMap) {
-    return rows ->
-        Iterators.transform(
-            rows,
-            row -> {
-              String location = row.getString(0);
-              URI uri = new Path(location).toUri();
-              String scheme = equalSchemesMap.getOrDefault(uri.getScheme(), uri.getScheme());
-              String authority =
-                  equalAuthoritiesMap.getOrDefault(uri.getAuthority(), uri.getAuthority());
-              return new FileMetadata(scheme, authority, uri.getPath(), location);
-            });
+  private static class FindOrphanFiles
+      implements MapPartitionsFunction<Tuple2<FileURI, FileURI>, String> {
+
+    private final PrefixMismatchMode mode;
+    private final SetAccumulator<Pair<String, String>> conflicts;
+
+    FindOrphanFiles(PrefixMismatchMode mode, SetAccumulator<Pair<String, String>> conflicts) {
+      this.mode = mode;
+      this.conflicts = conflicts;
+    }
+
+    @Override
+    public Iterator<String> call(Iterator<Tuple2<FileURI, FileURI>> rows) throws Exception {
+      Iterator<String> orphanFiles = Iterators.transform(rows, this::toOrphanFile);
+      return Iterators.filter(orphanFiles, Objects::nonNull);
+    }
+
+    private String toOrphanFile(Tuple2<FileURI, FileURI> row) {
+      FileURI actual = row._1;
+      FileURI valid = row._2;
+
+      if (valid == null) {
+        return actual.valueAsString;
+      }
+
+      boolean schemeMatch = uriComponentMatch(valid.scheme, actual.scheme);
+      boolean authorityMatch = uriComponentMatch(valid.authority, actual.authority);
+
+      if ((!schemeMatch || !authorityMatch) && mode == PrefixMismatchMode.DELETE) {
+        return actual.valueAsString;
+      } else {
+        if (!schemeMatch) {
+          conflicts.add(Pair.of(valid.scheme, actual.scheme));
+        }
+
+        if (!authorityMatch) {
+          conflicts.add(Pair.of(valid.authority, actual.authority));
+        }
+
+        return null;
+      }
+    }
+
+    private boolean uriComponentMatch(String valid, String actual) {
+      return Strings.isNullOrEmpty(valid) || valid.equalsIgnoreCase(actual);
+    }
+  }
+
+  @VisibleForTesting
+  static class StringToFileURI extends ToFileURI<String> {

Review Comment:
   I saw the comment about making classes for consistency, but as a style question: would a Function<I, String> uriAsString in the constructor of ToFileURI be better? That way we could eliminate these two classes, which are mostly boilerplate except for the uriAsString method.
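
   Roughly what I had in mind, as a hypothetical sketch only (names like uriAsString and the FileInfo::getPath reference are illustrative, and the function would need to be serializable for Spark):

       private static class ToFileURI<I> {
         private final Map<String, String> equalSchemes;
         private final Map<String, String> equalAuthorities;
         private final Function<I, String> uriAsString;

         ToFileURI(
             Map<String, String> equalSchemes,
             Map<String, String> equalAuthorities,
             Function<I, String> uriAsString) {
           this.equalSchemes = equalSchemes;
           this.equalAuthorities = equalAuthorities;
           this.uriAsString = uriAsString;
         }

         // the per-row conversion would call uriAsString.apply(row)
         // instead of dispatching to a subclass method
       }

       // the two subclasses would then collapse into instances, e.g.:
       //   new ToFileURI<String>(equalSchemes, equalAuthorities, location -> location)
       //   new ToFileURI<FileInfo>(equalSchemes, equalAuthorities, FileInfo::getPath)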



##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/DeleteOrphanFilesSparkAction.java:
##########
@@ -256,41 +256,62 @@ private DeleteOrphanFiles.Result doExecute() {
     return new BaseDeleteOrphanFilesActionResult(orphanFiles);
   }
 
-  private Dataset<Row> buildValidFileDF() {
-    return contentFileDS(table)
-        .union(manifestDS(table))
-        .union(manifestListDS(table))
-        .union(otherMetadataFileDS(table))
-        .toDF(FILE_PATH, FILE_TYPE)
-        .select(FILE_PATH);
+  private Dataset<FileURI> validFileIdentDS() {
+    // apply the transformation before union to avoid extra serialization
+    FileInfoToFileURI toURI = new FileInfoToFileURI(equalSchemes, equalAuthorities);
+
+    Dataset<FileURI> contentFileIdentDS = toURI.apply(contentFileDS(table));
+    Dataset<FileURI> manifestFileIdentDS = toURI.apply(manifestDS(table));
+    Dataset<FileURI> manifestListIdentDS = toURI.apply(manifestListDS(table));
+    Dataset<FileURI> otherMetadataFileIdentDS = toURI.apply(otherMetadataFileDS(table));
+
+    return contentFileIdentDS
+        .union(manifestFileIdentDS)
+        .union(manifestListIdentDS)
+        .union(otherMetadataFileIdentDS);
   }
 
-  private Dataset<Row> buildActualFileDF() {
+  private Dataset<FileURI> actualFileIdentDS() {

Review Comment:
   Since the Dataset is already typed as FileURI, should the method be called 'actualFileURIDS'?



##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/DeleteOrphanFilesSparkAction.java:
##########
@@ -517,33 +592,34 @@ static PathFilter forSpecs(Map<Integer, PartitionSpec> specs) {
           specs.values().stream()
               .map(PartitionSpec::fields)
               .flatMap(List::stream)
-              .filter(
-                  partitionField ->
-                      partitionField.name().startsWith("_")
-                          || partitionField.name().startsWith("."))
-              .map(partitionField -> partitionField.name() + "=")
+              .filter(field -> field.name().startsWith("_") || field.name().startsWith("."))
+              .map(field -> field.name() + "=")
               .collect(Collectors.toSet());
 
-      return partitionNames.isEmpty()
-          ? HiddenPathFilter.get()
-          : new PartitionAwareHiddenPathFilter(partitionNames);
+      if (partitionNames.isEmpty()) {
+        return HiddenPathFilter.get();
+      } else {
+        return new PartitionAwareHiddenPathFilter(partitionNames);
+      }
     }
   }
 
-  public static class FileMetadata implements Serializable {
+  public static class FileURI {
+    public static final Encoder<FileURI> ENCODER = Encoders.bean(FileURI.class);
+
     private String scheme;
     private String authority;
     private String path;
-    private String location;
+    private String valueAsString;
 
-    public FileMetadata(String scheme, String authority, String path, String location) {
+    public FileURI(String scheme, String authority, String path, String valueAsString) {

Review Comment:
   Optional: would uriAsString be more descriptive, since we are changing it anyway?



##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/DeleteOrphanFilesSparkAction.java:
##########
@@ -103,11 +102,14 @@ public class DeleteOrphanFilesSparkAction extends BaseSparkAction<DeleteOrphanFi
     implements DeleteOrphanFiles {
 
   private static final Logger LOG = LoggerFactory.getLogger(DeleteOrphanFilesSparkAction.class);
-  private static final Splitter COMMA = Splitter.on(",");
   private static final Map<String, String> EQUAL_SCHEMES_DEFAULT = ImmutableMap.of("s3n,s3a", "s3");

Review Comment:
   Sorry, I'm late to the other review and might have missed the discussion, but the format of equalSchemes and equalAuthorities looks a bit confusing for a user to set. Why is it a map of (list -> value) and not the other way around? What is the differentiation between key and value? If everything in the key and value is considered equal, I'd imagine it should be a list of lists.
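
   For reference, my reading of the current format (based on flattenMap and the getOrDefault lookups in this diff): the comma-separated keys are aliases and the value is the canonical form they all map to, e.g.

       Map<String, String> equalSchemes = flattenMap(ImmutableMap.of("s3n,s3a", "s3"));
       // equalSchemes: {"s3n" -> "s3", "s3a" -> "s3"}

       // later, when normalizing a location:
       String scheme = equalSchemes.getOrDefault(uri.getScheme(), uri.getScheme());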



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org

