Posted to issues@iceberg.apache.org by GitBox <gi...@apache.org> on 2020/09/17 03:30:14 UTC

[GitHub] [iceberg] manishmalhotrawork opened a new pull request #1471: RemoveOrphanFiles: consider only path to compare and delete, avoid authority and scheme

manishmalhotrawork opened a new pull request #1471:
URL: https://github.com/apache/iceberg/pull/1471


   ### **Problem:**
   
   - Different authorities and scheme names can result in live/valid files being identified as orphans and deleted.
   - For example, if file paths look like this:
   
   ```
   in metadata/manifests:
   
   hdfs:/user/head/warehouse/core_attribute_wom_live/data/site_group=PGKS/parent_wom=037/00048-13203-d1130e00-3244-4a6d-8772-a6a49f948778-00026.parquet
   -----------------------------------------------------------------------------------------------------------------------------------------------------------
   from file_system:
   
   hdfs://nameservice1/user/head/warehouse/core_attribute_wom_live/data/site_group=PGKS/parent_wom=037/00048-13203-d1130e00-3244-4a6d-8772-a6a49f948778-00026.parquet
   
   OR
   
   myhdfs://nameservice1/user/head/warehouse/core_attribute_wom_live/data/site_group=PGKS/parent_wom=037/00048-13203-d1130e00-3244-4a6d-8772-a6a49f948778-00026.parquet
   
   ```
   then the comparison between the metadata/valid paths and the FS paths will not match, and live files will be identified for deletion.
   
   - Especially in the case of HDFS, it is common to access the same cluster using different authority names, or with no authority at all. It all depends on how core-site.xml and the server-side name mappings are configured.
   
   ### **Solution:**
   
   - Use `Path.toUri` and `uri.getPath` (no authority or scheme) to compute a relative path for each file, as in the sketch below.
   - Then compare the relative file paths between the metadata/valid files and the FS files.
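   
   A minimal illustrative sketch (not part of the PR; paths invented here) of how Hadoop's `Path.toUri().getPath()` strips both scheme and authority:
   
   ```java
   import java.net.URI;
   import org.apache.hadoop.fs.Path;
   
   public class PathOnlyExample {
     public static void main(String[] args) {
       URI withAuthority = new Path(
           "hdfs://nameservice1/user/head/warehouse/data/file.parquet").toUri();
       URI noAuthority = new Path(
           "hdfs:/user/head/warehouse/data/file.parquet").toUri();
   
       // Both print "/user/head/warehouse/data/file.parquet":
       // getPath() drops the scheme ("hdfs") and the authority ("nameservice1").
       System.out.println(withAuthority.getPath());
       System.out.println(noAuthority.getPath());
     }
   }
   ```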
   
    




[GitHub] [iceberg] aokolnychyi commented on pull request #1471: RemoveOrphanFiles: consider only path to compare and delete, avoid authority and scheme

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on pull request #1471:
URL: https://github.com/apache/iceberg/pull/1471#issuecomment-694600590


   I've been thinking about this for a while but could not come up with a good alternative. The only other option is to ignore or fail when we cannot resolve files against the root location we list. That means some files could remain orphans forever.
   
   It seems that checking paths should be OK, as a path will contain the table location + partition path + file name (which will most likely include a UUID). In the worst case, we will not remove some orphan files, and that should be extremely unlikely.




[GitHub] [iceberg] manishmalhotrawork commented on a change in pull request #1471: RemoveOrphanFiles: consider only path to compare and delete, avoid authority and scheme

Posted by GitBox <gi...@apache.org>.
manishmalhotrawork commented on a change in pull request #1471:
URL: https://github.com/apache/iceberg/pull/1471#discussion_r490538944



##########
File path: spark/src/main/java/org/apache/iceberg/actions/RemoveOrphanFilesAction.java
##########
@@ -254,4 +270,53 @@ private static void listDirRecursively(
       return files.iterator();
     };
   }
+
+  protected static List<String> findOrphanFiles(
+      Dataset<Row> validFileDF,
+      Dataset<Row> actualFileDF) {
+    Column nameEqual = filename.apply(actualFileDF.col(RELATIVE_FILE_PATH))
+        .equalTo(filename.apply(validFileDF.col(RELATIVE_FILE_PATH)));
+
+    Column pathContains = actualFileDF.col(RELATIVE_FILE_PATH)
+        .contains(validFileDF.col(RELATIVE_FILE_PATH));
+
+    Column joinCond = nameEqual.and(pathContains);

Review comment:
       @RussellSpitzer `nameEqual` is only for the file name and does not include the path. Also, this condition was already there, so I didn't want to change it.






[GitHub] [iceberg] RussellSpitzer commented on a change in pull request #1471: RemoveOrphanFiles: consider only path to compare and delete, avoid authority and scheme

Posted by GitBox <gi...@apache.org>.
RussellSpitzer commented on a change in pull request #1471:
URL: https://github.com/apache/iceberg/pull/1471#discussion_r490352331



##########
File path: spark/src/main/java/org/apache/iceberg/actions/RemoveOrphanFilesAction.java
##########
@@ -254,4 +270,53 @@ private static void listDirRecursively(
       return files.iterator();
     };
   }
+
+  protected static List<String> findOrphanFiles(
+      Dataset<Row> validFileDF,
+      Dataset<Row> actualFileDF) {
+    Column nameEqual = filename.apply(actualFileDF.col(RELATIVE_FILE_PATH))
+        .equalTo(filename.apply(validFileDF.col(RELATIVE_FILE_PATH)));
+
+    Column pathContains = actualFileDF.col(RELATIVE_FILE_PATH)
+        .contains(validFileDF.col(RELATIVE_FILE_PATH));
+
+    Column joinCond = nameEqual.and(pathContains);
+    Column decodeFilepath = decode.apply(actualFileDF.col(FILE_PATH));
+    return actualFileDF.join(validFileDF, joinCond, "leftanti").select(decodeFilepath)
+        .as(Encoders.STRING())
+        .collectAsList();
+  }
+
+  /**
+   * From
+   * <pre>{@code
+   * Dataset<Row<file_path_with_scheme_authority>>
+   *    will be transformed to
+   *    Dataset<Row<file_path_no_scheme_authority, file_path_with_scheme_authority>>
+   *  }</pre>
+   *
+   * This is required to compare the valid files and all files to find the orphan files.
+   * Based on the resulting data set, only the path portion will be compared between the valid and actual file paths.
+   * In the case of hadoop or s3, there can be different authority names for accessing the same path, which could
+   * otherwise flag files that are part of the metadata, and not orphans, for deletion.
+   *
+   * @param filePathWithSchemeAndAuthority : complete file path, can include scheme, authority and path.
+   * @return : {@code file_path_no_scheme_authority, file_path}
+   */
+  protected static Dataset<Row> addRelativePathColumn(Dataset<Row> filePathWithSchemeAndAuthority) {
+    return filePathWithSchemeAndAuthority.withColumn(URI_DETAIL,
+        fileWithRelativePath.apply(
+            filePathWithSchemeAndAuthority.apply(FILE_PATH)
+        )).selectExpr(
+        URI_DETAIL + ". " + RELATIVE_FILE_PATH + " as "  + RELATIVE_FILE_PATH, // relative path

Review comment:
       There's an extra space in here; I would suggest doing this as a String.format just to be safe.

##########
File path: spark/src/main/java/org/apache/iceberg/actions/RemoveOrphanFilesAction.java
##########
@@ -79,6 +87,20 @@
     }
   }, DataTypes.StringType);
 
+  private static final UserDefinedFunction decode = functions.udf((String fullyQualifiedPath) -> {
+    return URLDecoder.decode(fullyQualifiedPath, "UTF-8");
+  }, DataTypes.StringType);
+
+  /**
+   * transform a file path to

Review comment:
       Nit: capitalization

##########
File path: spark/src/main/java/org/apache/iceberg/actions/RemoveOrphanFilesAction.java
##########
@@ -79,6 +87,20 @@
     }
   }, DataTypes.StringType);
 
+  private static final UserDefinedFunction decode = functions.udf((String fullyQualifiedPath) -> {
+    return URLDecoder.decode(fullyQualifiedPath, "UTF-8");
+  }, DataTypes.StringType);
+
+  /**
+   * transform a file path to
+   * {@code Dataset<Row<file_path_no_scheme_authority, file_path_with_scheme_authority>>}
+   */
+  private static final UserDefinedFunction fileWithRelativePath = functions.udf((String fullyQualifiedPath) -> {

Review comment:
       I don't want to bikeshed too much here, but I think this name ends up being confusing, because later we use it in conjunction with very similarly named variables that represent DataFrames and also use apply methods. I would recommend we rename this to something with "UDF" in it so it's clear.

##########
File path: spark/src/main/java/org/apache/iceberg/actions/RemoveOrphanFilesAction.java
##########
@@ -254,4 +270,53 @@ private static void listDirRecursively(
       return files.iterator();
     };
   }
+
+  protected static List<String> findOrphanFiles(
+      Dataset<Row> validFileDF,
+      Dataset<Row> actualFileDF) {
+    Column nameEqual = filename.apply(actualFileDF.col(RELATIVE_FILE_PATH))
+        .equalTo(filename.apply(validFileDF.col(RELATIVE_FILE_PATH)));
+
+    Column pathContains = actualFileDF.col(RELATIVE_FILE_PATH)
+        .contains(validFileDF.col(RELATIVE_FILE_PATH));
+
+    Column joinCond = nameEqual.and(pathContains);
+    Column decodeFilepath = decode.apply(actualFileDF.col(FILE_PATH));
+    return actualFileDF.join(validFileDF, joinCond, "leftanti").select(decodeFilepath)
+        .as(Encoders.STRING())
+        .collectAsList();
+  }
+
+  /**
+   * From
+   * <pre>{@code
+   * Dataset<Row<file_path_with_scheme_authority>>
+   *    will be transformed to
+   *    Dataset<Row<file_path_no_scheme_authority, file_path_with_scheme_authority>>
+   *  }</pre>
+   *
+   * This is required to compare the valid files and all files to find the orphan files.
+   * Based on the resulting data set, only the path portion will be compared between the valid and actual file paths.
+   * In the case of hadoop or s3, there can be different authority names for accessing the same path, which could
+   * otherwise flag files that are part of the metadata, and not orphans, for deletion.
+   *
+   * @param filePathWithSchemeAndAuthority : complete file path, can include scheme, authority and path.
+   * @return : {@code file_path_no_scheme_authority, file_path}
+   */
+  protected static Dataset<Row> addRelativePathColumn(Dataset<Row> filePathWithSchemeAndAuthority) {
+    return filePathWithSchemeAndAuthority.withColumn(URI_DETAIL,
+        fileWithRelativePath.apply(

Review comment:
       This is where the UDF naming looks confusing to me, since we call apply on 2 objects that are very different and have similar names; one is data, one is code.

##########
File path: spark/src/test/java/org/apache/iceberg/actions/TestRemoveOrphanFilesAction.java
##########
@@ -528,6 +566,71 @@ public void testRemoveOrphanFilesWithRelativeFilePath() throws IOException, Inte
     Assert.assertTrue("Invalid file should be present", fs.exists(new Path(invalidFiles.get(0))));
   }
 
+  @Test
+  public void testRemoveOrphanFilesWithSpecialCharsFilePath() throws IOException, InterruptedException {
+    File whiteSpaceDir = null;
+    try {
+      whiteSpaceDir = new File(tableDir.getAbsolutePath() + "/white space");
+      whiteSpaceDir.mkdirs();
+
+      Table table = TABLES.create(
+          SCHEMA,
+          PartitionSpec.unpartitioned(),
+          Maps.newHashMap(),
+          tableDir.getAbsolutePath() + "/white space");
+
+      List<ThreeColumnRecord> records = Lists.newArrayList(
+          new ThreeColumnRecord(1, "AAAAAAAAAA", "AAAA")
+      );
+
+      Dataset<Row> df = spark.createDataFrame(records, ThreeColumnRecord.class).coalesce(1);
+
+      df.select("c1", "c2", "c3")
+          .write()
+          .format("iceberg")
+          .mode("append")
+          .save(whiteSpaceDir.getAbsolutePath());
+
+      List<String> validFiles = spark.read().format("iceberg")
+          .load(whiteSpaceDir + "#files")
+          .select("file_path")
+          .as(Encoders.STRING())
+          .collectAsList();
+      Assert.assertEquals("Should be 1 valid files", 1, validFiles.size());
+      String validFile = validFiles.get(0);
+
+      df.write().mode("append").parquet(whiteSpaceDir + "/data");
+
+      Path dataPath = new Path(whiteSpaceDir + "/data");
+      FileSystem fs = dataPath.getFileSystem(spark.sessionState().newHadoopConf());
+      List<String> allFiles = Arrays.stream(fs.listStatus(dataPath, HiddenPathFilter.get()))
+          .filter(FileStatus::isFile)
+          .map(file -> file.getPath().toString())
+          .collect(Collectors.toList());
+      Assert.assertEquals("Should be 2 files", 2, allFiles.size());
+
+      List<String> invalidFiles = Lists.newArrayList(allFiles);
+      invalidFiles.removeIf(file -> file.contains(validFile));
+      Assert.assertEquals("Should be 1 invalid file", 1, invalidFiles.size());
+
+      // sleep for 1 second to ensure files will be old enough
+      Thread.sleep(1000);
+
+      Actions actions = Actions.forTable(table);
+      List<String> result = actions.removeOrphanFiles()
+          .olderThan(System.currentTimeMillis())
+          .execute();
+      Assert.assertEquals("Action should find 1 file", invalidFiles, result);

Review comment:
       Couldn't we check whether result matches invalid files?

##########
File path: spark/src/main/java/org/apache/iceberg/actions/RemoveOrphanFilesAction.java
##########
@@ -254,4 +270,53 @@ private static void listDirRecursively(
       return files.iterator();
     };
   }
+
+  protected static List<String> findOrphanFiles(
+      Dataset<Row> validFileDF,
+      Dataset<Row> actualFileDF) {
+    Column nameEqual = filename.apply(actualFileDF.col(RELATIVE_FILE_PATH))
+        .equalTo(filename.apply(validFileDF.col(RELATIVE_FILE_PATH)));
+
+    Column pathContains = actualFileDF.col(RELATIVE_FILE_PATH)
+        .contains(validFileDF.col(RELATIVE_FILE_PATH));
+
+    Column joinCond = nameEqual.and(pathContains);

Review comment:
       Isn't pathContains always true when nameEqual is? I feel like you just need nameEqual here
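
       For reference, a hypothetical pair of paths (invented here) showing how the two predicates can disagree:

    ```java
    // Same file name in two unrelated directories:
    String valid  = "/warehouse/tbl/data/part=1/00001-abc.parquet";
    String actual = "/elsewhere/other_tbl/00001-abc.parquet";

    // nameEqual:    "00001-abc.parquet".equals("00001-abc.parquet") -> true
    // pathContains: actual.contains(valid)                          -> false
    ```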

##########
File path: spark/src/test/java/org/apache/iceberg/actions/TestRemoveOrphanFilesAction.java
##########
@@ -528,6 +566,71 @@ public void testRemoveOrphanFilesWithRelativeFilePath() throws IOException, Inte
     Assert.assertTrue("Invalid file should be present", fs.exists(new Path(invalidFiles.get(0))));
   }
 
+  @Test
+  public void testRemoveOrphanFilesWithSpecialCharsFilePath() throws IOException, InterruptedException {
+    File whiteSpaceDir = null;
+    try {
+      whiteSpaceDir = new File(tableDir.getAbsolutePath() + "/white space");

Review comment:
       I'm not sure we need to be this careful with cleanup here; couldn't we just use temp.newDir and then skip the cleanup?

##########
File path: spark/src/test/java/org/apache/iceberg/actions/TestRemoveOrphanFilesAction.java
##########
@@ -569,4 +672,349 @@ public void testRemoveOrphanFilesWithHadoopCatalog() throws InterruptedException
         .collectAsList();
     Assert.assertEquals("Rows must match", records, actualRecords);
   }
+
+  @Test
+  public void testFindOrphanFilesWithSameAuthority() throws Exception {
+    List<Row> validFilesData = getRowsWithFilePath(HDFS_SERVICENAME_USER_LOG_DATA_DUMMY_FILE, 3);
+
+    Dataset<Row> validFileDS = RemoveOrphanFilesAction
+        .addRelativePathColumn(
+            spark.createDataset(validFilesData, RowEncoder.apply(constructStructureWithString())));
+
+    List<Row> actualFilesData = new ArrayList<>();
+
+    actualFilesData.addAll(validFilesData);
+    List<Row> expectedOrphanFiles = getRowsWithFilePath(HDFS_SERVICENAME_USER_LOG_DATA_ORPHAN_FILE, 4);
+    actualFilesData.addAll(expectedOrphanFiles);
+
+    Dataset<Row> actualFileDS = RemoveOrphanFilesAction
+        .addRelativePathColumn(
+            spark.createDataset(actualFilesData, RowEncoder.apply(constructStructureWithString())));
+
+    List<String> orphanFiles = RemoveOrphanFilesAction.findOrphanFiles(validFileDS, actualFileDS);
+
+    Assert.assertNotNull(orphanFiles);
+
+    Assert.assertEquals(expectedOrphanFiles.stream().map(row -> row.get(0).toString())
+        .collect(Collectors.toList()), orphanFiles);
+  }
+
+  @Test
+  public void testFindOrphanFilesWithValidFileHasNoAuthority() throws Exception {
+    List<Row> validFilesData = getRowsWithFilePath(HDFS_USER_LOG_DATA_DUMMY_FILE, 4);
+    Dataset<Row> validFileDS = RemoveOrphanFilesAction
+        .addRelativePathColumn(
+            spark.createDataset(validFilesData, RowEncoder.apply(constructStructureWithString())));
+
+    List<Row> actualFilesData = new ArrayList<>();
+
+    actualFilesData.addAll(getRowsWithFilePath(HDFS_SERVICENAME_USER_LOG_DATA_DUMMY_FILE, 4));
+    List<Row> expectedOrphanFiles = getRowsWithFilePath(HDFS_SERVICENAME_USER_LOG_DATA_ORPHAN_FILE, 4);
+    actualFilesData.addAll(expectedOrphanFiles);
+
+    Dataset<Row> actualFileDS = RemoveOrphanFilesAction
+        .addRelativePathColumn(
+            spark.createDataset(actualFilesData, RowEncoder.apply(constructStructureWithString())));
+
+    List<String> orphanFiles = RemoveOrphanFilesAction.findOrphanFiles(validFileDS, actualFileDS);
+
+    Assert.assertEquals(expectedOrphanFiles.stream().map(row -> row.get(0).toString())
+        .collect(Collectors.toList()), orphanFiles);
+  }
+
+  @Test

Review comment:
       My gut tells me we should be able to parameterize all of these tests around the file name so we can remove a bit of duplicate code here.






[GitHub] [iceberg] RussellSpitzer commented on a change in pull request #1471: RemoveOrphanFiles: consider only path to compare and delete, avoid authority and scheme

Posted by GitBox <gi...@apache.org>.
RussellSpitzer commented on a change in pull request #1471:
URL: https://github.com/apache/iceberg/pull/1471#discussion_r490575500



##########
File path: spark/src/main/java/org/apache/iceberg/actions/RemoveOrphanFilesAction.java
##########
@@ -79,6 +87,20 @@
     }
   }, DataTypes.StringType);
 
+  private static final UserDefinedFunction decodeUDF = functions.udf((String fullyQualifiedPath) -> {
+    return URLDecoder.decode(fullyQualifiedPath, "UTF-8");
+  }, DataTypes.StringType);
+
+  /**
+   * Transform a file path to
+   * {@code Dataset<Row<file_path_no_scheme_authority, file_path_with_scheme_authority>>}
+   */
+  private static final UserDefinedFunction addFilePathOnlyUDF = functions.udf((String fullyQualifiedPath) -> {
+    Path path = new Path(fullyQualifiedPath);
+    // only_path, fully qualified path
+    return RowFactory.create(path.toUri().getPath(), path.toUri().toString());
+  }, DataTypes.createStructType(fileDetailStructType().fields()));

Review comment:
       See comment here https://github.com/apache/iceberg/pull/1471/files#r490575150 for more thoughts






[GitHub] [iceberg] RussellSpitzer commented on pull request #1471: RemoveOrphanFiles: consider only path to compare and delete, avoid authority and scheme

Posted by GitBox <gi...@apache.org>.
RussellSpitzer commented on pull request #1471:
URL: https://github.com/apache/iceberg/pull/1471#issuecomment-717505521


   @rdblue + @manishmalhotrawork I wasn't sure if the two of you had reached consensus yet on what you would like the behavior here to be, but since I think this would be a great fix for 0.10, maybe we should start the discussion again and get this PR ready.




[GitHub] [iceberg] manishmalhotrawork commented on pull request #1471: RemoveOrphanFiles: consider only path to compare and delete, avoid authority and scheme

Posted by GitBox <gi...@apache.org>.
manishmalhotrawork commented on pull request #1471:
URL: https://github.com/apache/iceberg/pull/1471#issuecomment-694488447


   @RussellSpitzer I've taken care of most of the comments; can you please check?




[GitHub] [iceberg] aokolnychyi commented on pull request #1471: RemoveOrphanFiles: consider only path to compare and delete, avoid authority and scheme

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on pull request #1471:
URL: https://github.com/apache/iceberg/pull/1471#issuecomment-694612728


   I think this logic can be simplified a bit. My first thought was to leverage the following code:
   
   ```
     private static final UserDefinedFunction relativePathUDF = functions.udf((String location) -> {
       Path fullyQualifiedPath = new Path(location);
       return fullyQualifiedPath.toUri().getPath();
     }, DataTypes.StringType);
   
     ...
   
     Dataset<Row> validFileDF = withRelativePathColumn(validDataFileDF.union(validMetadataFileDF));
     Dataset<Row> actualFileDF = withRelativePathColumn(buildActualFileDF());
   
     ...
   
     Column joinCond = actualFileDF.col("relative_path").equalTo(validFileDF.col("relative_path"));
     return actualFileDF.join(validFileDF, joinCond, "leftanti").select("file_path")
         .as(Encoders.STRING())
         .collectAsList();
   ```
   
   We wouldn't need the filename UDF and it would be very straightforward. Unfortunately, that does not seem to work for certain locations. For example, `path.toUri().getPath()` for `hdfs://user/location/sublocation/filename.parquet` will return `/location/sublocation/filename.parquet`, because `user` is treated as the authority.
   
   Even if we have to keep `contains` and equality of file names, I think we can still leverage a single UDF:
   
   ```
     private static final UserDefinedFunction fileDetailUDF = functions.udf((String location) -> {
       Path fullyQualifiedPath = new Path(location);
       String fileName = fullyQualifiedPath.getName();
       String relativePath = fullyQualifiedPath.toUri().getPath();
       return RowFactory.create(fileName, relativePath);
     }, FILE_DETAIL_STRUCT);
   ```
   
   Then our join condition will be == on file names and contains on relative locations.
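   
   A quick illustration (not from the PR; path invented) of that parsing behavior with `org.apache.hadoop.fs.Path`:
   
   ```java
   import java.net.URI;
   import org.apache.hadoop.fs.Path;
   
   URI uri = new Path("hdfs://user/location/sublocation/filename.parquet").toUri();
   // The first component after "//" is parsed as the authority, so the
   // leading "user" directory is swallowed and lost from the path.
   String authority = uri.getAuthority(); // "user"
   String pathOnly = uri.getPath();       // "/location/sublocation/filename.parquet"
   ```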




[GitHub] [iceberg] manishmalhotrawork commented on pull request #1471: RemoveOrphanFiles: consider only path to compare and delete, avoid authority and scheme

Posted by GitBox <gi...@apache.org>.
manishmalhotrawork commented on pull request #1471:
URL: https://github.com/apache/iceberg/pull/1471#issuecomment-694679340


   > I think this logic can be simplified a bit. My first thought was to leverage the following code:
   > 
   > ```
   >   private static final UserDefinedFunction relativePathUDF = functions.udf((String location) -> {
   >     Path fullyQualifiedPath = new Path(location);
   >     return fullyQualifiedPath.toUri().getPath();
   >   }, DataTypes.StringType);
   > 
   >   ...
   > 
   >   Dataset<Row> validFileDF = withRelativePathColumn(validDataFileDF.union(validMetadataFileDF));
   >   Dataset<Row> actualFileDF = withRelativePathColumn(buildActualFileDF());
   > 
   >   ...
   > 
   >   Column joinCond = actualFileDF.col("relative_path").equalTo(validFileDF.col("relative_path"));
   >   return actualFileDF.join(validFileDF, joinCond, "leftanti").select("file_path")
   >       .as(Encoders.STRING())
   >       .collectAsList();
   > ```
   > 
   > We wouldn't need the filename UDF and it would be very straightforward. Unfortunately, that does not seem to work for certain locations. For example, `path.toUri().getPath()` for `hdfs://user/location/sublocation/filename.parquet` will return `/location/sublocation/filename.parquet`, because `user` is treated as the authority.
   > 
   > Even if we have to keep `contains` and equality of file names, I think we can still leverage a single UDF:
   > 
   > ```
   >   private static final UserDefinedFunction fileDetailUDF = functions.udf((String location) -> {
   >     Path fullyQualifiedPath = new Path(location);
   >     String fileName = fullyQualifiedPath.getName();
   >     String relativePath = fullyQualifiedPath.toUri().getPath();
   >     return RowFactory.create(fileName, relativePath);
   >   }, FILE_DETAIL_STRUCT);
   > ```
   > 
   > Then our join condition will be == on file names and contains on relative locations.
   
   thanks @aokolnychyi !
   
   We can keep one UDF for both the file name and the paths.
   Though I think we would need to keep `fullyQualifiedPath` as well, because it's required as the orphan file path to be deleted.
   For the join conditions, these 2 columns should be good.
   




[GitHub] [iceberg] RussellSpitzer commented on a change in pull request #1471: RemoveOrphanFiles: consider only path to compare and delete, avoid authority and scheme

Posted by GitBox <gi...@apache.org>.
RussellSpitzer commented on a change in pull request #1471:
URL: https://github.com/apache/iceberg/pull/1471#discussion_r512868250



##########
File path: spark/src/test/java/org/apache/iceberg/actions/TestRemoveOrphanFilesAction.java
##########
@@ -352,6 +414,84 @@ public void testOlderThanTimestamp() throws InterruptedException {
     Assert.assertEquals("Should delete only 2 files", 2, result.size());
   }
 
+  @Test
+  public void testOlderThanTimestampWithPartitionWithWhitSpace()

Review comment:
       Typo in name, "WhitSpace"






[GitHub] [iceberg] manishmalhotrawork commented on a change in pull request #1471: RemoveOrphanFiles: consider only path to compare and delete, avoid authority and scheme

Posted by GitBox <gi...@apache.org>.
manishmalhotrawork commented on a change in pull request #1471:
URL: https://github.com/apache/iceberg/pull/1471#discussion_r490544695



##########
File path: spark/src/test/java/org/apache/iceberg/actions/TestRemoveOrphanFilesAction.java
##########
@@ -528,6 +566,71 @@ public void testRemoveOrphanFilesWithRelativeFilePath() throws IOException, Inte
     Assert.assertTrue("Invalid file should be present", fs.exists(new Path(invalidFiles.get(0))));
   }
 
+  @Test
+  public void testRemoveOrphanFilesWithSpecialCharsFilePath() throws IOException, InterruptedException {
+    File whiteSpaceDir = null;
+    try {
+      whiteSpaceDir = new File(tableDir.getAbsolutePath() + "/white space");

Review comment:
       Sure, used temp now.






[GitHub] [iceberg] aokolnychyi commented on a change in pull request #1471: RemoveOrphanFiles: consider only path to compare and delete, avoid authority and scheme

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on a change in pull request #1471:
URL: https://github.com/apache/iceberg/pull/1471#discussion_r491051153



##########
File path: spark/src/main/java/org/apache/iceberg/actions/RemoveOrphanFilesAction.java
##########
@@ -70,15 +73,34 @@
 public class RemoveOrphanFilesAction extends BaseAction<List<String>> {
 
   private static final Logger LOG = LoggerFactory.getLogger(RemoveOrphanFilesAction.class);
-  private static final UserDefinedFunction filename = functions.udf((String path) -> {
-    int lastIndex = path.lastIndexOf(File.separator);
-    if (lastIndex == -1) {
-      return path;
-    } else {
-      return path.substring(lastIndex + 1);
-    }
+
+  private static final String URI_DETAIL = "URI_DETAIL";
+  private static final String FILE_NAME = "file_name";
+  private static final String FILE_PATH = "file_path";
+  private static final String FILE_PATH_ONLY = "file_path_only";
+  private static final StructType FILE_DETAIL_STRUCT =  new StructType(new StructField[] {
+      DataTypes.createStructField(FILE_NAME, DataTypes.StringType, false),
+      DataTypes.createStructField(FILE_PATH_ONLY, DataTypes.StringType, false),
+      DataTypes.createStructField(FILE_PATH, DataTypes.StringType, false)
+  });
+
+  private static final UserDefinedFunction decodeUDF = functions.udf((String fullyQualifiedPath) -> {
+    return URLDecoder.decode(fullyQualifiedPath, "UTF-8");
   }, DataTypes.StringType);
 
+  /**
+   * Transform a file path to
+   * {@code Dataset<Row<file_name, file_path_no_scheme_authority, file_path_with_scheme_authority>>}
+   */
+  private static final UserDefinedFunction addFileDetailsUDF = functions.udf((String fileLocation) -> {
+    Path fullyQualifiedPath = new Path(fileLocation);
+    String fileName = fullyQualifiedPath.getName();
+    String filePathOnly = fullyQualifiedPath.toUri().getPath();
+    String filePath = fullyQualifiedPath.toUri().toString();

Review comment:
       Don't we already have a `file_path` column that contains the fully qualified path to be deleted? We pass that column to this UDF. I think we can use it directly, and it should already be decoded if I am not mistaken. If so, then we won't need `decodeUDF`.
   
   @manishmalhotrawork, what do you think?






[GitHub] [iceberg] manishmalhotrawork commented on pull request #1471: RemoveOrphanFiles: consider only path to compare and delete, avoid authority and scheme

Posted by GitBox <gi...@apache.org>.
manishmalhotrawork commented on pull request #1471:
URL: https://github.com/apache/iceberg/pull/1471#issuecomment-693786848


   @rdblue @aokolnychyi can you please review it !!




[GitHub] [iceberg] RussellSpitzer commented on a change in pull request #1471: RemoveOrphanFiles: consider only path to compare and delete, avoid authority and scheme

Posted by GitBox <gi...@apache.org>.
RussellSpitzer commented on a change in pull request #1471:
URL: https://github.com/apache/iceberg/pull/1471#discussion_r490569223



##########
File path: spark/src/test/java/org/apache/iceberg/actions/TestRemoveOrphanFilesAction.java
##########
@@ -569,4 +663,220 @@ public void testRemoveOrphanFilesWithHadoopCatalog() throws InterruptedException
         .collectAsList();
     Assert.assertEquals("Rows must match", records, actualRecords);
   }
+
+  @Test
+  public void testFindOrphanFilesWithSameAuthority() throws Exception {
+    executeTest(
+        HDFS_SERVICENAME_USER_LOG_DATA_DUMMY_FILE,

Review comment:
       Makes me really want to do that JUnit 5 upgrade :) But this is better than before 👍






[GitHub] [iceberg] RussellSpitzer commented on pull request #1471: RemoveOrphanFiles: consider only path to compare and delete, avoid authority and scheme

Posted by GitBox <gi...@apache.org>.
RussellSpitzer commented on pull request #1471:
URL: https://github.com/apache/iceberg/pull/1471#issuecomment-949747196


   > In case of `S3FileIO`, the default scheme is `s3://`. Writes happening from different clients will have schemes based on the `io-impl` property. The manifest might have a mix of `s3://`, `s3a://`, etc. But the file listing (in DeleteOrphanFiles) will have only a single prefix (which is determined by the client Hadoop configuration). This will result in orphan files not being cleaned up. When the user is aware that the scheme can be ignored, I think we should provide a configuration to do that.
   > 
   > I am not able to come up with a concrete case for the authority (maybe HDFS with and without authority), but that could also be a configuration.
   > 
   > @RussellSpitzer @aokolnychyi @rdblue @flyrain @raptond Your thoughts on this?
   
   As we've talked about this before offline, I am a strong believer that we should not delete files that we aren't sure about. So if, for example, a user has files in
   ```
   hdfs://earth/sgcommand 
   hdfs://alphasite/sgcommand
   ```
   
   And the metadata has any of the following
   ```
   a: hdfs:///sgcommand/
   or
   b: hdfs://alphasite/sgcommand/
   or
   c: hdfs://earth/sgcommand/
   ```
   
   Our default behavior should be to delete neither of the original files. I think if we want we can add an optional flag for "exact matches",
   which would
   ```
   case a: delete both files
   case b: delete only earth
   case c: delete only alphasite
   ```
   
   So by default I believe we should only compare the `path` portion of any URI, since everything else can depend strongly on the FileSystem implementation actually doing the file listing.
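   
   A minimal sketch of that default (helper name invented here, not the PR's code): two locations point at the same file only when their URI path portions match, ignoring scheme and authority entirely.
   
   ```java
   import org.apache.hadoop.fs.Path;
   
   // Hypothetical helper: compare only the path portion of two locations.
   static boolean samePathPortion(String a, String b) {
     return new Path(a).toUri().getPath()
         .equals(new Path(b).toUri().getPath());
   }
   
   // samePathPortion("hdfs://earth/sgcommand", "hdfs://alphasite/sgcommand") -> true,
   // so under this default neither copy is treated as an orphan.
   ```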




[GitHub] [iceberg] karuppayya commented on pull request #1471: RemoveOrphanFiles: consider only path to compare and delete, avoid authority and scheme

Posted by GitBox <gi...@apache.org>.
karuppayya commented on pull request #1471:
URL: https://github.com/apache/iceberg/pull/1471#issuecomment-947224930


   In case of `S3FileIO`, the default scheme is `s3://`.
   Writes happening from different clients will have schemes based on the `io-impl` property. The manifest might have a mix of `s3://`, `s3a://`, etc.
   But the file listing (in DeleteOrphanFiles) will have only a single prefix (which is determined by the client Hadoop configuration). This will result in orphan files not being cleaned up.
   When the user is aware that the scheme can be ignored, I think we should provide a configuration to do that.
   
   I am not able to come up with a concrete case for the authority (maybe HDFS with and without authority), but that could also be a configuration.
   
   @RussellSpitzer @aokolnychyi @rdblue @flyrain @raptond Your thoughts on this?




[GitHub] [iceberg] RussellSpitzer commented on a change in pull request #1471: RemoveOrphanFiles: consider only path to compare and delete, avoid authority and scheme

Posted by GitBox <gi...@apache.org>.
RussellSpitzer commented on a change in pull request #1471:
URL: https://github.com/apache/iceberg/pull/1471#discussion_r512867854



##########
File path: spark3/src/test/java/org/apache/iceberg/spark/actions/TestRemoveOrphanFilesAction3.java
##########
@@ -20,6 +20,16 @@
 package org.apache.iceberg.spark.actions;
 
 import org.apache.iceberg.actions.TestRemoveOrphanFilesAction;
+import org.junit.Ignore;
 
 public class TestRemoveOrphanFilesAction3 extends TestRemoveOrphanFilesAction {
+
+  /**
+  * TODO: It's failing for Spark3, so it has to be fixed in the parent class.
+  * Ignoring for now, as Spark3 is still not supported.

Review comment:
       Spark3 is supported in OSS Iceberg. Do you mean this particular function is not supported in Spark3? If so, I think we should have a clearer reason why.






[GitHub] [iceberg] manishmalhotrawork commented on a change in pull request #1471: RemoveOrphanFiles: consider only path to compare and delete, avoid authority and scheme

Posted by GitBox <gi...@apache.org>.
manishmalhotrawork commented on a change in pull request #1471:
URL: https://github.com/apache/iceberg/pull/1471#discussion_r491597828



##########
File path: spark/src/main/java/org/apache/iceberg/actions/RemoveOrphanFilesAction.java
##########
@@ -70,15 +73,34 @@
 public class RemoveOrphanFilesAction extends BaseAction<List<String>> {
 
   private static final Logger LOG = LoggerFactory.getLogger(RemoveOrphanFilesAction.class);
-  private static final UserDefinedFunction filename = functions.udf((String path) -> {
-    int lastIndex = path.lastIndexOf(File.separator);
-    if (lastIndex == -1) {
-      return path;
-    } else {
-      return path.substring(lastIndex + 1);
-    }
+
+  private static final String URI_DETAIL = "URI_DETAIL";
+  private static final String FILE_NAME = "file_name";
+  private static final String FILE_PATH = "file_path";
+  private static final String FILE_PATH_ONLY = "file_path_only";
+  private static final StructType FILE_DETAIL_STRUCT =  new StructType(new StructField[] {
+      DataTypes.createStructField(FILE_NAME, DataTypes.StringType, false),
+      DataTypes.createStructField(FILE_PATH_ONLY, DataTypes.StringType, false),
+      DataTypes.createStructField(FILE_PATH, DataTypes.StringType, false)
+  });
+
+  private static final UserDefinedFunction decodeUDF = functions.udf((String fullyQualifiedPath) -> {
+    return URLDecoder.decode(fullyQualifiedPath, "UTF-8");
   }, DataTypes.StringType);
 
+  /**
+   * Transform a file path to
+   * {@code Dataset<Row<file_name, file_path_no_scheme_authority, file_path_with_scheme_authority>>}
+   */
+  private static final UserDefinedFunction addFileDetailsUDF = functions.udf((String fileLocation) -> {
+    Path fullyQualifiedPath = new Path(fileLocation);
+    String fileName = fullyQualifiedPath.getName();
+    String filePathOnly = fullyQualifiedPath.toUri().getPath();
+    String filePath = fullyQualifiedPath.toUri().toString();

Review comment:
       @aokolnychyi updated with the change.








[GitHub] [iceberg] manishmalhotrawork commented on a change in pull request #1471: RemoveOrphanFiles: consider only path to compare and delete, avoid authority and scheme

Posted by GitBox <gi...@apache.org>.
manishmalhotrawork commented on a change in pull request #1471:
URL: https://github.com/apache/iceberg/pull/1471#discussion_r490542593



##########
File path: spark/src/test/java/org/apache/iceberg/actions/TestRemoveOrphanFilesAction.java
##########
@@ -569,4 +672,349 @@ public void testRemoveOrphanFilesWithHadoopCatalog() throws InterruptedException
         .collectAsList();
     Assert.assertEquals("Rows must match", records, actualRecords);
   }
+
+  @Test
+  public void testFindOrphanFilesWithSameAuthority() throws Exception {
+    List<Row> validFilesData = getRowsWithFilePath(HDFS_SERVICENAME_USER_LOG_DATA_DUMMY_FILE, 3);
+
+    Dataset<Row> validFileDS = RemoveOrphanFilesAction
+        .addRelativePathColumn(
+            spark.createDataset(validFilesData, RowEncoder.apply(constructStructureWithString())));
+
+    List<Row> actualFilesData = new ArrayList<>();
+
+    actualFilesData.addAll(validFilesData);
+    List<Row> expectedOrphanFiles = getRowsWithFilePath(HDFS_SERVICENAME_USER_LOG_DATA_ORPHAN_FILE, 4);
+    actualFilesData.addAll(expectedOrphanFiles);
+
+    Dataset<Row> actualFileDS = RemoveOrphanFilesAction
+        .addRelativePathColumn(
+            spark.createDataset(actualFilesData, RowEncoder.apply(constructStructureWithString())));
+
+    List<String> orphanFiles = RemoveOrphanFilesAction.findOrphanFiles(validFileDS, actualFileDS);
+
+    Assert.assertNotNull(orphanFiles);
+
+    Assert.assertEquals(expectedOrphanFiles.stream().map(row -> row.get(0).toString())
+        .collect(Collectors.toList()), orphanFiles);
+  }
+
+  @Test
+  public void testFindOrphanFilesWithValidFileHasNoAuthority() throws Exception {
+    List<Row> validFilesData = getRowsWithFilePath(HDFS_USER_LOG_DATA_DUMMY_FILE, 4);
+    Dataset<Row> validFileDS = RemoveOrphanFilesAction
+        .addRelativePathColumn(
+            spark.createDataset(validFilesData, RowEncoder.apply(constructStructureWithString())));
+
+    List<Row> actualFilesData = new ArrayList<>();
+
+    actualFilesData.addAll(getRowsWithFilePath(HDFS_SERVICENAME_USER_LOG_DATA_DUMMY_FILE, 4));
+    List<Row> expectedOrphanFiles = getRowsWithFilePath(HDFS_SERVICENAME_USER_LOG_DATA_ORPHAN_FILE, 4);
+    actualFilesData.addAll(expectedOrphanFiles);
+
+    Dataset<Row> actualFileDS = RemoveOrphanFilesAction
+        .addRelativePathColumn(
+            spark.createDataset(actualFilesData, RowEncoder.apply(constructStructureWithString())));
+
+    List<String> orphanFiles = RemoveOrphanFilesAction.findOrphanFiles(validFileDS, actualFileDS);
+
+    Assert.assertEquals(expectedOrphanFiles.stream().map(row -> row.get(0).toString())
+        .collect(Collectors.toList()), orphanFiles);
+  }
+
+  @Test

Review comment:
       done






[GitHub] [iceberg] RussellSpitzer commented on a change in pull request #1471: RemoveOrphanFiles: consider only path to compare and delete, avoid authority and scheme

Posted by GitBox <gi...@apache.org>.
RussellSpitzer commented on a change in pull request #1471:
URL: https://github.com/apache/iceberg/pull/1471#discussion_r490575150



##########
File path: spark/src/main/java/org/apache/iceberg/actions/RemoveOrphanFilesAction.java
##########
@@ -254,4 +270,54 @@ private static void listDirRecursively(
       return files.iterator();
     };
   }
+
+  protected static List<String> findOrphanFiles(
+      Dataset<Row> validFileDF,
+      Dataset<Row> actualFileDF) {
+    Column nameEqual = filenameUDF.apply(actualFileDF.col(FILE_PATH_ONLY))
+        .equalTo(filenameUDF.apply(validFileDF.col(FILE_PATH_ONLY)));
+
+    Column pathContains = actualFileDF.col(FILE_PATH_ONLY)
+        .contains(validFileDF.col(FILE_PATH_ONLY));
+
+    Column joinCond = nameEqual.and(pathContains);
+    Column decodeFilepath = decodeUDF.apply(actualFileDF.col(FILE_PATH));
+    return actualFileDF.join(validFileDF, joinCond, "leftanti").select(decodeFilepath)
+        .as(Encoders.STRING())
+        .collectAsList();
+  }
+
+  /**
+   * From
+   * <pre>{@code
+   * Dataset<Row<file_path_with_scheme_authority>>
+   *    will be transformed to
+   *    Dataset<Row<file_path_no_scheme_authority, file_path_with_scheme_authority>>
+   *  }</pre>
+   *
+   * This is required to compare the valid files and all files to find the orphan files.
+   * Based on the resulting data set, only the path portion will be compared between the valid and actual file paths.
+   * In the case of hadoop or s3, there can be different authority names for accessing the same path, which could
+   * otherwise flag files that are part of the metadata, and not orphans, for deletion.
+   *
+   * @param filePathWithSchemeAndAuthority : complete file path, can include scheme, authority and path.
+   * @return : {@code file_path_no_scheme_authority, file_path}
+   */
+  protected static Dataset<Row> addFilePathOnlyColumn(Dataset<Row> filePathWithSchemeAndAuthority) {
+    String selectExprFormat = "%s.%s as %s";
+    return filePathWithSchemeAndAuthority.withColumn(URI_DETAIL,
+        addFilePathOnlyUDF.apply(
+            filePathWithSchemeAndAuthority.apply(FILE_PATH)
+        )).selectExpr(
+            String.format(selectExprFormat, URI_DETAIL, FILE_PATH_ONLY, FILE_PATH_ONLY), // file path only
+            String.format(selectExprFormat, URI_DETAIL, FILE_PATH, FILE_PATH)); // fully qualified path
+  }
+
+  static StructType fileDetailStructType() {

Review comment:
       ```java
     private static final StructType FILE_DETAIL_STRUCT =  new StructType(new StructField[]{
         DataTypes.createStructField(FILE_PATH_ONLY, DataTypes.StringType, false),
         DataTypes.createStructField(FILE_PATH, DataTypes.StringType, false)
         });
   
   ```








[GitHub] [iceberg] manishmalhotrawork commented on pull request #1471: RemoveOrphanFiles: consider only path to compare and delete, avoid authority and scheme

Posted by GitBox <gi...@apache.org>.
manishmalhotrawork commented on pull request #1471:
URL: https://github.com/apache/iceberg/pull/1471#issuecomment-738642342


   > @rdblue + @manishmalhotrawork I wasn't sure if the two of you had reached consensus yet on what you would like the behavior here to be, but since I think this would be a great fix for 0.10 maybe we should start discussion again and get this PR ready.
   
   @RussellSpitzer @rdblue sorry, I was stuck with other work :)
   Yes, let's discuss this; I think we just need a quick discussion and to apply @rdblue's suggestions.




[GitHub] [iceberg] aokolnychyi commented on pull request #1471: RemoveOrphanFiles: consider only path to compare and delete, avoid authority and scheme

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on pull request #1471:
URL: https://github.com/apache/iceberg/pull/1471#issuecomment-694613468


   @rdblue @RussellSpitzer @manishmalhotrawork what do you think?




[GitHub] [iceberg] manishmalhotrawork commented on a change in pull request #1471: RemoveOrphanFiles: consider only path to compare and delete, avoid authority and scheme

Posted by GitBox <gi...@apache.org>.
manishmalhotrawork commented on a change in pull request #1471:
URL: https://github.com/apache/iceberg/pull/1471#discussion_r490545188



##########
File path: spark/src/test/java/org/apache/iceberg/actions/TestRemoveOrphanFilesAction.java
##########
@@ -528,6 +566,71 @@ public void testRemoveOrphanFilesWithRelativeFilePath() throws IOException, Inte
     Assert.assertTrue("Invalid file should be present", fs.exists(new Path(invalidFiles.get(0))));
   }
 
+  @Test
+  public void testRemoveOrphanFilesWithSpecialCharsFilePath() throws IOException, InterruptedException {
+    File whiteSpaceDir = null;
+    try {
+      whiteSpaceDir = new File(tableDir.getAbsolutePath() + "/white space");
+      whiteSpaceDir.mkdirs();
+
+      Table table = TABLES.create(
+          SCHEMA,
+          PartitionSpec.unpartitioned(),
+          Maps.newHashMap(),
+          tableDir.getAbsolutePath() + "/white space");
+
+      List<ThreeColumnRecord> records = Lists.newArrayList(
+          new ThreeColumnRecord(1, "AAAAAAAAAA", "AAAA")
+      );
+
+      Dataset<Row> df = spark.createDataFrame(records, ThreeColumnRecord.class).coalesce(1);
+
+      df.select("c1", "c2", "c3")
+          .write()
+          .format("iceberg")
+          .mode("append")
+          .save(whiteSpaceDir.getAbsolutePath());
+
+      List<String> validFiles = spark.read().format("iceberg")
+          .load(whiteSpaceDir + "#files")
+          .select("file_path")
+          .as(Encoders.STRING())
+          .collectAsList();
+      Assert.assertEquals("Should be 1 valid files", 1, validFiles.size());
+      String validFile = validFiles.get(0);
+
+      df.write().mode("append").parquet(whiteSpaceDir + "/data");
+
+      Path dataPath = new Path(whiteSpaceDir + "/data");
+      FileSystem fs = dataPath.getFileSystem(spark.sessionState().newHadoopConf());
+      List<String> allFiles = Arrays.stream(fs.listStatus(dataPath, HiddenPathFilter.get()))
+          .filter(FileStatus::isFile)
+          .map(file -> file.getPath().toString())
+          .collect(Collectors.toList());
+      Assert.assertEquals("Should be 2 files", 2, allFiles.size());
+
+      List<String> invalidFiles = Lists.newArrayList(allFiles);
+      invalidFiles.removeIf(file -> file.contains(validFile));
+      Assert.assertEquals("Should be 1 invalid file", 1, invalidFiles.size());
+
+      // sleep for 1 second to ensure files will be old enough
+      Thread.sleep(1000);
+
+      Actions actions = Actions.forTable(table);
+      List<String> result = actions.removeOrphanFiles()
+          .olderThan(System.currentTimeMillis())
+          .execute();
+      Assert.assertEquals("Action should find 1 file", invalidFiles, result);

Review comment:
       Sure, checked the file as well.






[GitHub] [iceberg] RussellSpitzer commented on a change in pull request #1471: RemoveOrphanFiles: consider only path to compare and delete, avoid authority and scheme

Posted by GitBox <gi...@apache.org>.
RussellSpitzer commented on a change in pull request #1471:
URL: https://github.com/apache/iceberg/pull/1471#discussion_r490573422



##########
File path: spark/src/main/java/org/apache/iceberg/actions/RemoveOrphanFilesAction.java
##########
@@ -254,4 +270,54 @@ private static void listDirRecursively(
       return files.iterator();
     };
   }
+
+  protected static List<String> findOrphanFiles(
+      Dataset<Row> validFileDF,
+      Dataset<Row> actualFileDF) {
+    Column nameEqual = filenameUDF.apply(actualFileDF.col(FILE_PATH_ONLY))
+        .equalTo(filenameUDF.apply(validFileDF.col(FILE_PATH_ONLY)));
+
+    Column pathContains = actualFileDF.col(FILE_PATH_ONLY)
+        .contains(validFileDF.col(FILE_PATH_ONLY));
+
+    Column joinCond = nameEqual.and(pathContains);
+    Column decodeFilepath = decodeUDF.apply(actualFileDF.col(FILE_PATH));
+    return actualFileDF.join(validFileDF, joinCond, "leftanti").select(decodeFilepath)
+        .as(Encoders.STRING())
+        .collectAsList();
+  }
+
+  /**
+   * From
+   * <pre>{@code
+   * Dataset<Row<file_path_with_scheme_authority>>
+   *    will be transformed to
+   *    Dataset<Row<file_path_no_scheme_authority, file_path_with_scheme_authority>>
+   *  }</pre>
+   *
+   * This is required when comparing the valid files against all listed files to find orphans.
+   * In the resulting dataset, only the path portion is compared, because with HDFS or S3 the
+   * same path can be reached through different authority names, which would otherwise report
+   * files that are referenced in metadata as orphans.
+   *
+   * @param filePathWithSchemeAndAuthority complete file path, which may include scheme, authority and path
+   * @return {@code file_path_no_scheme_authority, file_path}
+   */
+  protected static Dataset<Row> addFilePathOnlyColumn(Dataset<Row> filePathWithSchemeAndAuthority) {
+    String selectExprFormat = "%s.%s as %s";
+    return filePathWithSchemeAndAuthority.withColumn(URI_DETAIL,
+        addFilePathOnlyUDF.apply(
+            filePathWithSchemeAndAuthority.apply(FILE_PATH)
+        )).selectExpr(
+            String.format(selectExprFormat, URI_DETAIL, FILE_PATH_ONLY, FILE_PATH_ONLY), // file path only
+            String.format(selectExprFormat, URI_DETAIL, FILE_PATH, FILE_PATH)); // fully qualified path
+  }
+
+  static StructType fileDetailStructType() {

Review comment:
       This should probably be a private static final field.
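
       For reference, a minimal sketch of what that constant might look like (a sketch under the assumption that the struct mirrors the `FILE_PATH_ONLY`/`FILE_PATH` columns in the diff; the class and field names are illustrative, not from the PR):
       ```java
       import org.apache.spark.sql.types.DataTypes;
       import org.apache.spark.sql.types.Metadata;
       import org.apache.spark.sql.types.StructField;
       import org.apache.spark.sql.types.StructType;

       class FileDetailSchema {
         // Built once, instead of reconstructing the schema on every call.
         static final StructType FILE_DETAIL_TYPE = new StructType(new StructField[] {
             new StructField("file_path_only", DataTypes.StringType, false, Metadata.empty()),
             new StructField("file_path", DataTypes.StringType, false, Metadata.empty())
         });
       }
       ```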






[GitHub] [iceberg] RussellSpitzer commented on a change in pull request #1471: RemoveOrphanFiles: consider only path to compare and delete, avoid authority and scheme

Posted by GitBox <gi...@apache.org>.
RussellSpitzer commented on a change in pull request #1471:
URL: https://github.com/apache/iceberg/pull/1471#discussion_r490573264



##########
File path: spark/src/main/java/org/apache/iceberg/actions/RemoveOrphanFilesAction.java
##########
@@ -79,6 +87,20 @@
     }
   }, DataTypes.StringType);
 
+  private static final UserDefinedFunction decodeUDF = functions.udf((String fullyQualifiedPath) -> {
+    return URLDecoder.decode(fullyQualifiedPath, "UTF-8");
+  }, DataTypes.StringType);
+
+  /**
+   * Transform a file path into a
+   * {@code Row<file_path_no_scheme_authority, file_path_with_scheme_authority>}
+   */
+  private static final UserDefinedFunction addFilePathOnlyUDF = functions.udf((String fullyQualifiedPath) -> {
+    Path path = new Path(fullyQualifiedPath);
+    // only_path, fully qualified path
+    return RowFactory.create(path.toUri().getPath(), path.toUri().toString());
+  }, DataTypes.createStructType(fileDetailStructType().fields()));

Review comment:
       Isn't this already a `StructType`? Why are we creating a `StructType` from it?
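
       A hedged sketch of the simplification that question implies — since `fileDetailStructType()` already returns a `StructType`, it could be passed to `functions.udf` directly (same body as the diff above, and assuming the imports and class context of that diff; only the return-type argument changes):
       ```java
       // fileDetailStructType() already returns a StructType, so the
       // DataTypes.createStructType(...) wrapper adds nothing and can be dropped.
       private static final UserDefinedFunction addFilePathOnlyUDF = functions.udf((String fullyQualifiedPath) -> {
         Path path = new Path(fullyQualifiedPath);
         // (path only, fully qualified path)
         return RowFactory.create(path.toUri().getPath(), path.toUri().toString());
       }, fileDetailStructType());
       ```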



