You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by "vinothchandar (via GitHub)" <gi...@apache.org> on 2023/04/25 13:42:59 UTC

[GitHub] [hudi] vinothchandar commented on a diff in pull request #8574: [HUDI-6139] Add support for Transformer schema validation in DeltaStreamer

vinothchandar commented on code in PR #8574:
URL: https://github.com/apache/hudi/pull/8574#discussion_r1176506429


##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/transform/Transformer.java:
##########
@@ -45,4 +46,8 @@ public interface Transformer {
    */
   @PublicAPIMethod(maturity = ApiMaturityLevel.STABLE)
   Dataset<Row> apply(JavaSparkContext jsc, SparkSession sparkSession, Dataset<Row> rowDataset, TypedProperties properties);
+
+  default Schema schemaTransform(JavaSparkContext jsc, SparkSession sparkSession, Schema incomingSchema, TypedProperties properties) {
+    throw new UnsupportedOperationException("Not implemented");

Review Comment:
   lets throw a specific Hudi exception here?



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/transform/Transformer.java:
##########
@@ -45,4 +46,8 @@ public interface Transformer {
    */
   @PublicAPIMethod(maturity = ApiMaturityLevel.STABLE)
   Dataset<Row> apply(JavaSparkContext jsc, SparkSession sparkSession, Dataset<Row> rowDataset, TypedProperties properties);
+
+  default Schema schemaTransform(JavaSparkContext jsc, SparkSession sparkSession, Schema incomingSchema, TypedProperties properties) {

Review Comment:
   rename: trasnformedSchema() to indicate its returning the transformed schema. 



##########
hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java:
##########
@@ -1802,6 +1802,44 @@ private void testParquetDFSSource(boolean useSchemaProvider, List<String> transf
     testNum++;
   }
 
+  @Test
+  public void testMultipleTransformers() throws Exception {

Review Comment:
   lets use this opp to pull a new Transformer test?



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/transform/ChainedTransformer.java:
##########
@@ -46,9 +110,40 @@ public List<String> getTransformersNames() {
   @Override
   public Dataset<Row> apply(JavaSparkContext jsc, SparkSession sparkSession, Dataset<Row> rowDataset, TypedProperties properties) {
     Dataset<Row> dataset = rowDataset;
+    Schema incomingSchema = enableSchemaValidation ? sourceSchemaOpt.get() : null;
     for (Transformer t : transformers) {
-      dataset = t.apply(jsc, sparkSession, dataset, properties);
+      String suffix = transformerToPropKeySuffix.get(t);
+      TypedProperties transformerProps = properties;
+      if (StringUtils.nonEmpty(suffix)) {
+        transformerProps = new TypedProperties(properties);
+        Map<String, Object> overrideKeysMap = new HashMap<>();
+        for (Map.Entry<Object, Object> entry : properties.entrySet()) {
+          String key = (String) entry.getKey();
+          if (key.endsWith("." + suffix)) {
+            overrideKeysMap.put(key.substring(0, key.length() - (suffix.length() + 1)), entry.getValue());
+          }
+        }
+        transformerProps.putAll(overrideKeysMap);
+      }
+      dataset = t.apply(jsc, sparkSession, dataset, transformerProps);
+      if (enableSchemaValidation) {
+        incomingSchema = validateAndGetTransformedSchema(t, dataset, incomingSchema, jsc, sparkSession, properties);
+      }
     }
     return dataset;
   }
+
+  private Schema validateAndGetTransformedSchema(Transformer transformer, Dataset<Row> dataset, Schema incomingSchema,

Review Comment:
   lets UT all these methods.



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/transform/ChainedTransformer.java:
##########
@@ -46,9 +110,40 @@ public List<String> getTransformersNames() {
   @Override
   public Dataset<Row> apply(JavaSparkContext jsc, SparkSession sparkSession, Dataset<Row> rowDataset, TypedProperties properties) {
     Dataset<Row> dataset = rowDataset;
+    Schema incomingSchema = enableSchemaValidation ? sourceSchemaOpt.get() : null;
     for (Transformer t : transformers) {
-      dataset = t.apply(jsc, sparkSession, dataset, properties);
+      String suffix = transformerToPropKeySuffix.get(t);

Review Comment:
   pull into another method to keep this block easier to read?



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/transform/ChainedTransformer.java:
##########
@@ -46,9 +110,40 @@ public List<String> getTransformersNames() {
   @Override
   public Dataset<Row> apply(JavaSparkContext jsc, SparkSession sparkSession, Dataset<Row> rowDataset, TypedProperties properties) {
     Dataset<Row> dataset = rowDataset;
+    Schema incomingSchema = enableSchemaValidation ? sourceSchemaOpt.get() : null;

Review Comment:
   lets use Option instead of `null` as sentinels.



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/transform/ChainedTransformer.java:
##########
@@ -46,9 +110,40 @@ public List<String> getTransformersNames() {
   @Override
   public Dataset<Row> apply(JavaSparkContext jsc, SparkSession sparkSession, Dataset<Row> rowDataset, TypedProperties properties) {
     Dataset<Row> dataset = rowDataset;
+    Schema incomingSchema = enableSchemaValidation ? sourceSchemaOpt.get() : null;
     for (Transformer t : transformers) {
-      dataset = t.apply(jsc, sparkSession, dataset, properties);
+      String suffix = transformerToPropKeySuffix.get(t);
+      TypedProperties transformerProps = properties;
+      if (StringUtils.nonEmpty(suffix)) {
+        transformerProps = new TypedProperties(properties);
+        Map<String, Object> overrideKeysMap = new HashMap<>();
+        for (Map.Entry<Object, Object> entry : properties.entrySet()) {
+          String key = (String) entry.getKey();
+          if (key.endsWith("." + suffix)) {
+            overrideKeysMap.put(key.substring(0, key.length() - (suffix.length() + 1)), entry.getValue());
+          }
+        }
+        transformerProps.putAll(overrideKeysMap);
+      }
+      dataset = t.apply(jsc, sparkSession, dataset, transformerProps);
+      if (enableSchemaValidation) {
+        incomingSchema = validateAndGetTransformedSchema(t, dataset, incomingSchema, jsc, sparkSession, properties);

Review Comment:
   per current impl, we will error out here, even if the `schemaTransform()` method is not implemented?



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/transform/ChainedTransformer.java:
##########
@@ -46,9 +110,40 @@ public List<String> getTransformersNames() {
   @Override
   public Dataset<Row> apply(JavaSparkContext jsc, SparkSession sparkSession, Dataset<Row> rowDataset, TypedProperties properties) {
     Dataset<Row> dataset = rowDataset;
+    Schema incomingSchema = enableSchemaValidation ? sourceSchemaOpt.get() : null;
     for (Transformer t : transformers) {
-      dataset = t.apply(jsc, sparkSession, dataset, properties);
+      String suffix = transformerToPropKeySuffix.get(t);
+      TypedProperties transformerProps = properties;
+      if (StringUtils.nonEmpty(suffix)) {
+        transformerProps = new TypedProperties(properties);
+        Map<String, Object> overrideKeysMap = new HashMap<>();
+        for (Map.Entry<Object, Object> entry : properties.entrySet()) {
+          String key = (String) entry.getKey();
+          if (key.endsWith("." + suffix)) {
+            overrideKeysMap.put(key.substring(0, key.length() - (suffix.length() + 1)), entry.getValue());
+          }
+        }
+        transformerProps.putAll(overrideKeysMap);
+      }
+      dataset = t.apply(jsc, sparkSession, dataset, transformerProps);
+      if (enableSchemaValidation) {
+        incomingSchema = validateAndGetTransformedSchema(t, dataset, incomingSchema, jsc, sparkSession, properties);
+      }
     }
     return dataset;
   }
+
+  private Schema validateAndGetTransformedSchema(Transformer transformer, Dataset<Row> dataset, Schema incomingSchema,
+                                               JavaSparkContext jsc, SparkSession sparkSession, TypedProperties properties) {
+    Schema targetSchema = AvroConversionUtils.convertStructTypeToAvroSchema(dataset.schema(), incomingSchema.getName(),
+        incomingSchema.getNamespace());
+    Schema expectedTargetSchema = transformer.schemaTransform(jsc, sparkSession, incomingSchema, properties);
+    // TODO: Check the API arguments below

Review Comment:
   please resolve TODOs before we can land.



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/transform/ChainedTransformer.java:
##########
@@ -18,25 +18,89 @@
 
 package org.apache.hudi.utilities.transform;
 
+import org.apache.hudi.AvroConversionUtils;
+import org.apache.hudi.avro.AvroSchemaUtils;
 import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ReflectionUtils;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.exception.HoodieException;
 
+import org.apache.avro.Schema;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SparkSession;
 
+import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.stream.Collectors;
 
 /**
  * A {@link Transformer} to chain other {@link Transformer}s and apply sequentially.
  */
 public class ChainedTransformer implements Transformer {
 
-  private List<Transformer> transformers;
+  // Delimiter used to separate class name and the property key suffix. The suffix comes first.
+  private static final String TRANSFORMER_CLASS_NAME_KEY_SUFFIX_DELIMITER = ":";
+
+  private final List<Transformer> transformers;
+  private final Map<Transformer, String> transformerToPropKeySuffix;
+  private Option<Schema> sourceSchemaOpt = Option.empty();
+  private boolean enableSchemaValidation = false;
 
   public ChainedTransformer(List<Transformer> transformers) {
     this.transformers = transformers;
+    this.transformerToPropKeySuffix = new HashMap<>(transformers.size());
+    for (Transformer transformer : this.transformers) {
+      transformerToPropKeySuffix.put(transformer, "");
+    }
+  }
+
+  /**
+   * Creates a chained transformer using the input transformer class names. The name can also include
+   * a suffix. This suffix can be appended with the property keys to identify properties related to the transformer.
+   * E:g - tr1:org.apache.hudi.utilities.transform.SqlQueryBasedTransformer can be used along with property key
+   * hoodie.deltastreamer.transformer.sql.tr1. Here tr1 is a suffix used to identify the keys specific to this transformer.
+   * This suffix is removed from the configuration keys when the transformer is used. This is useful when there are two or more
+   * transformers using the same config keys and expect different values for those keys.
+   *
+   * @param sourceSchemaOpt                   Source Schema
+   * @param configuredTransformers            List of configured transformer class names.
+   * @param enableSchemaValidation if true, schema is validated for the transformed data against expected schema.
+   *                                          Expected schema is provided by {@link Transformer#schemaTransform}
+   */
+  public ChainedTransformer(List<String> configuredTransformers, Option<Schema> sourceSchemaOpt, boolean enableSchemaValidation) {
+    this.transformerToPropKeySuffix = new HashMap<>(configuredTransformers.size());
+    this.transformers = new ArrayList<>(configuredTransformers.size());
+    this.enableSchemaValidation = enableSchemaValidation;
+    this.sourceSchemaOpt = sourceSchemaOpt;
+    if (enableSchemaValidation) {
+      ValidationUtils.checkArgument(sourceSchemaOpt.isPresent(), "Source schema should not be null");
+    }
+
+    List<Pair<String, String>> transformerClassNamesToSuffixList = new ArrayList<>(configuredTransformers.size());

Review Comment:
   new method + UT?



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/transform/Transformer.java:
##########
@@ -45,4 +46,8 @@ public interface Transformer {
    */
   @PublicAPIMethod(maturity = ApiMaturityLevel.STABLE)
   Dataset<Row> apply(JavaSparkContext jsc, SparkSession sparkSession, Dataset<Row> rowDataset, TypedProperties properties);
+
+  default Schema schemaTransform(JavaSparkContext jsc, SparkSession sparkSession, Schema incomingSchema, TypedProperties properties) {
+    throw new UnsupportedOperationException("Not implemented");

Review Comment:
   Should it return an `Option` instead? The handling of code in an upper layer need not be written in terms of exception handling then?



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java:
##########
@@ -276,9 +276,19 @@ public static class Config implements Serializable {
             + ". Allows transforming raw source Dataset to a target Dataset (conforming to target schema) before "
             + "writing. Default : Not set. E:g - org.apache.hudi.utilities.transform.SqlQueryBasedTransformer (which "
             + "allows a SQL query templated to be passed as a transformation function). "
-            + "Pass a comma-separated list of subclass names to chain the transformations.")
+            + "Pass a comma-separated list of subclass names to chain the transformations. Transformer can also include "
+            + "a suffix. This suffix can be appended with the property keys to identify properties related to the transformer. "
+            + "E:g - tr1:org.apache.hudi.utilities.transform.SqlQueryBasedTransformer can be used along with property key "
+            + "hoodie.deltastreamer.transformer.sql.tr1. Here tr1 is a suffix used to identify the keys specific to this transformer. "

Review Comment:
   It may help to allow such a scenario though. consider, the following. 
   
   ```
   tr1:io.bytearray.TransformerA
   tr2:io.bytearray.TransformerB
   tr3:io.bytearray.TransformerA
   ```
   
   with configs 
   
   ```
   io.bytearray.transformera.x=1
   io.bytearray.transformera.tr1.y=2
   io.bytearray.transformera.tr3.y=3
   ```
   here, we pass in same value for `x` into tr1/tr3. but different value for y? Can we think through this. 



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java:
##########
@@ -276,9 +276,19 @@ public static class Config implements Serializable {
             + ". Allows transforming raw source Dataset to a target Dataset (conforming to target schema) before "
             + "writing. Default : Not set. E:g - org.apache.hudi.utilities.transform.SqlQueryBasedTransformer (which "
             + "allows a SQL query templated to be passed as a transformation function). "
-            + "Pass a comma-separated list of subclass names to chain the transformations.")
+            + "Pass a comma-separated list of subclass names to chain the transformations. Transformer can also include "

Review Comment:
    it's added to the start of the transformer (prefix) and end of property (suffix)?  lets call this an `id` instead of `suffix`? its easier to understand.
   
   and if I understand correctly, there ordering of transformers should not matter. rename everywhere uniformly? 



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/transform/ChainedTransformer.java:
##########
@@ -18,25 +18,89 @@
 
 package org.apache.hudi.utilities.transform;
 
+import org.apache.hudi.AvroConversionUtils;
+import org.apache.hudi.avro.AvroSchemaUtils;
 import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ReflectionUtils;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.exception.HoodieException;
 
+import org.apache.avro.Schema;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SparkSession;
 
+import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.stream.Collectors;
 
 /**
  * A {@link Transformer} to chain other {@link Transformer}s and apply sequentially.
  */
 public class ChainedTransformer implements Transformer {
 
-  private List<Transformer> transformers;
+  // Delimiter used to separate class name and the property key suffix. The suffix comes first.
+  private static final String TRANSFORMER_CLASS_NAME_KEY_SUFFIX_DELIMITER = ":";
+
+  private final List<Transformer> transformers;
+  private final Map<Transformer, String> transformerToPropKeySuffix;
+  private Option<Schema> sourceSchemaOpt = Option.empty();
+  private boolean enableSchemaValidation = false;
 
   public ChainedTransformer(List<Transformer> transformers) {
     this.transformers = transformers;
+    this.transformerToPropKeySuffix = new HashMap<>(transformers.size());
+    for (Transformer transformer : this.transformers) {
+      transformerToPropKeySuffix.put(transformer, "");
+    }
+  }
+
+  /**
+   * Creates a chained transformer using the input transformer class names. The name can also include
+   * a suffix. This suffix can be appended with the property keys to identify properties related to the transformer.
+   * E:g - tr1:org.apache.hudi.utilities.transform.SqlQueryBasedTransformer can be used along with property key
+   * hoodie.deltastreamer.transformer.sql.tr1. Here tr1 is a suffix used to identify the keys specific to this transformer.
+   * This suffix is removed from the configuration keys when the transformer is used. This is useful when there are two or more
+   * transformers using the same config keys and expect different values for those keys.
+   *
+   * @param sourceSchemaOpt                   Source Schema
+   * @param configuredTransformers            List of configured transformer class names.
+   * @param enableSchemaValidation if true, schema is validated for the transformed data against expected schema.
+   *                                          Expected schema is provided by {@link Transformer#schemaTransform}
+   */
+  public ChainedTransformer(List<String> configuredTransformers, Option<Schema> sourceSchemaOpt, boolean enableSchemaValidation) {
+    this.transformerToPropKeySuffix = new HashMap<>(configuredTransformers.size());
+    this.transformers = new ArrayList<>(configuredTransformers.size());
+    this.enableSchemaValidation = enableSchemaValidation;
+    this.sourceSchemaOpt = sourceSchemaOpt;
+    if (enableSchemaValidation) {
+      ValidationUtils.checkArgument(sourceSchemaOpt.isPresent(), "Source schema should not be null");
+    }
+
+    List<Pair<String, String>> transformerClassNamesToSuffixList = new ArrayList<>(configuredTransformers.size());
+    for (String configuredTransformer : configuredTransformers) {
+      if (!configuredTransformer.contains(":")) {
+        transformerClassNamesToSuffixList.add(Pair.of(configuredTransformer, ""));
+      } else {
+        String[] splits = configuredTransformer.split(":");
+        if (splits.length > 2) {
+          throw new IllegalArgumentException("There should only be one colon in a configured transformer");
+        }
+        transformerClassNamesToSuffixList.add(Pair.of(splits[1], splits[0]));
+      }
+    }
+
+    for (Pair<String, String> pair : transformerClassNamesToSuffixList) {

Review Comment:
   I think we are missing a check for id/suffix uniqueness. we should error out if the same suffix is reused. 



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java:
##########
@@ -276,9 +276,19 @@ public static class Config implements Serializable {
             + ". Allows transforming raw source Dataset to a target Dataset (conforming to target schema) before "
             + "writing. Default : Not set. E:g - org.apache.hudi.utilities.transform.SqlQueryBasedTransformer (which "
             + "allows a SQL query templated to be passed as a transformation function). "
-            + "Pass a comma-separated list of subclass names to chain the transformations.")
+            + "Pass a comma-separated list of subclass names to chain the transformations. Transformer can also include "
+            + "a suffix. This suffix can be appended with the property keys to identify properties related to the transformer. "
+            + "E:g - tr1:org.apache.hudi.utilities.transform.SqlQueryBasedTransformer can be used along with property key "
+            + "hoodie.deltastreamer.transformer.sql.tr1. Here tr1 is a suffix used to identify the keys specific to this transformer. "

Review Comment:
   can a user have suffix/id in transformer class name and not in the property? lets clarify such behavior in docs in either case and also have checks to guard against misconfiguration. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org