You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by wa...@apache.org on 2022/09/23 11:53:27 UTC
[hudi] branch master updated: [HUDI-3523] Introduce AddColumnSchemaPostProcessor to support add columns to the end of a schema (#5031)
This is an automated email from the ASF dual-hosted git repository.
wangxianghu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 092375fc1f [HUDI-3523] Introduce AddColumnSchemaPostProcessor to support add columns to the end of a schema (#5031)
092375fc1f is described below
commit 092375fc1f058c7841d9d63cd04e842c062fae74
Author: wangxianghu <wa...@apache.org>
AuthorDate: Fri Sep 23 19:53:18 2022 +0800
[HUDI-3523] Introduce AddColumnSchemaPostProcessor to support add columns to the end of a schema (#5031)
---
.../schema/AddColumnSchemaPostProcessor.java | 172 +++++++++++++++++++++
.../hudi/utilities/TestSchemaPostProcessor.java | 62 +++++++-
2 files changed, 229 insertions(+), 5 deletions(-)
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/AddColumnSchemaPostProcessor.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/AddColumnSchemaPostProcessor.java
new file mode 100644
index 0000000000..ed5d8a2355
--- /dev/null
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/AddColumnSchemaPostProcessor.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.schema;
+
+import org.apache.hudi.common.config.ConfigProperty;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.utilities.exception.HoodieSchemaPostProcessException;
+
+import org.apache.avro.LogicalTypes;
+import org.apache.avro.Schema;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.spark.api.java.JavaSparkContext;
+
+import java.math.BigDecimal;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Locale;
+
+/**
+ * A {@link SchemaPostProcessor} used to add a column to the given schema. Currently, only one column can be added at a time.
+ * Users can specify the position of the new column by config {@link Config#SCHEMA_POST_PROCESSOR_ADD_COLUMN_NEXT_PROP}:
+ * the new column is inserted before that column, or appended to the end of the schema when that column does not exist.
+ * <p>
+ * Currently supported types : bytes, string, int, long, float, double, boolean, decimal.
+ * A configured default value is converted to the new column's type. Decimal columns are backed by an Avro
+ * {@code fixed} type carrying the {@code decimal} logical type.
+ */
+public class AddColumnSchemaPostProcessor extends SchemaPostProcessor {
+
+  private static final Logger LOG = LogManager.getLogger(AddColumnSchemaPostProcessor.class);
+
+  public AddColumnSchemaPostProcessor(TypedProperties props, JavaSparkContext jssc) {
+    super(props, jssc);
+  }
+
+  /**
+   * Configs supported.
+   */
+  public static class Config {
+    public static final ConfigProperty<String> SCHEMA_POST_PROCESSOR_ADD_COLUMN_NAME_PROP = ConfigProperty
+        .key("hoodie.deltastreamer.schemaprovider.schema_post_processor.add.column.name")
+        .noDefaultValue()
+        .withDocumentation("New column's name");
+
+    public static final ConfigProperty<String> SCHEMA_POST_PROCESSOR_ADD_COLUMN_TYPE_PROP = ConfigProperty
+        .key("hoodie.deltastreamer.schemaprovider.schema_post_processor.add.column.type")
+        .noDefaultValue()
+        .withDocumentation("New column's type");
+
+    public static final ConfigProperty<String> SCHEMA_POST_PROCESSOR_ADD_COLUMN_DOC_PROP = ConfigProperty
+        .key("hoodie.deltastreamer.schemaprovider.schema_post_processor.add.column.doc")
+        .noDefaultValue()
+        .withDocumentation("New column's doc");
+
+    public static final ConfigProperty<String> SCHEMA_POST_PROCESSOR_ADD_COLUMN_DEFAULT_PROP = ConfigProperty
+        .key("hoodie.deltastreamer.schemaprovider.schema_post_processor.add.column.default")
+        .noDefaultValue()
+        .withDocumentation("New column's default value");
+
+    public static final ConfigProperty<String> SCHEMA_POST_PROCESSOR_ADD_COLUMN_SIZE_PROP = ConfigProperty
+        .key("hoodie.deltastreamer.schemaprovider.schema_post_processor.add.column.size")
+        .noDefaultValue()
+        .withDocumentation("New column's size, used in decimal type");
+
+    public static final ConfigProperty<String> SCHEMA_POST_PROCESSOR_ADD_COLUMN_PRECISION_PROP = ConfigProperty
+        .key("hoodie.deltastreamer.schemaprovider.schema_post_processor.add.column.precision")
+        .noDefaultValue()
+        .withDocumentation("New column's precision, used in decimal type");
+
+    public static final ConfigProperty<String> SCHEMA_POST_PROCESSOR_ADD_COLUMN_SCALE_PROP = ConfigProperty
+        .key("hoodie.deltastreamer.schemaprovider.schema_post_processor.add.column.scale")
+        .noDefaultValue()
+        .withDocumentation("New column's scale, used in decimal type");
+
+    public static final ConfigProperty<String> SCHEMA_POST_PROCESSOR_ADD_COLUMN_NEXT_PROP = ConfigProperty
+        .key("hoodie.deltastreamer.schemaprovider.schema_post_processor.add.column.next")
+        .defaultValue(HoodieRecord.HOODIE_IS_DELETED)
+        .withDocumentation("Name of the column before which the new column is inserted; if no such column exists, "
+            + "the new column is appended to the end of the schema. `_hoodie_is_deleted` by default.");
+  }
+
+  public static final String BYTES = "BYTES";
+  public static final String STRING = "STRING";
+  public static final String INT = "INT";
+  public static final String LONG = "LONG";
+  public static final String FLOAT = "FLOAT";
+  public static final String DOUBLE = "DOUBLE";
+  public static final String BOOLEAN = "BOOLEAN";
+  public static final String DECIMAL = "DECIMAL";
+
+  /**
+   * Returns a copy of {@code schema} with the configured column inserted before the configured "next" column,
+   * or appended at the end when that column is not found.
+   * <p>
+   * If a column with the configured name already exists, a warning is logged and {@code schema} is returned unchanged.
+   * Record-level and field-level metadata (error flag, aliases, sort order, custom props) of the input are kept.
+   *
+   * @param schema the record schema to extend
+   * @return the extended schema
+   * @throws HoodieSchemaPostProcessException if the configured type is unsupported or the default value is invalid
+   */
+  @Override
+  public Schema processSchema(Schema schema) {
+    String newColumnName = this.config.getString(Config.SCHEMA_POST_PROCESSOR_ADD_COLUMN_NAME_PROP.key());
+
+    if (schema.getField(newColumnName) != null) {
+      LOG.warn(String.format("Column %s already exist!", newColumnName));
+      return schema;
+    }
+
+    String nextColumnName = this.config.getString(Config.SCHEMA_POST_PROCESSOR_ADD_COLUMN_NEXT_PROP.key(),
+        Config.SCHEMA_POST_PROCESSOR_ADD_COLUMN_NEXT_PROP.defaultValue());
+    Schema.Field newField = buildNewColumn(schema);
+
+    List<Schema.Field> sourceFields = schema.getFields();
+    List<Schema.Field> targetFields = new ArrayList<>(sourceFields.size() + 1);
+
+    // mark whether the new column is added
+    boolean isAdded = false;
+    for (Schema.Field sourceField : sourceFields) {
+      if (!isAdded && sourceField.name().equals(nextColumnName)) {
+        targetFields.add(newField);
+        isAdded = true;
+      }
+      // an Avro Field can only belong to one record schema, so every existing field is re-created
+      targetFields.add(copyField(sourceField));
+    }
+
+    // this would happen when `nextColumn` does not exist. just append the new column to the end
+    if (!isAdded) {
+      targetFields.add(newField);
+    }
+
+    Schema result = Schema.createRecord(schema.getName(), schema.getDoc(), schema.getNamespace(), schema.isError(),
+        targetFields);
+    schema.getAliases().forEach(result::addAlias);
+    schema.getObjectProps().forEach(result::addProp);
+    return result;
+  }
+
+  /**
+   * Builds the new column from the configured name, type, doc and default value.
+   *
+   * @param recordSchema the record the column will be added to; its full name is used as the namespace of the
+   *                     {@code fixed} type backing a decimal column, so the resulting schema can be serialized
+   *                     and parsed back
+   */
+  private Schema.Field buildNewColumn(Schema recordSchema) {
+    String columnName = this.config.getString(Config.SCHEMA_POST_PROCESSOR_ADD_COLUMN_NAME_PROP.key());
+    String type = this.config.getString(Config.SCHEMA_POST_PROCESSOR_ADD_COLUMN_TYPE_PROP.key()).toUpperCase(Locale.ROOT);
+    String doc = this.config.getString(Config.SCHEMA_POST_PROCESSOR_ADD_COLUMN_DOC_PROP.key(), null);
+    String defaultValue = this.config.getString(Config.SCHEMA_POST_PROCESSOR_ADD_COLUMN_DEFAULT_PROP.key(), null);
+
+    switch (type) {
+      case STRING:
+      case BYTES:
+      case INT:
+      case LONG:
+      case FLOAT:
+      case DOUBLE:
+      case BOOLEAN:
+        return new Schema.Field(columnName, Schema.create(Schema.Type.valueOf(type)), doc,
+            toPrimitiveDefault(type, defaultValue));
+      case DECIMAL: {
+        int size = this.config.getInteger(Config.SCHEMA_POST_PROCESSOR_ADD_COLUMN_SIZE_PROP.key(), 10);
+        int precision = this.config.getInteger(Config.SCHEMA_POST_PROCESSOR_ADD_COLUMN_PRECISION_PROP.key());
+        int scale = this.config.getInteger(Config.SCHEMA_POST_PROCESSOR_ADD_COLUMN_SCALE_PROP.key());
+
+        // a fixed type must be named, otherwise the schema's JSON form cannot be parsed back
+        Schema decimalSchema = Schema.createFixed(columnName, null, recordSchema.getFullName(), size);
+        LogicalTypes.decimal(precision, scale).addToSchema(decimalSchema);
+
+        return new Schema.Field(columnName, decimalSchema, doc,
+            toDecimalDefault(defaultValue, precision, scale, size));
+      }
+      default:
+        throw new HoodieSchemaPostProcessException(String.format("Type %s is not supported", type));
+    }
+  }
+
+  /**
+   * Converts a configured default value to the Java type Avro expects for a primitive column.
+   * Returns {@code null} (no default) when {@code value} is {@code null}.
+   */
+  private static Object toPrimitiveDefault(String type, String value) {
+    if (value == null) {
+      return null;
+    }
+    String trimmed = value.trim();
+    try {
+      switch (type) {
+        case INT:
+          return Integer.valueOf(trimmed);
+        case LONG:
+          return Long.valueOf(trimmed);
+        case FLOAT:
+          return Float.valueOf(trimmed);
+        case DOUBLE:
+          return Double.valueOf(trimmed);
+        case BOOLEAN:
+          if ("true".equalsIgnoreCase(trimmed) || "false".equalsIgnoreCase(trimmed)) {
+            return Boolean.valueOf(trimmed);
+          }
+          throw new IllegalArgumentException("expected true or false");
+        default:
+          // STRING and BYTES defaults are JSON strings in Avro, so the raw value is used as-is
+          return value;
+      }
+    } catch (IllegalArgumentException e) {
+      // NumberFormatException is an IllegalArgumentException as well
+      throw invalidDefault(type, value, e);
+    }
+  }
+
+  /**
+   * Encodes a decimal default value the way Avro expects it for a {@code fixed} decimal: the unscaled value in
+   * big-endian two's complement, sign-extended to {@code size} bytes, carried as an ISO-8859-1 string.
+   * Returns {@code null} (no default) when {@code value} is {@code null}.
+   */
+  private static String toDecimalDefault(String value, int precision, int scale, int size) {
+    if (value == null) {
+      return null;
+    }
+    BigDecimal decimal;
+    try {
+      // setScale without a rounding mode rejects values that would lose digits
+      decimal = new BigDecimal(value.trim()).setScale(scale);
+    } catch (NumberFormatException | ArithmeticException e) {
+      throw invalidDefault(DECIMAL, value, e);
+    }
+    byte[] unscaled = decimal.unscaledValue().toByteArray();
+    if (decimal.precision() > precision || unscaled.length > size) {
+      throw new HoodieSchemaPostProcessException(String.format(
+          "Default value %s does not fit into decimal(%d, %d) backed by %d bytes", value, precision, scale, size));
+    }
+    byte[] fixed = new byte[size];
+    int offset = size - unscaled.length;
+    Arrays.fill(fixed, 0, offset, unscaled[0] < 0 ? (byte) 0xFF : (byte) 0);
+    System.arraycopy(unscaled, 0, fixed, offset, unscaled.length);
+    return new String(fixed, StandardCharsets.ISO_8859_1);
+  }
+
+  /** Builds the exception reported for a default value that cannot be converted to the column's type. */
+  private static HoodieSchemaPostProcessException invalidDefault(String type, String value, Exception cause) {
+    return new HoodieSchemaPostProcessException(
+        String.format("Invalid default value '%s' for %s column: %s", value, type, cause.getMessage()));
+  }
+
+  /** Re-creates {@code field} (which is already bound to a schema), keeping its sort order, aliases and props. */
+  private static Schema.Field copyField(Schema.Field field) {
+    Schema.Field copy = new Schema.Field(field.name(), field.schema(), field.doc(), field.defaultVal(), field.order());
+    field.aliases().forEach(copy::addAlias);
+    field.getObjectProps().forEach(copy::addProp);
+    return copy;
+  }
+}
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestSchemaPostProcessor.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestSchemaPostProcessor.java
index d228d87446..20dcc2262c 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestSchemaPostProcessor.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestSchemaPostProcessor.java
@@ -20,6 +20,7 @@ package org.apache.hudi.utilities;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.utilities.exception.HoodieSchemaPostProcessException;
+import org.apache.hudi.utilities.schema.AddColumnSchemaPostProcessor;
import org.apache.hudi.utilities.schema.DeleteSupportSchemaPostProcessor;
import org.apache.hudi.utilities.schema.DropColumnSchemaPostProcessor;
import org.apache.hudi.utilities.schema.SchemaPostProcessor;
@@ -33,10 +34,14 @@ import org.apache.avro.Schema;
import org.apache.avro.Schema.Type;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
+import java.util.stream.Stream;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
@@ -55,13 +60,18 @@ public class TestSchemaPostProcessor extends UtilitiesTestBase {
+ "{\"name\":\"_row_key\",\"type\":\"string\"},{\"name\":\"rider\",\"type\":\"string\"},{\"name\":\"driver\","
+ "\"type\":\"string\"},{\"name\":\"fare\",\"type\":\"double\"}]}";
+  /** Supplies every primitive (non-decimal) type name accepted by AddColumnSchemaPostProcessor. */
+  private static Stream<Arguments> configParams() {
+    return Stream.of("bytes", "string", "int", "long", "float", "double", "boolean")
+        .map(Arguments::of);
+  }
+
@Test
public void testPostProcessor() throws IOException {
properties.put(Config.SCHEMA_POST_PROCESSOR_PROP, DummySchemaPostProcessor.class.getName());
SchemaProvider provider =
UtilHelpers.wrapSchemaProviderWithPostProcessor(
- UtilHelpers.createSchemaProvider(DummySchemaProvider.class.getName(), properties, jsc),
- properties, jsc,null);
+ UtilHelpers.createSchemaProvider(DummySchemaProvider.class.getName(), properties, jsc),
+ properties, jsc, null);
Schema schema = provider.getSourceSchema();
assertEquals(schema.getType(), Type.RECORD);
@@ -76,9 +86,9 @@ public class TestSchemaPostProcessor extends UtilitiesTestBase {
transformerClassNames.add(FlatteningTransformer.class.getName());
SchemaProvider provider =
- UtilHelpers.wrapSchemaProviderWithPostProcessor(
- UtilHelpers.createSchemaProvider(SparkAvroSchemaProvider.class.getName(), properties, jsc),
- properties, jsc, transformerClassNames);
+ UtilHelpers.wrapSchemaProviderWithPostProcessor(
+ UtilHelpers.createSchemaProvider(SparkAvroSchemaProvider.class.getName(), properties, jsc),
+ properties, jsc, transformerClassNames);
Schema schema = provider.getSourceSchema();
assertEquals(schema.getType(), Type.RECORD);
@@ -144,6 +154,48 @@ public class TestSchemaPostProcessor extends UtilitiesTestBase {
Assertions.assertThrows(HoodieSchemaPostProcessException.class, () -> processor.processSchema(schema));
}
+  @ParameterizedTest
+  @MethodSource("configParams")
+  public void testAddPrimitiveTypeColumn(String type) {
+    // add "primitive_column" of the given type right before the existing "fare" column
+    properties.put(AddColumnSchemaPostProcessor.Config.SCHEMA_POST_PROCESSOR_ADD_COLUMN_NAME_PROP.key(), "primitive_column");
+    properties.put(AddColumnSchemaPostProcessor.Config.SCHEMA_POST_PROCESSOR_ADD_COLUMN_TYPE_PROP.key(), type);
+    properties.put(AddColumnSchemaPostProcessor.Config.SCHEMA_POST_PROCESSOR_ADD_COLUMN_NEXT_PROP.key(), "fare");
+    properties.put(AddColumnSchemaPostProcessor.Config.SCHEMA_POST_PROCESSOR_ADD_COLUMN_DOC_PROP.key(), "primitive column test");
+
+    Schema sourceSchema = new Schema.Parser().parse(ORIGINAL_SCHEMA);
+    Schema resultSchema = new AddColumnSchemaPostProcessor(properties, null).processSchema(sourceSchema);
+
+    Schema.Field added = resultSchema.getField("primitive_column");
+    assertNotNull(added);
+    assertEquals("primitive column test", added.doc());
+    assertEquals(type, added.schema().getType().getName());
+    // the new column must sit immediately before "fare"
+    assertEquals(resultSchema.getField("fare").pos(), added.pos() + 1);
+  }
+
+  @Test
+  public void testAddDecimalColumn() {
+    // No "next" column is configured, so the default (`_hoodie_is_deleted`) applies. It is presumably absent from
+    // ORIGINAL_SCHEMA, so the column gets appended. Cleared explicitly in case another test left a value behind
+    // (NOTE(review): confirm whether `properties` is recreated per test).
+    properties.remove(AddColumnSchemaPostProcessor.Config.SCHEMA_POST_PROCESSOR_ADD_COLUMN_NEXT_PROP.key());
+    properties.put(AddColumnSchemaPostProcessor.Config.SCHEMA_POST_PROCESSOR_ADD_COLUMN_NAME_PROP.key(), "decimal_column");
+    properties.put(AddColumnSchemaPostProcessor.Config.SCHEMA_POST_PROCESSOR_ADD_COLUMN_TYPE_PROP.key(), "decimal");
+    properties.put(AddColumnSchemaPostProcessor.Config.SCHEMA_POST_PROCESSOR_ADD_COLUMN_DOC_PROP.key(), "decimal column test");
+    properties.put(AddColumnSchemaPostProcessor.Config.SCHEMA_POST_PROCESSOR_ADD_COLUMN_DEFAULT_PROP.key(), "0.75");
+    properties.put(AddColumnSchemaPostProcessor.Config.SCHEMA_POST_PROCESSOR_ADD_COLUMN_PRECISION_PROP.key(), "8");
+    properties.put(AddColumnSchemaPostProcessor.Config.SCHEMA_POST_PROCESSOR_ADD_COLUMN_SCALE_PROP.key(), "6");
+    properties.put(AddColumnSchemaPostProcessor.Config.SCHEMA_POST_PROCESSOR_ADD_COLUMN_SIZE_PROP.key(), "8");
+
+    AddColumnSchemaPostProcessor processor = new AddColumnSchemaPostProcessor(properties, null);
+    Schema schema = new Schema.Parser().parse(ORIGINAL_SCHEMA);
+    Schema targetSchema = processor.processSchema(schema);
+
+    Schema.Field newColumn = targetSchema.getField("decimal_column");
+
+    assertNotNull(newColumn);
+    assertEquals("decimal column test", newColumn.doc());
+    // decimal columns are backed by a fixed type carrying the configured size, precision and scale
+    assertEquals(Type.FIXED, newColumn.schema().getType());
+    assertEquals(8, newColumn.schema().getFixedSize());
+    assertEquals("decimal", newColumn.schema().getLogicalType().getName());
+    assertEquals(8, newColumn.schema().getObjectProp("precision"));
+    assertEquals(6, newColumn.schema().getObjectProp("scale"));
+    assertEquals(5, newColumn.pos());
+  }
+
@Test
public void testSparkAvroSchema() throws IOException {
SparkAvroPostProcessor processor = new SparkAvroPostProcessor(properties, null);