Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2022/09/26 00:49:37 UTC

[GitHub] [hudi] nsivabalan commented on a diff in pull request #6761: [HUDI-4904] Add support for unraveling proto schemas in ProtoClassBasedSchemaProvider

nsivabalan commented on code in PR #6761:
URL: https://github.com/apache/hudi/pull/6761#discussion_r979482700


##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/ProtoClassBasedSchemaProvider.java:
##########
@@ -38,27 +39,43 @@ public class ProtoClassBasedSchemaProvider extends SchemaProvider {
    * Configs supported.
    */
   public static class Config {
-    public static final String PROTO_SCHEMA_CLASS_NAME = "hoodie.deltastreamer.schemaprovider.proto.className";
-    public static final String PROTO_SCHEMA_FLATTEN_WRAPPED_PRIMITIVES = "hoodie.deltastreamer.schemaprovider.proto.flattenWrappers";
+    private static final String PREFIX = "hoodie.deltastreamer.schemaprovider.proto";
+    public static final ConfigProperty<String> PROTO_SCHEMA_CLASS_NAME = ConfigProperty.key(PREFIX + ".className")

Review Comment:
   Usually we don't follow camelCase notation for configs. Suggest:
   `PREFIX + ".class.name"`
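   
   For illustration, a minimal sketch of the suggested dot-separated key inside the existing Config class, reusing the ConfigProperty builder already shown in the diff (the renamed key is the reviewer's suggestion, not the merged code):
   
      public static final ConfigProperty<String> PROTO_SCHEMA_CLASS_NAME =
          ConfigProperty.key(PREFIX + ".class.name")
              .noDefaultValue()
              .sinceVersion("0.13.0")
              .withDocumentation("The Protobuf Message class used as the source for the schema.");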



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/ProtoClassBasedSchemaProvider.java:
##########
@@ -38,27 +39,43 @@ public class ProtoClassBasedSchemaProvider extends SchemaProvider {
    * Configs supported.
    */
   public static class Config {
-    public static final String PROTO_SCHEMA_CLASS_NAME = "hoodie.deltastreamer.schemaprovider.proto.className";
-    public static final String PROTO_SCHEMA_FLATTEN_WRAPPED_PRIMITIVES = "hoodie.deltastreamer.schemaprovider.proto.flattenWrappers";
+    private static final String PREFIX = "hoodie.deltastreamer.schemaprovider.proto";
+    public static final ConfigProperty<String> PROTO_SCHEMA_CLASS_NAME = ConfigProperty.key(PREFIX + ".className")
+        .noDefaultValue()
+        .sinceVersion("0.13.0")
+        .withDocumentation("The Protobuf Message class used as the source for the schema.");
+
+    public static final ConfigProperty<Boolean> PROTO_SCHEMA_FLATTEN_WRAPPED_PRIMITIVES = ConfigProperty.key(PREFIX + ".flattenWrappers")
+        .defaultValue(false)
+        .sinceVersion("0.13.0")
+        .withDocumentation("When set to false wrapped primitives like Int64Value are translated to a record with a single 'value' field instead of simply a nullable value");
+    public static final ConfigProperty<Integer> PROTO_SCHEMA_MAX_RECURSION_DEPTH = ConfigProperty.key(PREFIX + ".maxRecursionDepth")

Review Comment:
   Minor: add an empty line between the two configs.



##########
hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestProtoConversionUtil.java:
##########
@@ -35,24 +38,99 @@
 import com.google.protobuf.UInt64Value;
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericDatumWriter;
 import org.apache.avro.generic.GenericRecord;
 import com.google.protobuf.util.Timestamps;
+import org.apache.avro.io.BinaryDecoder;
+import org.apache.avro.io.BinaryEncoder;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.io.EncoderFactory;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.UncheckedIOException;
 import java.nio.ByteBuffer;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Scanner;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 
 public class TestProtoConversionUtil {
   @Test
-  public void allFieldsSet_wellKnownTypesAreNested() {
+  public void allFieldsSet_wellKnownTypesAreNested() throws IOException {
+    Schema.Parser parser = new Schema.Parser();
+    Schema convertedSchema = parser.parse(getClass().getClassLoader().getResourceAsStream("schema-provider/proto/sample_schema_nested.avsc"));
+    Pair<Sample, GenericRecord> inputAndOutput = createInputOutputSampleWithWellKnownTypesNested(convertedSchema);
+    GenericRecord actual = serializeAndDeserializeAvro(ProtoConversionUtil.convertToAvro(convertedSchema, inputAndOutput.getLeft()), convertedSchema);
+    Assertions.assertEquals(inputAndOutput.getRight(), actual);
+  }
+
+  @Test
+  public void noFieldsSet_wellKnownTypesAreNested() throws IOException {
+    Sample sample = Sample.newBuilder().build();
+    Schema.Parser parser = new Schema.Parser();
+    Schema convertedSchema = parser.parse(getClass().getClassLoader().getResourceAsStream("schema-provider/proto/sample_schema_nested.avsc"));
+    GenericRecord actual = serializeAndDeserializeAvro(ProtoConversionUtil.convertToAvro(convertedSchema, sample), convertedSchema);
+    Assertions.assertEquals(createDefaultOutputWithWellKnownTypesNested(convertedSchema), actual);
+  }
+
+  @Test
+  public void allFieldsSet_wellKnownTypesAreFlattened() throws IOException {
+    Schema.Parser parser = new Schema.Parser();
+    Schema convertedSchema = parser.parse(getClass().getClassLoader().getResourceAsStream("schema-provider/proto/sample_schema_flattened.avsc"));
+    Pair<Sample, GenericRecord> inputAndOutput = createInputOutputSampleWithWellKnownTypesFlattened(convertedSchema);
+    GenericRecord actual = serializeAndDeserializeAvro(ProtoConversionUtil.convertToAvro(convertedSchema, inputAndOutput.getLeft()), convertedSchema);
+    Assertions.assertEquals(inputAndOutput.getRight(), actual);
+  }
+
+  @Test
+  public void noFieldsSet_wellKnownTypesAreFlattened() throws IOException {
+    Sample sample = Sample.newBuilder().build();
+    Schema.Parser parser = new Schema.Parser();
+    Schema convertedSchema = parser.parse(getClass().getClassLoader().getResourceAsStream("schema-provider/proto/sample_schema_flattened.avsc"));
+    GenericRecord actual = serializeAndDeserializeAvro(ProtoConversionUtil.convertToAvro(convertedSchema, sample), convertedSchema);
+    Assertions.assertEquals(createDefaultOutputWithWellKnownTypesFlattened(convertedSchema), actual);
+  }
+
+  @Test
+  public void recursiveSchema_noOverflow() throws IOException {
+    Schema.Parser parser = new Schema.Parser();
+    Schema convertedSchema = parser.parse(getClass().getClassLoader().getResourceAsStream("schema-provider/proto/parent_schema_recursive.avsc"));
+    Pair<Parent, GenericRecord> inputAndOutput = createInputOutputForRecursiveSchemaNoOverflow(convertedSchema);
+    GenericRecord actual = serializeAndDeserializeAvro(ProtoConversionUtil.convertToAvro(convertedSchema, inputAndOutput.getLeft()), convertedSchema);
+    Assertions.assertEquals(inputAndOutput.getRight(), actual);
+  }
+
+  @Test
+  public void recursiveSchema_withOverflow() throws Exception {
+    Schema.Parser parser = new Schema.Parser();
+    Schema convertedSchema = parser.parse(getClass().getClassLoader().getResourceAsStream("schema-provider/proto/parent_schema_recursive.avsc"));
+    Pair<Parent, GenericRecord> inputAndOutput = createInputOutputForRecursiveSchemaWithOverflow(convertedSchema);
+    Parent input = inputAndOutput.getLeft();
+    GenericRecord actual = serializeAndDeserializeAvro(ProtoConversionUtil.convertToAvro(convertedSchema, inputAndOutput.getLeft()), convertedSchema);
+    Assertions.assertEquals(inputAndOutput.getRight(), actual);
+    // assert that overflow data can be read back into proto class
+    Child parsedSingleChildOverflow = Child.parseFrom(getOverflowBytesFromChildRecord((GenericRecord) actual.get("child")));
+    Assertions.assertEquals(input.getChild().getRecurseField().getRecurseField(), parsedSingleChildOverflow);
+    // Get children list
+    GenericData.Array<GenericRecord> array = (GenericData.Array<GenericRecord>) actual.get("children");
+    Child parsedChildren1Overflow = Child.parseFrom(getOverflowBytesFromChildRecord(array.get(0)));
+    Assertions.assertEquals(input.getChildren(0).getRecurseField().getRecurseField(), parsedChildren1Overflow);
+    Child parsedChildren2Overflow = Child.parseFrom(getOverflowBytesFromChildRecord(array.get(1)));
+    Assertions.assertEquals(input.getChildren(1).getRecurseField().getRecurseField(), parsedChildren2Overflow);
+  }
+
+  private Pair<Sample, GenericRecord> createInputOutputSampleWithWellKnownTypesNested(Schema schema) {

Review Comment:
   I see that this method populates only static values.
   A. Can we generate random dynamic values?
   B. If that's cumbersome, can we declare static variables and reuse them rather than generating the values again and again? I assume this is called from many places.
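   
   A hedged sketch of both options; the class and helper names below are illustrative, and only Sample.newBuilder() comes from the test itself:
   
      import java.util.concurrent.ThreadLocalRandom;
   
      final class SampleFixtures {
        private SampleFixtures() {}
   
        // Option A: random dynamic values so each run exercises different data.
        static long randomLong() {
          return ThreadLocalRandom.current().nextLong();
        }
   
        // Option B: build the shared fixture once and reuse it across tests.
        static final Sample DEFAULT_SAMPLE = Sample.newBuilder().build();
      }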



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/ProtoConversionUtil.java:
##########
@@ -80,17 +82,19 @@ public static GenericRecord convertToAvro(Schema schema, Message message) {
    * 2. Convert directly from a protobuf {@link Message} to a {@link GenericRecord} while properly handling enums and wrapped primitives mentioned above.
    */
   private static class AvroSupport {
+    private static final Schema STRINGS = Schema.create(Schema.Type.STRING);
+    private static final Schema NULL = Schema.create(Schema.Type.NULL);

Review Comment:
   NULL_SCHEMA



##########
hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestProtoConversionUtil.java:
##########
@@ -35,24 +38,99 @@
 import com.google.protobuf.UInt64Value;
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericDatumWriter;
 import org.apache.avro.generic.GenericRecord;
 import com.google.protobuf.util.Timestamps;
+import org.apache.avro.io.BinaryDecoder;
+import org.apache.avro.io.BinaryEncoder;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.io.EncoderFactory;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.UncheckedIOException;
 import java.nio.ByteBuffer;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Scanner;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 
 public class TestProtoConversionUtil {
   @Test
-  public void allFieldsSet_wellKnownTypesAreNested() {
+  public void allFieldsSet_wellKnownTypesAreNested() throws IOException {
+    Schema.Parser parser = new Schema.Parser();
+    Schema convertedSchema = parser.parse(getClass().getClassLoader().getResourceAsStream("schema-provider/proto/sample_schema_nested.avsc"));
+    Pair<Sample, GenericRecord> inputAndOutput = createInputOutputSampleWithWellKnownTypesNested(convertedSchema);
+    GenericRecord actual = serializeAndDeserializeAvro(ProtoConversionUtil.convertToAvro(convertedSchema, inputAndOutput.getLeft()), convertedSchema);
+    Assertions.assertEquals(inputAndOutput.getRight(), actual);
+  }
+
+  @Test
+  public void noFieldsSet_wellKnownTypesAreNested() throws IOException {
+    Sample sample = Sample.newBuilder().build();
+    Schema.Parser parser = new Schema.Parser();
+    Schema convertedSchema = parser.parse(getClass().getClassLoader().getResourceAsStream("schema-provider/proto/sample_schema_nested.avsc"));
+    GenericRecord actual = serializeAndDeserializeAvro(ProtoConversionUtil.convertToAvro(convertedSchema, sample), convertedSchema);
+    Assertions.assertEquals(createDefaultOutputWithWellKnownTypesNested(convertedSchema), actual);
+  }
+
+  @Test
+  public void allFieldsSet_wellKnownTypesAreFlattened() throws IOException {
+    Schema.Parser parser = new Schema.Parser();
+    Schema convertedSchema = parser.parse(getClass().getClassLoader().getResourceAsStream("schema-provider/proto/sample_schema_flattened.avsc"));
+    Pair<Sample, GenericRecord> inputAndOutput = createInputOutputSampleWithWellKnownTypesFlattened(convertedSchema);
+    GenericRecord actual = serializeAndDeserializeAvro(ProtoConversionUtil.convertToAvro(convertedSchema, inputAndOutput.getLeft()), convertedSchema);
+    Assertions.assertEquals(inputAndOutput.getRight(), actual);
+  }
+
+  @Test
+  public void noFieldsSet_wellKnownTypesAreFlattened() throws IOException {
+    Sample sample = Sample.newBuilder().build();
+    Schema.Parser parser = new Schema.Parser();
+    Schema convertedSchema = parser.parse(getClass().getClassLoader().getResourceAsStream("schema-provider/proto/sample_schema_flattened.avsc"));
+    GenericRecord actual = serializeAndDeserializeAvro(ProtoConversionUtil.convertToAvro(convertedSchema, sample), convertedSchema);
+    Assertions.assertEquals(createDefaultOutputWithWellKnownTypesFlattened(convertedSchema), actual);
+  }
+
+  @Test
+  public void recursiveSchema_noOverflow() throws IOException {
+    Schema.Parser parser = new Schema.Parser();
+    Schema convertedSchema = parser.parse(getClass().getClassLoader().getResourceAsStream("schema-provider/proto/parent_schema_recursive.avsc"));
+    Pair<Parent, GenericRecord> inputAndOutput = createInputOutputForRecursiveSchemaNoOverflow(convertedSchema);
+    GenericRecord actual = serializeAndDeserializeAvro(ProtoConversionUtil.convertToAvro(convertedSchema, inputAndOutput.getLeft()), convertedSchema);
+    Assertions.assertEquals(inputAndOutput.getRight(), actual);
+  }
+
+  @Test
+  public void recursiveSchema_withOverflow() throws Exception {

Review Comment:
   Do we have tests for different values of maxRecursionDepth? It would be good to add one.
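   
   A sketch of what such a test could look like with JUnit 5 parameterized tests (assumes the junit-jupiter-params dependency; the body is a placeholder for the real conversion and assertions):
   
      import org.junit.jupiter.params.ParameterizedTest;
      import org.junit.jupiter.params.provider.ValueSource;
   
      @ParameterizedTest
      @ValueSource(ints = {1, 2, 5})
      void recursiveSchema_respectsMaxRecursionDepth(int maxRecursionDepth) {
        // Convert the recursive proto with the given maxRecursionDepth, then
        // assert the recursion-overflow bytes field appears at exactly that depth.
      }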



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/ProtoClassBasedSchemaProvider.java:
##########
@@ -38,27 +39,43 @@ public class ProtoClassBasedSchemaProvider extends SchemaProvider {
    * Configs supported.
    */
   public static class Config {
-    public static final String PROTO_SCHEMA_CLASS_NAME = "hoodie.deltastreamer.schemaprovider.proto.className";
-    public static final String PROTO_SCHEMA_FLATTEN_WRAPPED_PRIMITIVES = "hoodie.deltastreamer.schemaprovider.proto.flattenWrappers";
+    private static final String PREFIX = "hoodie.deltastreamer.schemaprovider.proto";
+    public static final ConfigProperty<String> PROTO_SCHEMA_CLASS_NAME = ConfigProperty.key(PREFIX + ".className")
+        .noDefaultValue()
+        .sinceVersion("0.13.0")
+        .withDocumentation("The Protobuf Message class used as the source for the schema.");
+
+    public static final ConfigProperty<Boolean> PROTO_SCHEMA_FLATTEN_WRAPPED_PRIMITIVES = ConfigProperty.key(PREFIX + ".flattenWrappers")
+        .defaultValue(false)
+        .sinceVersion("0.13.0")
+        .withDocumentation("When set to false wrapped primitives like Int64Value are translated to a record with a single 'value' field instead of simply a nullable value");
+    public static final ConfigProperty<Integer> PROTO_SCHEMA_MAX_RECURSION_DEPTH = ConfigProperty.key(PREFIX + ".maxRecursionDepth")

Review Comment:
   Same here.



##########
hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestProtoConversionUtil.java:
##########
@@ -35,24 +38,99 @@
 import com.google.protobuf.UInt64Value;
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericDatumWriter;
 import org.apache.avro.generic.GenericRecord;
 import com.google.protobuf.util.Timestamps;
+import org.apache.avro.io.BinaryDecoder;
+import org.apache.avro.io.BinaryEncoder;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.io.EncoderFactory;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.UncheckedIOException;
 import java.nio.ByteBuffer;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Scanner;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 
 public class TestProtoConversionUtil {
   @Test
-  public void allFieldsSet_wellKnownTypesAreNested() {
+  public void allFieldsSet_wellKnownTypesAreNested() throws IOException {
+    Schema.Parser parser = new Schema.Parser();
+    Schema convertedSchema = parser.parse(getClass().getClassLoader().getResourceAsStream("schema-provider/proto/sample_schema_nested.avsc"));
+    Pair<Sample, GenericRecord> inputAndOutput = createInputOutputSampleWithWellKnownTypesNested(convertedSchema);
+    GenericRecord actual = serializeAndDeserializeAvro(ProtoConversionUtil.convertToAvro(convertedSchema, inputAndOutput.getLeft()), convertedSchema);
+    Assertions.assertEquals(inputAndOutput.getRight(), actual);
+  }
+
+  @Test
+  public void noFieldsSet_wellKnownTypesAreNested() throws IOException {
+    Sample sample = Sample.newBuilder().build();

Review Comment:
   createDefaultOutputWithWellKnownTypesNested and createDefaultOutputWithWellKnownTypesFlattened have a code-reuse opportunity. Can you add a private method and reuse it in both?
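   
   A minimal sketch of the suggested refactor; the shared helper name is hypothetical:
   
      private GenericRecord createDefaultOutputBase(Schema schema) {
        GenericData.Record record = new GenericData.Record(schema);
        // Populate the fields whose defaults are identical in the nested and
        // flattened variants; each caller then fills in only what differs.
        return record;
      }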



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/ProtoConversionUtil.java:
##########
@@ -80,17 +82,19 @@ public static GenericRecord convertToAvro(Schema schema, Message message) {
    * 2. Convert directly from a protobuf {@link Message} to a {@link GenericRecord} while properly handling enums and wrapped primitives mentioned above.
    */
   private static class AvroSupport {
+    private static final Schema STRINGS = Schema.create(Schema.Type.STRING);

Review Comment:
   Minor: STRING_SCHEMA



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/ProtoClassBasedSchemaProvider.java:
##########
@@ -38,27 +39,43 @@ public class ProtoClassBasedSchemaProvider extends SchemaProvider {
    * Configs supported.
    */
   public static class Config {
-    public static final String PROTO_SCHEMA_CLASS_NAME = "hoodie.deltastreamer.schemaprovider.proto.className";
-    public static final String PROTO_SCHEMA_FLATTEN_WRAPPED_PRIMITIVES = "hoodie.deltastreamer.schemaprovider.proto.flattenWrappers";
+    private static final String PREFIX = "hoodie.deltastreamer.schemaprovider.proto";
+    public static final ConfigProperty<String> PROTO_SCHEMA_CLASS_NAME = ConfigProperty.key(PREFIX + ".className")
+        .noDefaultValue()
+        .sinceVersion("0.13.0")
+        .withDocumentation("The Protobuf Message class used as the source for the schema.");
+
+    public static final ConfigProperty<Boolean> PROTO_SCHEMA_FLATTEN_WRAPPED_PRIMITIVES = ConfigProperty.key(PREFIX + ".flattenWrappers")

Review Comment:
   Same comment as above:
   `PREFIX + ".flatten.wrappers"`



##########
hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestProtoConversionUtil.java:
##########
@@ -35,24 +38,99 @@
 import com.google.protobuf.UInt64Value;
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericDatumWriter;
 import org.apache.avro.generic.GenericRecord;
 import com.google.protobuf.util.Timestamps;
+import org.apache.avro.io.BinaryDecoder;
+import org.apache.avro.io.BinaryEncoder;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.io.EncoderFactory;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.UncheckedIOException;
 import java.nio.ByteBuffer;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Scanner;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 
 public class TestProtoConversionUtil {
   @Test
-  public void allFieldsSet_wellKnownTypesAreNested() {
+  public void allFieldsSet_wellKnownTypesAreNested() throws IOException {
+    Schema.Parser parser = new Schema.Parser();
+    Schema convertedSchema = parser.parse(getClass().getClassLoader().getResourceAsStream("schema-provider/proto/sample_schema_nested.avsc"));
+    Pair<Sample, GenericRecord> inputAndOutput = createInputOutputSampleWithWellKnownTypesNested(convertedSchema);
+    GenericRecord actual = serializeAndDeserializeAvro(ProtoConversionUtil.convertToAvro(convertedSchema, inputAndOutput.getLeft()), convertedSchema);
+    Assertions.assertEquals(inputAndOutput.getRight(), actual);
+  }
+
+  @Test
+  public void noFieldsSet_wellKnownTypesAreNested() throws IOException {
+    Sample sample = Sample.newBuilder().build();
+    Schema.Parser parser = new Schema.Parser();
+    Schema convertedSchema = parser.parse(getClass().getClassLoader().getResourceAsStream("schema-provider/proto/sample_schema_nested.avsc"));
+    GenericRecord actual = serializeAndDeserializeAvro(ProtoConversionUtil.convertToAvro(convertedSchema, sample), convertedSchema);
+    Assertions.assertEquals(createDefaultOutputWithWellKnownTypesNested(convertedSchema), actual);
+  }
+
+  @Test
+  public void allFieldsSet_wellKnownTypesAreFlattened() throws IOException {

Review Comment:
   Same comment as above.



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/ProtoConversionUtil.java:
##########
@@ -141,24 +146,40 @@ private Schema getEnumSchema(Descriptors.EnumDescriptor enumDescriptor) {
       return Schema.createEnum(enumDescriptor.getName(), null, getNamespace(enumDescriptor.getFullName()), symbols);
     }
 
-    private Schema getMessageSchema(Descriptors.Descriptor descriptor, Map<Descriptors.Descriptor, Schema> seen, boolean flattenWrappedPrimitives) {
-      if (seen.containsKey(descriptor)) {
-        return seen.get(descriptor);
+    /**
+     * Translates a Proto Message descriptor into an Avro Schema
+     * @param descriptor the descriptor for the proto message
+     * @param recursionDepths a map of the descriptor to the number of times it has been encountered in this depth first traversal of the schema.
+     *                        This is used to cap the number of times we recurse on a schema.
+     * @param flattenWrappedPrimitives if true, treat wrapped primitives as nullable primitives, if false, treat them as proto messages
+     * @param path a string prefixed with the namespace of the original message being translated to avro and containing the current dot separated path tracking progress through the schema.
+     *             This value is used for a namespace when creating Avro records to avoid an error when reusing the same class name when unraveling a recursive schema.
+     * @param maxRecursionDepth the number of times to unravel a recursive proto schema before spilling the rest to bytes
+     * @return an avro schema
+     */
+    private Schema getMessageSchema(Descriptors.Descriptor descriptor, CopyOnWriteMap<Descriptors.Descriptor, Integer> recursionDepths, boolean flattenWrappedPrimitives, String path,
+                                    int maxRecursionDepth) {
+      // Parquet does not handle recursive schemas so we "unravel" the proto N levels
+      Integer currentRecursionCount = recursionDepths.getOrDefault(descriptor, 0);
+      if (currentRecursionCount >= maxRecursionDepth) {
+        return RECURSION_OVERFLOW_SCHEMA;
       }
-      Schema result = Schema.createRecord(descriptor.getName(), null,
-          getNamespace(descriptor.getFullName()), false);
+      // The current path is used as a namespace to avoid record name collisions within recursive schemas
+      Schema result = Schema.createRecord(descriptor.getName(), null, path, false);
 
-      seen.put(descriptor, result);
+      recursionDepths.put(descriptor, ++currentRecursionCount);
 
       List<Schema.Field> fields = new ArrayList<>(descriptor.getFields().size());
       for (Descriptors.FieldDescriptor f : descriptor.getFields()) {
-        fields.add(new Schema.Field(f.getName(), getFieldSchema(f, seen, flattenWrappedPrimitives), null, getDefault(f)));
+        // each branch of the schema traversal requires its own recursion depth tracking so copy the recursionDepths map
+        fields.add(new Schema.Field(f.getName(), getFieldSchema(f, new CopyOnWriteMap<>(recursionDepths), flattenWrappedPrimitives, path, maxRecursionDepth), null, getDefault(f)));

Review Comment:
   Not too strong on this suggestion; I'll leave it to you to decide.
   This may result in creating too many new hashmaps.
   Did you consider having just one hashmap, and modifying the key to combine the path with the descriptor, maybe?
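   
   A sketch of that single-map alternative, using a composite key of path plus descriptor (the key shape and class are assumptions, not the PR's code):
   
      import com.google.protobuf.Descriptors;
   
      import java.util.HashMap;
      import java.util.Map;
   
      final class RecursionTracker {
        // One shared map keyed by path + descriptor; the path disambiguates
        // traversal branches, so per-branch copies of the map are unnecessary.
        private final Map<String, Integer> depths = new HashMap<>();
   
        int depthFor(String path, Descriptors.Descriptor descriptor) {
          return depths.getOrDefault(key(path, descriptor), 0);
        }
   
        void increment(String path, Descriptors.Descriptor descriptor) {
          depths.merge(key(path, descriptor), 1, Integer::sum);
        }
   
        private static String key(String path, Descriptors.Descriptor descriptor) {
          return path + "|" + descriptor.getFullName();
        }
      }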
   



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/ProtoClassBasedSchemaProvider.java:
##########
@@ -38,27 +39,43 @@ public class ProtoClassBasedSchemaProvider extends SchemaProvider {
    * Configs supported.
    */
   public static class Config {
-    public static final String PROTO_SCHEMA_CLASS_NAME = "hoodie.deltastreamer.schemaprovider.proto.className";
-    public static final String PROTO_SCHEMA_FLATTEN_WRAPPED_PRIMITIVES = "hoodie.deltastreamer.schemaprovider.proto.flattenWrappers";
+    private static final String PREFIX = "hoodie.deltastreamer.schemaprovider.proto";

Review Comment:
   Minor: `PREFIX` -> `PROTO_SCHEMA_PROVIDER_PREFIX`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org