Posted to commits@hudi.apache.org by "nsivabalan (via GitHub)" <gi...@apache.org> on 2023/01/29 18:43:20 UTC

[GitHub] [hudi] nsivabalan commented on a diff in pull request #7787: [HUDI-5646] Guard dropping columns by a config, do not allow by default

nsivabalan commented on code in PR #7787:
URL: https://github.com/apache/hudi/pull/7787#discussion_r1090028568


##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java:
##########
@@ -227,6 +227,14 @@ public class HoodieWriteConfig extends HoodieConfig {
       .defaultValue("true")
       .withDocumentation("Validate the schema used for the write against the latest schema, for backwards compatibility.");
 
+  public static final ConfigProperty<String> SCHEMA_ALLOW_DROP_COLUMNS = ConfigProperty

Review Comment:
   Let's define it as a plain string constant rather than a ConfigProperty; we don't want to expose this on our configurations page.
   Otherwise, we need to come up with a way to tag internal configs so that we can fix our config docs generation.
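
A minimal sketch of the "plain string" option, for illustration only. The class name, the key string, and the helper method are all hypothetical and are not taken from the PR; the idea is just that a bare constant is never picked up by anything that enumerates ConfigProperty fields.

```java
import java.util.Properties;

// Hypothetical holder for an internal-only setting; nothing here is Hudi API.
final class InternalSchemaConfigs {
  private InternalSchemaConfigs() {}

  // Hypothetical key name, chosen only for this example.
  static final String SCHEMA_ALLOW_DROP_COLUMNS_KEY = "hoodie.example.schema.allow.drop.columns";

  // Dropping columns is disallowed unless explicitly enabled.
  static final boolean SCHEMA_ALLOW_DROP_COLUMNS_DEFAULT = false;

  // Reads the flag from raw properties, falling back to the default when unset.
  static boolean shouldAllowDroppedColumns(Properties props) {
    String raw = props.getProperty(SCHEMA_ALLOW_DROP_COLUMNS_KEY);
    return raw == null ? SCHEMA_ALLOW_DROP_COLUMNS_DEFAULT : Boolean.parseBoolean(raw.trim());
  }
}
```

Callers would then go through `InternalSchemaConfigs.shouldAllowDroppedColumns(props)` instead of a documented config accessor.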
   
   



##########
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestTableSchemaEvolution.java:
##########
@@ -230,19 +240,27 @@ public void testMORTable() throws Exception {
 
     // Now try updating w/ the original schema (should succeed)
     client = getHoodieWriteClient(hoodieWriteConfig);
-    updateBatch(hoodieWriteConfig, client, "009", "008", Option.empty(),
-                initCommitTime, numUpdateRecords, SparkRDDWriteClient::upsert, false, false, numUpdateRecords, 4 * numRecords, 9);
+    try {
+      updateBatch(hoodieWriteConfig, client, "009", "008", Option.empty(),
+          initCommitTime, numUpdateRecords, SparkRDDWriteClient::upsert, false, false, numUpdateRecords, 4 * numRecords, 9);
+    } catch (HoodieUpsertException e) {

Review Comment:
   similar comment as above



##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestBasicSchemaEvolution.scala:
##########
@@ -270,7 +277,10 @@ class TestBasicSchemaEvolution extends HoodieClientTestBase with ScalaAssertionS
         appendData(fourthSchema, fourthBatch)
       }
     } else {
-      appendData(fourthSchema, fourthBatch)
+      assertThrows(classOf[SchemaCompatibilityException]) {
+        appendData(fourthSchema, fourthBatch)
+      }
+      appendData(fourthSchema, fourthBatch, shouldAllowDroppedColumns = true)

Review Comment:
   same here



##########
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestTableSchemaEvolution.java:
##########
@@ -266,19 +284,24 @@ public void testCopyOnWriteTable() throws Exception {
     checkReadRecords("000", numRecords);
 
     // Inserting records w/ new evolved schema (w/ tip column dropped)
-    HoodieWriteConfig hoodieDevolvedWriteConfig = getWriteConfig(TRIP_EXAMPLE_SCHEMA_EVOLVED_COL_DROPPED);
+    HoodieWriteConfig hoodieDevolvedWriteConfig = getWriteConfig(TRIP_EXAMPLE_SCHEMA_EVOLVED_COL_DROPPED, shouldAllowDroppedColumns);
     client = getHoodieWriteClient(hoodieDevolvedWriteConfig);
     final List<HoodieRecord> failedRecords = generateInsertsWithSchema("004", numRecords, TRIP_EXAMPLE_SCHEMA_EVOLVED_COL_DROPPED);
-    writeBatch(client, "004", "003", Option.empty(), "003", numRecords,
-        (String s, Integer a) -> failedRecords, SparkRDDWriteClient::insert, true, numRecords, numRecords * 2, 1, false);
+    try {
+      writeBatch(client, "004", "003", Option.empty(), "003", numRecords,
+          (String s, Integer a) -> failedRecords, SparkRDDWriteClient::insert, true, numRecords, numRecords * 2, 1, false);
+    } catch (HoodieInsertException e) {
+      assertFalse(shouldAllowDroppedColumns);

Review Comment:
   ditto



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala:
##########
@@ -416,7 +419,8 @@ object HoodieSparkSqlWriter {
             case None =>
               // In case schema reconciliation is enabled we will employ (legacy) reconciliation
               // strategy to produce target writer's schema (see definition below)
-              val (reconciledSchema, isCompatible) = reconcileSchemasLegacy(latestTableSchema, canonicalizedSourceSchema)
+              // NOTE: if schema reconciliation is turned on, then we should allow columns to be dropped
+              val (reconciledSchema, isCompatible) = reconcileSchemasLegacy(latestTableSchema, canonicalizedSourceSchema, shouldAllowDroppedColumns = true)

Review Comment:
   Why are we setting shouldAllowDroppedColumns to true here? We already derive the value at L408, right?



##########
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestTableSchemaEvolution.java:
##########
@@ -194,20 +199,25 @@ public void testMORTable() throws Exception {
     checkReadRecords("000", numRecords);
 
     // Insert with evolved schema (column dropped) is allowed
-    HoodieWriteConfig hoodieDevolvedWriteConfig = getWriteConfig(TRIP_EXAMPLE_SCHEMA_EVOLVED_COL_DROPPED);
+    HoodieWriteConfig hoodieDevolvedWriteConfig = getWriteConfig(TRIP_EXAMPLE_SCHEMA_EVOLVED_COL_DROPPED, shouldAllowDroppedColumns);
     client = getHoodieWriteClient(hoodieDevolvedWriteConfig);
     final List<HoodieRecord> failedRecords = generateInsertsWithSchema("005", numRecords, TRIP_EXAMPLE_SCHEMA_EVOLVED_COL_DROPPED);
     // We cannot use insertBatch directly here because we want to insert records
     // with a evolved schema and insertBatch inserts records using the TRIP_EXAMPLE_SCHEMA.
-    writeBatch(client, "005", "004", Option.empty(), "003", numRecords,
-        (String s, Integer a) -> failedRecords, SparkRDDWriteClient::insert, false, numRecords, 2 * numRecords, 5, false);
+    try {
+      writeBatch(client, "005", "004", Option.empty(), "003", numRecords,
+          (String s, Integer a) -> failedRecords, SparkRDDWriteClient::insert, false, numRecords, 2 * numRecords, 5, false);
+    } catch (HoodieInsertException e) {

Review Comment:
   After L209, before the catch block, we should also add:
   ```
   assertTrue(shouldAllowDroppedColumns); 
   ```
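
To spell out the intent: the success path and the failure path should each assert on the flag, so that a write which unexpectedly succeeds is caught as well. Below is a self-contained sketch of that shape. `WriteRejectedException`, `write` and `check` are hypothetical stand-ins for HoodieInsertException, writeBatch(...) and the JUnit assertions; they are not the real test helpers.

```java
// Hypothetical, self-contained illustration of asserting on both outcomes.
final class DroppedColumnExpectation {
  private DroppedColumnExpectation() {}

  // Stand-in for HoodieInsertException.
  static final class WriteRejectedException extends RuntimeException {
    WriteRejectedException(String message) {
      super(message);
    }
  }

  // Stand-in for writeBatch(...): rejects the write unless dropping columns is allowed.
  static void write(boolean shouldAllowDroppedColumns) {
    if (!shouldAllowDroppedColumns) {
      throw new WriteRejectedException("column dropped");
    }
  }

  // Stand-in for assertTrue; throws AssertionError when the condition does not hold.
  static void check(boolean condition, String message) {
    if (!condition) {
      throw new AssertionError(message);
    }
  }

  // Passes only when the outcome of the write matches the flag.
  static void verify(boolean shouldAllowDroppedColumns, Runnable write) {
    try {
      write.run();
      // Reached only if the write succeeded, so the flag must have been on.
      check(shouldAllowDroppedColumns, "write succeeded although dropping columns is disallowed");
    } catch (WriteRejectedException e) {
      // The write failed, so the flag must have been off.
      check(!shouldAllowDroppedColumns, "write failed although dropping columns is allowed");
    }
  }
}
```

Because the first check raises an AssertionError rather than the write exception, the catch block does not swallow it.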



##########
hudi-common/src/main/java/org/apache/hudi/avro/AvroSchemaUtils.java:
##########
@@ -36,10 +37,17 @@ public class AvroSchemaUtils {
   private AvroSchemaUtils() {}
 
   /**
-   * See {@link #isSchemaCompatible(Schema, Schema, boolean)} doc for more details
+   * See {@link #isSchemaCompatible(Schema, Schema, boolean, boolean)} doc for more details
    */
   public static boolean isSchemaCompatible(Schema prevSchema, Schema newSchema) {
-    return isSchemaCompatible(prevSchema, newSchema, true);
+    return isSchemaCompatible(prevSchema, newSchema, true, false);

Review Comment:
   Instead of hardcoding false, can we use SCHEMA_ALLOW_DROP_COLUMNS.defaultValue()?



##########
hudi-common/src/main/java/org/apache/hudi/avro/AvroSchemaUtils.java:
##########
@@ -50,10 +58,20 @@ public static boolean isSchemaCompatible(Schema prevSchema, Schema newSchema) {
    * @param newSchema new instance of the schema
    * @param checkNaming controls whether schemas fully-qualified names should be checked
    */
-  public static boolean isSchemaCompatible(Schema prevSchema, Schema newSchema, boolean checkNaming) {
+  public static boolean isSchemaCompatible(Schema prevSchema, Schema newSchema, boolean checkNaming, boolean shouldAllowDroppedColumns) {
     // NOTE: We're establishing compatibility of the {@code prevSchema} and {@code newSchema}
     //       as following: {@code newSchema} is considered compatible to {@code prevSchema},
     //       iff data written using {@code prevSchema} could be read by {@code newSchema}
+
+    if (!shouldAllowDroppedColumns) {
+      // Check that each field in the oldSchema can be populated in the newSchema
+      for (final Schema.Field oldSchemaField : prevSchema.getFields()) {

Review Comment:
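   We can simplify this as:
   ```
   // anyMatch is used instead of map(...).filter(Objects::isNull).findAny(),
   // because findAny() throws NullPointerException when the selected element is null
   boolean isAnyColDropped = prevSchema.getFields().stream()
       .anyMatch(oldSchemaField -> SchemaCompatibility.lookupWriterField(newSchema, oldSchemaField) == null);
   ```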
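
For illustration, here is the same check in a form that runs without Avro on the classpath. It assumes a schema can be approximated by its list of field names, which is a simplification: the real check looks fields up through SchemaCompatibility.lookupWriterField. The class and method names are hypothetical. It uses anyMatch on a predicate, since Stream.findAny() throws NullPointerException if the element it selects is null.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical stand-in: schemas are reduced to plain lists of field names.
final class DroppedColumnCheck {
  private DroppedColumnCheck() {}

  // Returns true if at least one field of the previous schema is missing from the new one.
  static boolean isAnyColDropped(List<String> prevFields, List<String> newFields) {
    Set<String> present = new HashSet<>(newFields);
    return prevFields.stream().anyMatch(name -> !present.contains(name));
  }
}
```

Adding a column is not flagged by this check; only a field of the previous schema that is missing from the new one is.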



##########
hudi-common/src/main/java/org/apache/hudi/avro/AvroSchemaUtils.java:
##########
@@ -88,7 +106,7 @@ public static boolean isCompatibleProjectionOf(Schema sourceSchema, Schema targe
   private static boolean isAtomicSchemasCompatible(Schema oneAtomicType, Schema anotherAtomicType) {
     // NOTE: Checking for compatibility of atomic types, we should ignore their
     //       corresponding fully-qualified names (as irrelevant)
-    return isSchemaCompatible(oneAtomicType, anotherAtomicType, false);
+    return isSchemaCompatible(oneAtomicType, anotherAtomicType, false, true);

Review Comment:
   Why are we setting shouldAllowDroppedColumns to true by default here?



##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestBasicSchemaEvolution.scala:
##########
@@ -217,7 +217,14 @@ class TestBasicSchemaEvolution extends HoodieClientTestBase with ScalaAssertionS
       Row("8", "Ron", "14", 1, 1),
       Row("9", "Germiona", "16", 1, 1))
 
-    appendData(thirdSchema, thirdBatch)
+    if (shouldReconcileSchema) {

Review Comment:
   Let's fix the comments at L204.



##########
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestTableSchemaEvolution.java:
##########
@@ -299,9 +322,13 @@ public void testCopyOnWriteTable() throws Exception {
 
     // Now try updating w/ the original schema (should succeed)
     client = getHoodieWriteClient(hoodieWriteConfig);
-    updateBatch(hoodieWriteConfig, client, "008", "007", Option.empty(),
-                initCommitTime, numUpdateRecords, SparkRDDWriteClient::upsert, false, true,
-                numUpdateRecords, 3 * numRecords, 8);
+    try {
+      updateBatch(hoodieWriteConfig, client, "008", "007", Option.empty(),
+          initCommitTime, numUpdateRecords, SparkRDDWriteClient::upsert, false, true,
+          numUpdateRecords, 3 * numRecords, 8);
+    } catch (HoodieUpsertException e) {
+      assertFalse(shouldAllowDroppedColumns);

Review Comment:
   ditto



##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestBasicSchemaEvolution.scala:
##########
@@ -217,7 +217,14 @@ class TestBasicSchemaEvolution extends HoodieClientTestBase with ScalaAssertionS
       Row("8", "Ron", "14", 1, 1),
       Row("9", "Germiona", "16", 1, 1))
 
-    appendData(thirdSchema, thirdBatch)
+    if (shouldReconcileSchema) {
+      appendData(thirdSchema, thirdBatch)
+    } else {
+      assertThrows(classOf[SchemaCompatibilityException]) {
+        appendData(thirdSchema, thirdBatch)
+      }
+      appendData(thirdSchema, thirdBatch, shouldAllowDroppedColumns = true)

Review Comment:
   Again, the last arg should just be `shouldAllowDroppedColumns`, not `shouldAllowDroppedColumns = true`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org