You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by wa...@apache.org on 2022/09/23 11:53:27 UTC

[hudi] branch master updated: [HUDI-3523] Introduce AddColumnSchemaPostProcessor to support add columns to the end of a schema (#5031)

This is an automated email from the ASF dual-hosted git repository.

wangxianghu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 092375fc1f [HUDI-3523] Introduce AddColumnSchemaPostProcessor to support add columns to the end of a schema (#5031)
092375fc1f is described below

commit 092375fc1f058c7841d9d63cd04e842c062fae74
Author: wangxianghu <wa...@apache.org>
AuthorDate: Fri Sep 23 19:53:18 2022 +0800

    [HUDI-3523] Introduce AddColumnSchemaPostProcessor to support add columns to the end of a schema (#5031)
---
 .../schema/AddColumnSchemaPostProcessor.java       | 172 +++++++++++++++++++++
 .../hudi/utilities/TestSchemaPostProcessor.java    |  62 +++++++-
 2 files changed, 229 insertions(+), 5 deletions(-)

diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/AddColumnSchemaPostProcessor.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/AddColumnSchemaPostProcessor.java
new file mode 100644
index 0000000000..ed5d8a2355
--- /dev/null
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/AddColumnSchemaPostProcessor.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.schema;
+
+import org.apache.hudi.common.config.ConfigProperty;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.utilities.exception.HoodieSchemaPostProcessException;
+
+import org.apache.avro.LogicalTypes;
+import org.apache.avro.Schema;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.spark.api.java.JavaSparkContext;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Locale;
+
+/**
+ * A {@link SchemaPostProcessor} used to add a column to the given schema. Currently, only one column can be added
+ * at a time.
+ * <p>
+ * Users can specify the position of the new column with {@link Config#SCHEMA_POST_PROCESSOR_ADD_COLUMN_NEXT_PROP}:
+ * the new column is inserted immediately before the column named there. If the schema has no such column, the new
+ * column is appended to the end.
+ * <p>
+ * Currently supported types : bytes, string, int, long, float, double, boolean, decimal
+ */
+public class AddColumnSchemaPostProcessor extends SchemaPostProcessor {
+
+  private static final Logger LOG = LogManager.getLogger(AddColumnSchemaPostProcessor.class);
+
+  public AddColumnSchemaPostProcessor(TypedProperties props, JavaSparkContext jssc) {
+    super(props, jssc);
+  }
+
+  /**
+   * Configs supported.
+   */
+  public static class Config {
+    public static final ConfigProperty<String> SCHEMA_POST_PROCESSOR_ADD_COLUMN_NAME_PROP = ConfigProperty
+        .key("hoodie.deltastreamer.schemaprovider.schema_post_processor.add.column.name")
+        .noDefaultValue()
+        .withDocumentation("New column's name");
+
+    public static final ConfigProperty<String> SCHEMA_POST_PROCESSOR_ADD_COLUMN_TYPE_PROP = ConfigProperty
+        .key("hoodie.deltastreamer.schemaprovider.schema_post_processor.add.column.type")
+        .noDefaultValue()
+        .withDocumentation("New column's type");
+
+    public static final ConfigProperty<String> SCHEMA_POST_PROCESSOR_ADD_COLUMN_DOC_PROP = ConfigProperty
+        .key("hoodie.deltastreamer.schemaprovider.schema_post_processor.add.column.doc")
+        .noDefaultValue()
+        .withDocumentation("New column's doc");
+
+    public static final ConfigProperty<String> SCHEMA_POST_PROCESSOR_ADD_COLUMN_DEFAULT_PROP = ConfigProperty
+        .key("hoodie.deltastreamer.schemaprovider.schema_post_processor.add.column.default")
+        .noDefaultValue()
+        .withDocumentation("New column's default value");
+
+    public static final ConfigProperty<String> SCHEMA_POST_PROCESSOR_ADD_COLUMN_SIZE_PROP = ConfigProperty
+        .key("hoodie.deltastreamer.schemaprovider.schema_post_processor.add.column.size")
+        .noDefaultValue()
+        .withDocumentation("New column's size, used in decimal type");
+
+    public static final ConfigProperty<String> SCHEMA_POST_PROCESSOR_ADD_COLUMN_PRECISION_PROP = ConfigProperty
+        .key("hoodie.deltastreamer.schemaprovider.schema_post_processor.add.column.precision")
+        .noDefaultValue()
+        .withDocumentation("New column's precision, used in decimal type");
+
+    public static final ConfigProperty<String> SCHEMA_POST_PROCESSOR_ADD_COLUMN_SCALE_PROP = ConfigProperty
+        .key("hoodie.deltastreamer.schemaprovider.schema_post_processor.add.column.scale")
+        .noDefaultValue()
+        .withDocumentation("New column's scale, used in decimal type");
+
+    public static final ConfigProperty<String> SCHEMA_POST_PROCESSOR_ADD_COLUMN_NEXT_PROP = ConfigProperty
+        .key("hoodie.deltastreamer.schemaprovider.schema_post_processor.add.column.next")
+        .defaultValue(HoodieRecord.HOODIE_IS_DELETED)
+        .withDocumentation("Column name which locate next to new column, `_hoodie_is_deleted` by default.");
+  }
+
+  // Upper-cased type names accepted (case-insensitively) by the `add.column.type` config.
+  public static final String BYTES = "BYTES";
+  public static final String STRING = "STRING";
+  public static final String INT = "INT";
+  public static final String LONG = "LONG";
+  public static final String FLOAT = "FLOAT";
+  public static final String DOUBLE = "DOUBLE";
+  public static final String BOOLEAN = "BOOLEAN";
+  public static final String DECIMAL = "DECIMAL";
+
+  /**
+   * Builds a record schema equal to {@code schema} plus the configured new column.
+   * <p>
+   * The new column is placed immediately before the column named by
+   * {@link Config#SCHEMA_POST_PROCESSOR_ADD_COLUMN_NEXT_PROP}; when that column is absent, the new column is appended
+   * to the end. If a field with the new column's name already exists, a warning is logged and {@code schema} is
+   * returned unchanged.
+   *
+   * @param schema the record schema to extend
+   * @return the extended record schema, or {@code schema} itself if the column already exists
+   * @throws HoodieSchemaPostProcessException if the configured type is not supported or the configured default
+   *                                          value cannot be converted to the configured type
+   */
+  @Override
+  public Schema processSchema(Schema schema) {
+    String newColumnName = this.config.getString(Config.SCHEMA_POST_PROCESSOR_ADD_COLUMN_NAME_PROP.key());
+
+    if (schema.getField(newColumnName) != null) {
+      LOG.warn(String.format("Column %s already exist!", newColumnName));
+      return schema;
+    }
+
+    List<Schema.Field> sourceFields = schema.getFields();
+    List<Schema.Field> targetFields = new ArrayList<>(sourceFields.size() + 1);
+
+    String nextColumnName = this.config.getString(Config.SCHEMA_POST_PROCESSOR_ADD_COLUMN_NEXT_PROP.key(),
+        Config.SCHEMA_POST_PROCESSOR_ADD_COLUMN_NEXT_PROP.defaultValue());
+
+    // mark whether the new column is added
+    boolean isAdded = false;
+    for (Schema.Field sourceField : sourceFields) {
+      if (sourceField.name().equals(nextColumnName)) {
+        targetFields.add(buildNewColumn());
+        isAdded = true;
+      }
+      // Fields are re-created rather than reused: an Avro field that already belongs to a record
+      // cannot be attached to another record.
+      targetFields.add(new Schema.Field(sourceField.name(), sourceField.schema(), sourceField.doc(), sourceField.defaultVal()));
+    }
+
+    // this would happen when `nextColumn` does not exist. just append the new column to the end
+    if (!isAdded) {
+      targetFields.add(buildNewColumn());
+    }
+
+    return Schema.createRecord(schema.getName(), schema.getDoc(), schema.getNamespace(), false, targetFields);
+  }
+
+  /**
+   * Creates the new column's field from the name, type, doc, default and (for decimal) size, precision and scale
+   * configs.
+   *
+   * @return a field that is not yet attached to any record
+   * @throws HoodieSchemaPostProcessException if the configured type is not supported or the configured default
+   *                                          value cannot be converted to the configured type
+   */
+  private Schema.Field buildNewColumn() {
+    String columnName = this.config.getString(Config.SCHEMA_POST_PROCESSOR_ADD_COLUMN_NAME_PROP.key());
+    String type = this.config.getString(Config.SCHEMA_POST_PROCESSOR_ADD_COLUMN_TYPE_PROP.key()).toUpperCase(Locale.ROOT);
+    String doc = this.config.getString(Config.SCHEMA_POST_PROCESSOR_ADD_COLUMN_DOC_PROP.key(), null);
+    Object defaultValue = coerceDefaultValue(type,
+        this.config.getOrDefault(Config.SCHEMA_POST_PROCESSOR_ADD_COLUMN_DEFAULT_PROP.key(), null));
+
+    switch (type) {
+      case STRING:
+      case BYTES:
+      case INT:
+      case LONG:
+      case FLOAT:
+      case DOUBLE:
+      case BOOLEAN:
+        return new Schema.Field(columnName, Schema.create(Schema.Type.valueOf(type)), doc, defaultValue);
+      case DECIMAL:
+        // Decimal is modelled as a fixed-size byte array carrying the decimal logical type;
+        // the fixed size falls back to 10 bytes when not configured.
+        int size = this.config.getInteger(Config.SCHEMA_POST_PROCESSOR_ADD_COLUMN_SIZE_PROP.key(), 10);
+        int precision = this.config.getInteger(Config.SCHEMA_POST_PROCESSOR_ADD_COLUMN_PRECISION_PROP.key());
+        int scale = this.config.getInteger(Config.SCHEMA_POST_PROCESSOR_ADD_COLUMN_SCALE_PROP.key());
+
+        // `fixed` is a named type in Avro: an anonymous fixed schema is written to JSON without a name
+        // and cannot be parsed back, so the fixed type is named after the column it backs.
+        Schema decimalSchema = Schema.createFixed(columnName, null, null, size);
+        LogicalTypes.decimal(precision, scale).addToSchema(decimalSchema);
+
+        return new Schema.Field(columnName, decimalSchema, doc, defaultValue);
+      default:
+        throw new HoodieSchemaPostProcessException(String.format("Type %s is not supported", type));
+    }
+  }
+
+  /**
+   * Converts a default value read from the properties (normally a {@link String}) into the Java type matching the
+   * column type, so that numeric and boolean columns receive a number/boolean default instead of a text default.
+   *
+   * @param type       upper-cased column type name
+   * @param rawDefault configured default; {@code null} means "no default"
+   * @return the converted default; {@code rawDefault} itself when it is not a string or needs no conversion
+   * @throws HoodieSchemaPostProcessException if the text cannot be converted to the column type
+   */
+  private static Object coerceDefaultValue(String type, Object rawDefault) {
+    if (!(rawDefault instanceof String)) {
+      // Either no default was configured or the caller already supplied a typed value.
+      return rawDefault;
+    }
+
+    String text = ((String) rawDefault).trim();
+    try {
+      switch (type) {
+        case INT:
+          return Integer.valueOf(text);
+        case LONG:
+          return Long.valueOf(text);
+        case FLOAT:
+          return Float.valueOf(text);
+        case DOUBLE:
+          return Double.valueOf(text);
+        case BOOLEAN:
+          // Boolean.valueOf silently maps any unknown text to false; reject it instead.
+          if ("true".equalsIgnoreCase(text) || "false".equalsIgnoreCase(text)) {
+            return Boolean.valueOf(text);
+          }
+          throw invalidDefault(type, text, null);
+        default:
+          // string, bytes and decimal (fixed) defaults stay textual; unsupported types are rejected by the caller.
+          return rawDefault;
+      }
+    } catch (NumberFormatException e) {
+      throw invalidDefault(type, text, e);
+    }
+  }
+
+  /**
+   * Builds the exception reported for a default value that does not fit the column type, keeping the cause.
+   */
+  private static HoodieSchemaPostProcessException invalidDefault(String type, String text, Throwable cause) {
+    HoodieSchemaPostProcessException error = new HoodieSchemaPostProcessException(
+        String.format("Default value %s is not a valid %s", text, type));
+    if (cause != null) {
+      error.initCause(cause);
+    }
+    return error;
+  }
+}
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestSchemaPostProcessor.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestSchemaPostProcessor.java
index d228d87446..20dcc2262c 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestSchemaPostProcessor.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestSchemaPostProcessor.java
@@ -20,6 +20,7 @@ package org.apache.hudi.utilities;
 
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.utilities.exception.HoodieSchemaPostProcessException;
+import org.apache.hudi.utilities.schema.AddColumnSchemaPostProcessor;
 import org.apache.hudi.utilities.schema.DeleteSupportSchemaPostProcessor;
 import org.apache.hudi.utilities.schema.DropColumnSchemaPostProcessor;
 import org.apache.hudi.utilities.schema.SchemaPostProcessor;
@@ -33,10 +34,14 @@ import org.apache.avro.Schema;
 import org.apache.avro.Schema.Type;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
 
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.stream.Stream;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
@@ -55,13 +60,18 @@ public class TestSchemaPostProcessor extends UtilitiesTestBase {
       + "{\"name\":\"_row_key\",\"type\":\"string\"},{\"name\":\"rider\",\"type\":\"string\"},{\"name\":\"driver\","
       + "\"type\":\"string\"},{\"name\":\"fare\",\"type\":\"double\"}]}";
 
+  /**
+   * Supplies the primitive type names used by the parameterized add-column test, one argument set per type.
+   */
+  private static Stream<Arguments> configParams() {
+    return Stream.of("bytes", "string", "int", "long", "float", "double", "boolean")
+        .map(Arguments::of);
+  }
+
   @Test
   public void testPostProcessor() throws IOException {
     properties.put(Config.SCHEMA_POST_PROCESSOR_PROP, DummySchemaPostProcessor.class.getName());
     SchemaProvider provider =
         UtilHelpers.wrapSchemaProviderWithPostProcessor(
-        UtilHelpers.createSchemaProvider(DummySchemaProvider.class.getName(), properties, jsc),
-            properties, jsc,null);
+            UtilHelpers.createSchemaProvider(DummySchemaProvider.class.getName(), properties, jsc),
+            properties, jsc, null);
 
     Schema schema = provider.getSourceSchema();
     assertEquals(schema.getType(), Type.RECORD);
@@ -76,9 +86,9 @@ public class TestSchemaPostProcessor extends UtilitiesTestBase {
     transformerClassNames.add(FlatteningTransformer.class.getName());
 
     SchemaProvider provider =
-            UtilHelpers.wrapSchemaProviderWithPostProcessor(
-                    UtilHelpers.createSchemaProvider(SparkAvroSchemaProvider.class.getName(), properties, jsc),
-                    properties, jsc, transformerClassNames);
+        UtilHelpers.wrapSchemaProviderWithPostProcessor(
+            UtilHelpers.createSchemaProvider(SparkAvroSchemaProvider.class.getName(), properties, jsc),
+            properties, jsc, transformerClassNames);
 
     Schema schema = provider.getSourceSchema();
     assertEquals(schema.getType(), Type.RECORD);
@@ -144,6 +154,48 @@ public class TestSchemaPostProcessor extends UtilitiesTestBase {
     Assertions.assertThrows(HoodieSchemaPostProcessException.class, () -> processor.processSchema(schema));
   }
 
+  /**
+   * Adds a column of the given primitive type in front of {@code fare} and checks its doc, type and position.
+   */
+  @ParameterizedTest
+  @MethodSource("configParams")
+  public void testAddPrimitiveTypeColumn(String type) {
+    final String addedColumnName = "primitive_column";
+    final String anchorColumnName = "fare";
+    final String addedColumnDoc = "primitive column test";
+
+    properties.put(AddColumnSchemaPostProcessor.Config.SCHEMA_POST_PROCESSOR_ADD_COLUMN_NAME_PROP.key(), addedColumnName);
+    properties.put(AddColumnSchemaPostProcessor.Config.SCHEMA_POST_PROCESSOR_ADD_COLUMN_TYPE_PROP.key(), type);
+    properties.put(AddColumnSchemaPostProcessor.Config.SCHEMA_POST_PROCESSOR_ADD_COLUMN_NEXT_PROP.key(), anchorColumnName);
+    properties.put(AddColumnSchemaPostProcessor.Config.SCHEMA_POST_PROCESSOR_ADD_COLUMN_DOC_PROP.key(), addedColumnDoc);
+
+    Schema processed = new AddColumnSchemaPostProcessor(properties, null)
+        .processSchema(new Schema.Parser().parse(ORIGINAL_SCHEMA));
+
+    Schema.Field added = processed.getField(addedColumnName);
+    Schema.Field anchor = processed.getField(anchorColumnName);
+
+    assertNotNull(added);
+    assertEquals(addedColumnDoc, added.doc());
+    assertEquals(type, added.schema().getType().getName());
+    // the new column must sit directly in front of the anchor column
+    assertEquals(anchor.pos(), added.pos() + 1);
+  }
+
+  /**
+   * Adds a decimal column without configuring a next column and checks its logical type and that it ends up at
+   * position 5 of the resulting schema.
+   */
+  @Test
+  public void testAddDecimalColumn() {
+    final String addedColumnName = "decimal_column";
+
+    properties.put(AddColumnSchemaPostProcessor.Config.SCHEMA_POST_PROCESSOR_ADD_COLUMN_NAME_PROP.key(), addedColumnName);
+    properties.put(AddColumnSchemaPostProcessor.Config.SCHEMA_POST_PROCESSOR_ADD_COLUMN_TYPE_PROP.key(), "decimal");
+    properties.put(AddColumnSchemaPostProcessor.Config.SCHEMA_POST_PROCESSOR_ADD_COLUMN_DOC_PROP.key(), "decimal column test");
+    properties.put(AddColumnSchemaPostProcessor.Config.SCHEMA_POST_PROCESSOR_ADD_COLUMN_DEFAULT_PROP.key(), "0.75");
+    properties.put(AddColumnSchemaPostProcessor.Config.SCHEMA_POST_PROCESSOR_ADD_COLUMN_PRECISION_PROP.key(), "8");
+    properties.put(AddColumnSchemaPostProcessor.Config.SCHEMA_POST_PROCESSOR_ADD_COLUMN_SCALE_PROP.key(), "6");
+    properties.put(AddColumnSchemaPostProcessor.Config.SCHEMA_POST_PROCESSOR_ADD_COLUMN_SIZE_PROP.key(), "8");
+
+    Schema processed = new AddColumnSchemaPostProcessor(properties, null)
+        .processSchema(new Schema.Parser().parse(ORIGINAL_SCHEMA));
+
+    Schema.Field added = processed.getField(addedColumnName);
+
+    assertNotNull(added);
+    assertEquals("decimal", added.schema().getLogicalType().getName());
+    assertEquals(5, added.pos());
+  }
+
   @Test
   public void testSparkAvroSchema() throws IOException {
     SparkAvroPostProcessor processor = new SparkAvroPostProcessor(properties, null);