Posted to issues@iceberg.apache.org by "nastra (via GitHub)" <gi...@apache.org> on 2023/02/28 13:18:34 UTC

[GitHub] [iceberg] nastra commented on a diff in pull request #4325: Spark:Skip corrupt files in Spark Procedures and Actions

nastra commented on code in PR #4325:
URL: https://github.com/apache/iceberg/pull/4325#discussion_r1119993510


##########
docs/spark-procedures.md:
##########
@@ -425,11 +425,12 @@ By default, the original table is retained with the name `table_BACKUP_`.
 
 #### Usage
 
-| Argument Name | Required? | Type | Description |
-|---------------|-----------|------|-------------|
-| `table`       | ✔️  | string | Name of the table to migrate |
-| `properties`  | ️   | map<string, string> | Properties for the new Iceberg table |
-| `drop_backup` |   | boolean | When true, the original table will not be retained as backup (defaults to false) |
+| Argument Name   | Required? | Type | Description |
+|-----------------|-----------|------|-------------|
+| `table`         | ✔️        | string | Name of the table to migrate |
+| `properties`    | ️         | map<string, string> | Properties for the new Iceberg table |
+| `drop_backup`   |           | boolean | When true, the original table will not be retained as backup (defaults to false) |

Review Comment:
   nit: it seems the `|` separators aren't aligned in the file itself (this probably makes no difference to the rendered document when viewed in a browser)



##########
spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/actions/TestCreateActions.java:
##########
@@ -406,6 +408,76 @@ public void testMigrateUnpartitioned() throws Exception {
     assertMigratedFileCount(SparkActions.get().migrateTable(source), source, dest);
   }
 
+  @Test
+  public void testMigrateSkipOnError() throws Exception {
+    Assume.assumeTrue("Cannot migrate to a hadoop based catalog", !type.equals("hadoop"));
+    Assume.assumeTrue(
+        "Can only migrate from Spark Session Catalog", catalog.name().equals("spark_catalog"));
+    String source = sourceName("test_migrate_skip_on_error_table");
+    String dest = source;
+
+    File location = temp.newFolder();
+    spark.sql(String.format(CREATE_PARQUET, source, location));
+    CatalogTable table = loadSessionTable(source);
+    Seq<String> partitionColumns = table.partitionColumnNames();
+    String format = table.provider().get();
+
+    spark
+        .table(baseTableName)
+        .write()
+        .mode(SaveMode.Append)
+        .format(format)
+        .partitionBy(partitionColumns.toSeq())
+        .saveAsTable(source);
+
+    spark
+        .table(baseTableName)
+        .write()
+        .mode(SaveMode.Append)
+        .format(format)
+        .partitionBy(partitionColumns.toSeq())
+        .saveAsTable(source);
+
+    List<File> expectedFiles = expectedFiles(source).collect(Collectors.toList());
+
+    Assert.assertEquals("Expected number of source files", 2, expectedFiles.size());
+
+    // Corrupt the second file
+    File file = expectedFiles.get(1);
+    Assume.assumeTrue("Delete source file!", file.delete());
+    Assume.assumeTrue("Create a empty source file!", file.createNewFile());

Review Comment:
   I don't think we should be using `assumeTrue` here, as that would silently skip the test. I'd suggest adding an assertion here instead.
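   For example (a rough sketch, assuming AssertJ is on the test classpath, as in the other suggestions here):
   
   ```java
   // Fail the test loudly if the corruption setup doesn't work, rather than silently skipping it
   Assertions.assertThat(file.delete()).as("Delete source file").isTrue();
   Assertions.assertThat(file.createNewFile()).as("Create an empty source file").isTrue();
   ```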



##########
spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/actions/TestCreateActions.java:
##########
@@ -406,6 +408,76 @@ public void testMigrateUnpartitioned() throws Exception {
     assertMigratedFileCount(SparkActions.get().migrateTable(source), source, dest);
   }
 
+  @Test
+  public void testMigrateSkipOnError() throws Exception {
+    Assume.assumeTrue("Cannot migrate to a hadoop based catalog", !type.equals("hadoop"));
+    Assume.assumeTrue(
+        "Can only migrate from Spark Session Catalog", catalog.name().equals("spark_catalog"));
+    String source = sourceName("test_migrate_skip_on_error_table");
+    String dest = source;
+
+    File location = temp.newFolder();
+    spark.sql(String.format(CREATE_PARQUET, source, location));
+    CatalogTable table = loadSessionTable(source);
+    Seq<String> partitionColumns = table.partitionColumnNames();
+    String format = table.provider().get();
+
+    spark
+        .table(baseTableName)
+        .write()
+        .mode(SaveMode.Append)
+        .format(format)
+        .partitionBy(partitionColumns.toSeq())
+        .saveAsTable(source);
+
+    spark
+        .table(baseTableName)
+        .write()
+        .mode(SaveMode.Append)
+        .format(format)
+        .partitionBy(partitionColumns.toSeq())
+        .saveAsTable(source);
+
+    List<File> expectedFiles = expectedFiles(source).collect(Collectors.toList());
+
+    Assert.assertEquals("Expected number of source files", 2, expectedFiles.size());

Review Comment:
   ```suggestion
       Assertions.assertThat(expectedFiles).hasSize(2);
   ```



##########
data/src/main/java/org/apache/iceberg/data/TableMigrationUtil.java:
##########
@@ -139,9 +213,18 @@ public static List<DataFile> listPartition(
                   buildDataFile(fileStatus.get(index), partitionKey, spec, metrics, "orc");
             });
       } else {
-        throw new UnsupportedOperationException("Unknown partition format: " + format);
+        if (skipOnError) {
+          LOG.warn("Skipping unknown partition format: {} - {}", format, partitionUri);
+        } else {
+          throw new UnsupportedOperationException(
+              "Unknown partition format: "

Review Comment:
   it would probably be better to use `String.format` here rather than string concatenation.
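   For example (sketch only, mirroring the existing log message):
   
   ```java
   throw new UnsupportedOperationException(
       String.format("Unknown partition format: %s - %s", format, partitionUri));
   ```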



##########
spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/actions/TestCreateActions.java:
##########
@@ -406,6 +408,76 @@ public void testMigrateUnpartitioned() throws Exception {
     assertMigratedFileCount(SparkActions.get().migrateTable(source), source, dest);
   }
 
+  @Test
+  public void testMigrateSkipOnError() throws Exception {
+    Assume.assumeTrue("Cannot migrate to a hadoop based catalog", !type.equals("hadoop"));
+    Assume.assumeTrue(
+        "Can only migrate from Spark Session Catalog", catalog.name().equals("spark_catalog"));
+    String source = sourceName("test_migrate_skip_on_error_table");
+    String dest = source;
+
+    File location = temp.newFolder();
+    spark.sql(String.format(CREATE_PARQUET, source, location));
+    CatalogTable table = loadSessionTable(source);
+    Seq<String> partitionColumns = table.partitionColumnNames();
+    String format = table.provider().get();
+
+    spark
+        .table(baseTableName)
+        .write()
+        .mode(SaveMode.Append)
+        .format(format)
+        .partitionBy(partitionColumns.toSeq())
+        .saveAsTable(source);
+
+    spark
+        .table(baseTableName)
+        .write()
+        .mode(SaveMode.Append)
+        .format(format)
+        .partitionBy(partitionColumns.toSeq())
+        .saveAsTable(source);
+
+    List<File> expectedFiles = expectedFiles(source).collect(Collectors.toList());
+
+    Assert.assertEquals("Expected number of source files", 2, expectedFiles.size());
+
+    // Corrupt the second file
+    File file = expectedFiles.get(1);
+    Assume.assumeTrue("Delete source file!", file.delete());
+    Assume.assumeTrue("Create a empty source file!", file.createNewFile());
+
+    MigrateTable migrateAction = SparkActions.get().migrateTable(source);
+
+    AssertHelpers.assertThrows(
+        "Expected an exception",
+        RuntimeException.class,
+        "not a Parquet file (length is too low: 0)",
+        migrateAction::execute);
+
+    // skip files which cannot be imported into Iceberg
+    migrateAction = SparkActions.get().migrateTable(source).skipOnError();
+
+    MigrateTable.Result migratedFiles = migrateAction.execute();
+    validateTables(source, dest);
+
+    SparkTable destTable = loadTable(dest);
+    Assert.assertEquals(
+        "Provider should be iceberg",
+        "iceberg",
+        destTable.properties().get(TableCatalog.PROP_PROVIDER));
+    List<Row> actual = spark.table(dest).collectAsList();
+
+    Assert.assertEquals(

Review Comment:
   I'd suggest using `Assertions.assertThat(actual).hasSize(3);` instead, as that is easier to debug if the assertion ever fails.
   For example, I've modified it locally to `Assertions.assertThat(actual).hasSize(2);` and the output shows what the actual content was, which makes debugging (especially on CI failures) much easier.
   
   ```
   java.lang.AssertionError: 
   Expected size: 2 but was: 3 in:
   [[1,a], [2,b], [3,c]]
   ```



##########
data/src/main/java/org/apache/iceberg/data/TableMigrationUtil.java:
##########
@@ -139,9 +213,18 @@ public static List<DataFile> listPartition(
                   buildDataFile(fileStatus.get(index), partitionKey, spec, metrics, "orc");
             });
       } else {
-        throw new UnsupportedOperationException("Unknown partition format: " + format);
+        if (skipOnError) {
+          LOG.warn("Skipping unknown partition format: {} - {}", format, partitionUri);
+        } else {
+          throw new UnsupportedOperationException(

Review Comment:
   it looks like this branch is never actually hit in the tests; I wonder how difficult it would be to add a test that hits this exception.
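   A rough sketch of what such a test could look like (the `listPartition` arguments below are placeholders, not the actual signature):
   
   ```java
   // Hypothetical sketch: partition, partitionUri, spec, conf, metricsConfig and mapping
   // are placeholders; the real listPartition overload may differ.
   Assertions.assertThatThrownBy(
           () ->
               TableMigrationUtil.listPartition(
                   partition, partitionUri, "csv", spec, conf, metricsConfig, mapping))
       .isInstanceOf(UnsupportedOperationException.class)
       .hasMessageStartingWith("Unknown partition format: csv");
   ```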



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
