Posted to issues@iceberg.apache.org by GitBox <gi...@apache.org> on 2022/06/06 05:30:42 UTC

[GitHub] [iceberg] karuppayya commented on a diff in pull request #4652: ICEBERG-4346: Better handling of Orphan files

karuppayya commented on code in PR #4652:
URL: https://github.com/apache/iceberg/pull/4652#discussion_r889833722


##########
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseDeleteOrphanFilesSparkAction.java:
##########
@@ -361,4 +403,90 @@ static PathFilter forSpecs(Map<Integer, PartitionSpec> specs) {
       return partitionNames.isEmpty() ? HiddenPathFilter.get() : new PartitionAwareHiddenPathFilter(partitionNames);
     }
   }
+
+  private static Dataset<Row> sanitizeAndSplitPaths(Dataset<Row> filesDF) {
+    return filesDF.mapPartitions((MapPartitionsFunction<Row, Row>) input ->
+        Iterators.transform(input, row -> {
+          String pathString = row.getString(0);
+          Path path = new Path(pathString);
+          URI uri = path.toUri();
+          List<Object> values = Lists.newArrayList(uri.getScheme(), uri.getAuthority(), uri.getPath(), pathString);
+          return Row$.MODULE$.apply(scala.collection.JavaConverters.asScalaBuffer(values).toSeq());
+        }), RowEncoder.apply(fileSchema));
+  }
+  static class MapOrphanFilesFunction implements MapPartitionsFunction<Row, String> {
+
+    private final List<String> equivalentSchemes;
+    private final List<String> equivalentAuthorities;
+    private final PrefixMisMatchMode mismatchMode;
+    private final StructType scheme;
+
+    MapOrphanFilesFunction(List<String> equivalentSchemes,
+                           List<String> equivalentAuthorities,
+                           PrefixMisMatchMode mismatchMode,
+                           StructType schema) {
+      this.equivalentSchemes = equivalentSchemes;
+      this.equivalentAuthorities = equivalentAuthorities;
+      this.mismatchMode = mismatchMode;
+      this.scheme = schema;
+    }
+
+    @Override
+    public Iterator<String> call(Iterator<Row> value) throws Exception {
+
+      Iterator<String> orphanFilesIterator = Iterators.transform(value, row -> {
+        if (isOrphan(row)) {
+          return row.getString(3);
+        } else {
+          return null;
+        }
+      });
+      return Iterators.filter(orphanFilesIterator, StringUtils::isNotBlank);
+    }
+
+    boolean isOrphan(Row row) {
+      String[] fields = scheme.fieldNames();
+
+      // actual file related fields
+      assert (fields[0].equalsIgnoreCase("scheme"));

Review Comment:
   @rdblue 
   I see the following note in the Javadoc for `org.apache.spark.sql.Encoders#javaSerialization`:
   ```
     /**
      * Creates an encoder that serializes objects of type T using generic Java serialization.
      * This encoder maps T into a single byte array (binary) field.
      *
      * T must be publicly accessible.
      *
      * @note This is extremely inefficient and should only be used as the last resort.
      *
      * @since 1.6.0
      */
   ```
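   To make that warning concrete, here is a rough, stdlib-only sketch (not a Spark benchmark). It approximates generic Java serialization with `java.io.ObjectOutputStream`, serializes a four-field path row, and compares the result with the size of the field values alone. The class name and sample row are hypothetical. The sketch does not model how Spark stores the result as a single opaque binary column.

   ```java
   import java.io.ByteArrayOutputStream;
   import java.io.IOException;
   import java.io.ObjectOutputStream;
   import java.io.UncheckedIOException;
   import java.nio.charset.StandardCharsets;

   public class JavaSerializationOverhead {

     // Serializes a value with plain java.io serialization and returns the byte count.
     static int serializedSize(Object value) {
       ByteArrayOutputStream bytes = new ByteArrayOutputStream();
       try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
         out.writeObject(value);
       } catch (IOException e) {
         throw new UncheckedIOException(e);
       }
       return bytes.size();
     }

     // Sums the UTF-8 bytes of the field values alone, without any metadata.
     static int rawSize(String[] fields) {
       int total = 0;
       for (String field : fields) {
         total += field.getBytes(StandardCharsets.UTF_8).length;
       }
       return total;
     }

     public static void main(String[] args) {
       // Hypothetical row: scheme, authority, path, original location.
       String[] row = {
           "s3", "bucket", "/warehouse/db/t/data/f.parquet",
           "s3://bucket/warehouse/db/t/data/f.parquet"};
       System.out.println("raw bytes:        " + rawSize(row));
       System.out.println("serialized bytes: " + serializedSize(row));
     }
   }
   ```

   The serialized form carries a stream header and class descriptor on top of the values. It is also opaque to Spark, so columns inside it cannot be read individually.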
   As Anton mentioned in his comment, should we access the columns by index instead? The DataFrame is constructed in this code, so its column order is known.
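   A minimal sketch of the index-based approach follows. It assumes the column order built in `sanitizeAndSplitPaths` (scheme, authority, path, original path string). The constant names, the class name, and the use of an `Object[]` in place of a Spark `Row` are hypothetical, chosen only to keep the example dependency-free.

   ```java
   import java.net.URI;

   public class OrphanFileRowIndices {

     // Hypothetical column positions mirroring the order in which
     // sanitizeAndSplitPaths builds each row.
     static final int SCHEME = 0;
     static final int AUTHORITY = 1;
     static final int PATH = 2;
     static final int FILE_PATH = 3;

     // Splits a location into the four positional fields. Uses java.net.URI
     // instead of Hadoop's Path so the sketch needs no extra dependencies.
     static Object[] splitPath(String location) {
       URI uri = URI.create(location);
       return new Object[] {uri.getScheme(), uri.getAuthority(), uri.getPath(), location};
     }

     public static void main(String[] args) {
       Object[] row = splitPath("s3://bucket/warehouse/db/t/data/f.parquet");
       // Read by position instead of looking up (or asserting) field names per row.
       System.out.println(row[SCHEME]);     // s3
       System.out.println(row[AUTHORITY]);  // bucket
       System.out.println(row[PATH]);       // /warehouse/db/t/data/f.parquet
       System.out.println(row[FILE_PATH]);  // s3://bucket/warehouse/db/t/data/f.parquet
     }
   }
   ```

   In the Spark code, the same constants could be passed to `Row#getString(int)`, for example `row.getString(FILE_PATH)` in place of the literal `3` in `call`. The per-row `assert` on field names in `isOrphan` could then be dropped. One caveat: `URI.create` rejects unescaped characters such as spaces, which Hadoop's `Path` generally tolerates. The sketch is therefore not a drop-in replacement for the `Path`-based splitting.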
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
For additional commands, e-mail: issues-help@iceberg.apache.org