Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2020/04/25 15:27:33 UTC

[GitHub] [incubator-hudi] pratyakshsharma opened a new pull request #1562: [HUDI-837]: implemented custom deserializer for AvroKafkaSource

pratyakshsharma opened a new pull request #1562:
URL: https://github.com/apache/incubator-hudi/pull/1562


   ## *Tips*
   - *Thank you very much for contributing to Apache Hudi.*
   - *Please review https://hudi.apache.org/contributing.html before opening a pull request.*
   
   ## What is the purpose of the pull request
   
    - When we read data from Kafka, we want to always read with the latest schema.
    - This allows us to assume, throughout the rest of the pipeline, that every record has the same schema.
    - We create a custom KafkaAvroDecoder that uses the latest schema as the reader schema.
    - This does not work with all SchemaProviders yet.
   
   ## Brief change log
   
   - Implemented HoodieAvroKafkaDeserializer, which supplies the reader schema as per the user's needs.
   - Introduced a property that configures the "value.deserializer" setting used by AvroKafkaSource.
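A minimal sketch of the second change-log item, assuming the flag is a boolean read from the job properties: it sets the standard Kafka consumer key `value.deserializer` to either Confluent's `KafkaAvroDeserializer` or the new `HoodieAvroKafkaDeserializer`. The flag name `hoodie.deltastreamer.kafka.custom.avro.deserializer` is the one that appears in the `AvroKafkaSource` diff in this thread; the `DeserializerSelector` class, its method, and the default of `false` are hypothetical and may not match how the PR wires it.

```java
import java.util.Properties;

/**
 * Hypothetical helper that picks the Kafka "value.deserializer" class from a Hudi flag.
 * Only the property keys and class names come from the PR thread.
 */
final class DeserializerSelector {
    // Flag name as it appears in the AvroKafkaSource diff of this PR.
    static final String USE_CUSTOM_DESERIALIZER_PROP = "hoodie.deltastreamer.kafka.custom.avro.deserializer";
    // Standard Kafka consumer configuration key.
    static final String VALUE_DESERIALIZER = "value.deserializer";

    static final String CONFLUENT_DESERIALIZER = "io.confluent.kafka.serializers.KafkaAvroDeserializer";
    static final String HOODIE_DESERIALIZER = "org.apache.hudi.utilities.sources.serde.HoodieAvroKafkaDeserializer";

    private DeserializerSelector() {}

    /** Sets value.deserializer in place and returns the chosen class name (assumed default: Confluent's). */
    static String configure(Properties props) {
        boolean useCustom = Boolean.parseBoolean(props.getProperty(USE_CUSTOM_DESERIALIZER_PROP, "false"));
        String chosen = useCustom ? HOODIE_DESERIALIZER : CONFLUENT_DESERIALIZER;
        props.setProperty(VALUE_DESERIALIZER, chosen);
        return chosen;
    }
}
```

Keeping the choice in one place means the rest of the source does not need to know which deserializer is active.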
   
   ## Verify this pull request
   
   *(Please pick either of the following options)*
   
   This pull request is a trivial rework / code cleanup without any test coverage.
   
   *(or)*
   
   This pull request is already covered by existing tests, such as *(please describe tests)*.
   
   (or)
   
   This change added tests and can be verified as follows:
   
   *(example:)*
   
     - *Added integration tests for end-to-end.*
     - *Added HoodieClientWriteTest to verify the change.*
     - *Manually verified the change by running a job locally.*
   
   ## Committer checklist
   
    - [ ] Has a corresponding JIRA in PR title & commit
    
    - [ ] Commit message is descriptive of the change
    
    - [ ] CI is green
   
    - [ ] Necessary doc changes done or have another open PR
          
    - [ ] For large changes, please consider breaking it into sub-tasks under an umbrella JIRA.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-hudi] pratyakshsharma commented on a change in pull request #1562: [HUDI-837]: implemented custom deserializer for AvroKafkaSource

Posted by GitBox <gi...@apache.org>.
pratyakshsharma commented on a change in pull request #1562:
URL: https://github.com/apache/incubator-hudi/pull/1562#discussion_r425118446



##########
File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/serde/HoodieAvroKafkaDeserializer.java
##########
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.sources.serde;
+
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.utilities.UtilHelpers;
+import org.apache.hudi.utilities.schema.SchemaProvider;
+
+import io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer;
+import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;
+import kafka.serializer.Decoder;
+import kafka.utils.VerifiableProperties;
+import org.apache.avro.Schema;
+import org.apache.kafka.common.errors.SerializationException;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Properties;
+
+/**
+ * This is a custom implementation of kafka.serializer.Decoder<T> which aims at deserializing all the incoming messages
+ * with same schema (which is latest).
+ */
+public class HoodieAvroKafkaDeserializer extends AbstractKafkaAvroDeserializer implements Decoder<Object> {
+
+  private final Schema sourceSchema;
+  private static final String SCHEMA_PROVIDER_CLASS_PROP = "hoodie.deltastreamer.schemaprovider.class";
+
+  public HoodieAvroKafkaDeserializer(VerifiableProperties properties) {
+    this.configure(new KafkaAvroDeserializerConfig(properties.props()));
+    TypedProperties typedProperties = new TypedProperties();
+    copyProperties(typedProperties, properties.props());
+    try {
+      SchemaProvider schemaProvider = UtilHelpers.createSchemaProvider(
+          typedProperties.getString(SCHEMA_PROVIDER_CLASS_PROP), typedProperties);
+      sourceSchema = Objects.requireNonNull(schemaProvider).getSourceSchema();
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Override
+  public Object fromBytes(byte[] bytes) {
+    return deserialize(bytes);
+  }
+
+  @Override
+  protected Object deserialize(boolean includeSchemaAndVersion, String topic, Boolean isKey, byte[] payload, Schema readerSchema) throws SerializationException {
+    if (readerSchema != null) {
+      return super.deserialize(includeSchemaAndVersion, topic, isKey, payload, readerSchema);
+    }

Review comment:
       Actually, AbstractKafkaAvroDeserializer defines a method with the following signature:
   
   protected Object deserialize(byte[] payload, Schema readerSchema) throws SerializationException
   
   To keep a similar contract, I added this check here.
   However, I checked and found that this method is only called from test cases, so we can remove the check and always use sourceSchema.
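A dependency-free sketch of the two policies under discussion, assuming the branch not shown in the hunk falls back to `sourceSchema`. The type parameter `S` stands in for `org.apache.avro.Schema`, and `ReaderSchemaPolicy`, `withFallback`, and `alwaysSource` are hypothetical names, not code from the PR.

```java
/**
 * Hypothetical illustration of the two reader-schema policies.
 * S is a placeholder for org.apache.avro.Schema so the sketch has no dependencies.
 */
final class ReaderSchemaPolicy<S> {
    private final S sourceSchema; // latest schema obtained from the SchemaProvider

    ReaderSchemaPolicy(S sourceSchema) {
        if (sourceSchema == null) {
            throw new NullPointerException("sourceSchema");
        }
        this.sourceSchema = sourceSchema;
    }

    /** Current diff (assumed): honour a caller-supplied reader schema, else fall back to the provider's. */
    S withFallback(S callerReaderSchema) {
        return callerReaderSchema != null ? callerReaderSchema : sourceSchema;
    }

    /** Proposed simplification: ignore the caller's schema and always read with the provider's. */
    S alwaysSource(S callerReaderSchema) {
        return sourceSchema;
    }
}
```

Since the caller-supplied overload is only exercised by tests, the second policy removes a branch without changing production behavior.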







[GitHub] [incubator-hudi] vinothchandar commented on pull request #1562: [HUDI-837]: implemented custom deserializer for AvroKafkaSource

Posted by GitBox <gi...@apache.org>.
vinothchandar commented on pull request #1562:
URL: https://github.com/apache/incubator-hudi/pull/1562#issuecomment-619409517


   @afilipchik @umehrot2 can you help review this? :)





[GitHub] [incubator-hudi] vinothchandar commented on pull request #1562: [HUDI-837]: implemented custom deserializer for AvroKafkaSource

Posted by GitBox <gi...@apache.org>.
vinothchandar commented on pull request #1562:
URL: https://github.com/apache/incubator-hudi/pull/1562#issuecomment-628899382


   @n3nash can you review this and take it home? 





[GitHub] [incubator-hudi] pratyakshsharma commented on a change in pull request #1562: [HUDI-837]: implemented custom deserializer for AvroKafkaSource

Posted by GitBox <gi...@apache.org>.
pratyakshsharma commented on a change in pull request #1562:
URL: https://github.com/apache/incubator-hudi/pull/1562#discussion_r426142613



##########
File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/AvroKafkaSource.java
##########
@@ -45,11 +46,14 @@
 
   private final KafkaOffsetGen offsetGen;
 
+  private final String useCustomDeserializerProp = "hoodie.deltastreamer.kafka.custom.avro.deserializer";

Review comment:
       Done.







[GitHub] [incubator-hudi] pratyakshsharma commented on a change in pull request #1562: [HUDI-837]: implemented custom deserializer for AvroKafkaSource

Posted by GitBox <gi...@apache.org>.
pratyakshsharma commented on a change in pull request #1562:
URL: https://github.com/apache/incubator-hudi/pull/1562#discussion_r425118717



##########
File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/serde/HoodieAvroKafkaDeserializer.java
##########
@@ -0,0 +1,78 @@
+
+package org.apache.hudi.utilities.sources.serde;
+
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.utilities.UtilHelpers;
+import org.apache.hudi.utilities.schema.SchemaProvider;
+
+import io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer;
+import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;
+import kafka.serializer.Decoder;
+import kafka.utils.VerifiableProperties;
+import org.apache.avro.Schema;
+import org.apache.kafka.common.errors.SerializationException;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Properties;
+
+/**
+ * This is a custom implementation of kafka.serializer.Decoder<T> which aims at deserializing all the incoming messages
+ * with same schema (which is latest).
+ */
+public class HoodieAvroKafkaDeserializer extends AbstractKafkaAvroDeserializer implements Decoder<Object> {

Review comment:
       @afilipchik 







[GitHub] [incubator-hudi] vinothchandar commented on a change in pull request #1562: [HUDI-837]: implemented custom deserializer for AvroKafkaSource

Posted by GitBox <gi...@apache.org>.
vinothchandar commented on a change in pull request #1562:
URL: https://github.com/apache/incubator-hudi/pull/1562#discussion_r425421081



##########
File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/AvroKafkaSource.java
##########
@@ -45,11 +46,14 @@
 
   private final KafkaOffsetGen offsetGen;
 
+  private final String useCustomDeserializerProp = "hoodie.deltastreamer.kafka.custom.avro.deserializer";

Review comment:
       Make this a static final String with an all-caps variable name?

##########
File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java
##########
@@ -106,6 +106,16 @@ public static SchemaProvider createSchemaProvider(String schemaProviderClass, Ty
     }
   }
 
+  public static SchemaProvider createSchemaProvider(String schemaProviderClass,
+                                                    TypedProperties cfg) throws IOException {
+    try {
+      return schemaProviderClass == null ? null :
+        (SchemaProvider) ReflectionUtils.loadClass(schemaProviderClass, cfg);
+    } catch (Throwable e) {
+      throw new IOException("Could not load schema provider class " + schemaProviderClass, e);

Review comment:
       throw HoodieException instead? 
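One way the suggestion could look, sketched without Hudi on the classpath: reflective loading that wraps failures in an unchecked exception rather than `IOException`. `HoodieExceptionStandIn` is a hypothetical placeholder for Hudi's `HoodieException` (assumed here to be unchecked), and `ReflectiveLoader.load` is a simplified stand-in for `ReflectionUtils.loadClass`, assuming the target class has a public constructor taking `Properties`.

```java
import java.lang.reflect.Constructor;
import java.util.Properties;

/** Hypothetical stand-in for Hudi's HoodieException (an unchecked exception). */
class HoodieExceptionStandIn extends RuntimeException {
    HoodieExceptionStandIn(String message, Throwable cause) {
        super(message, cause);
    }
}

/** Sketch of reflective loading that surfaces failures as an unchecked exception. */
final class ReflectiveLoader {
    private ReflectiveLoader() {}

    /** Instantiates className through a (Properties) constructor; returns null when className is null. */
    static <T> T load(String className, Class<T> expectedType, Properties cfg) {
        if (className == null) {
            return null;
        }
        try {
            Class<?> clazz = Class.forName(className);
            Constructor<?> ctor = clazz.getConstructor(Properties.class);
            return expectedType.cast(ctor.newInstance(cfg));
        } catch (ReflectiveOperationException | ClassCastException e) {
            // Wrap only reflection and type failures; JVM errors propagate unchanged.
            throw new HoodieExceptionStandIn("Could not load class " + className, e);
        }
    }
}
```

Unlike the diff, the sketch does not catch `Throwable`, so errors such as `OutOfMemoryError` are not rewrapped. With an unchecked exception, callers like the deserializer constructor would no longer need a `try`/`catch` around `IOException`.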

##########
File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/AvroKafkaSource.java
##########
@@ -23,6 +23,7 @@
 import org.apache.hudi.utilities.schema.SchemaProvider;
 import org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen;
 import org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen.CheckpointUtils;
+import org.apache.hudi.utilities.sources.serde.HoodieAvroKafkaDeserializer;
 
 import io.confluent.kafka.serializers.KafkaAvroDeserializer;

Review comment:
       @pratyakshsharma I thought this is what this deserializer does, no?







[GitHub] [incubator-hudi] pratyakshsharma commented on a change in pull request #1562: [HUDI-837]: implemented custom deserializer for AvroKafkaSource

Posted by GitBox <gi...@apache.org>.
pratyakshsharma commented on a change in pull request #1562:
URL: https://github.com/apache/incubator-hudi/pull/1562#discussion_r426141784



##########
File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/AvroKafkaSource.java
##########
@@ -23,6 +23,7 @@
 import org.apache.hudi.utilities.schema.SchemaProvider;
 import org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen;
 import org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen.CheckpointUtils;
+import org.apache.hudi.utilities.sources.serde.HoodieAvroKafkaDeserializer;
 
 import io.confluent.kafka.serializers.KafkaAvroDeserializer;

Review comment:
       No. It reads the encoded schema ID from the byte buffer and simply uses that schema for reading; it does not take readerSchema into account.
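For reference, messages produced with Confluent's Avro serializer carry a 5-byte header: a magic byte `0x0`, then the schema ID as a 4-byte big-endian integer, then the Avro binary body. The sketch below only parses that header with `java.nio.ByteBuffer`; fetching the writer schema by ID and decoding the body are left to the Confluent client. The class name `ConfluentWireFormat` is hypothetical.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

/**
 * Sketch of parsing the Confluent Schema Registry wire format:
 * [magic byte 0x0][4-byte big-endian schema ID][Avro binary payload].
 */
final class ConfluentWireFormat {
    static final byte MAGIC_BYTE = 0x0;
    static final int HEADER_SIZE = 5;

    private ConfluentWireFormat() {}

    /** Returns the schema ID the writer embedded in the message. */
    static int schemaId(byte[] message) {
        return header(message).getInt(); // ByteBuffer reads big-endian by default
    }

    /** Returns the Avro-encoded body that follows the 5-byte header. */
    static byte[] payload(byte[] message) {
        header(message); // validates length and magic byte
        return Arrays.copyOfRange(message, HEADER_SIZE, message.length);
    }

    private static ByteBuffer header(byte[] message) {
        if (message == null || message.length < HEADER_SIZE) {
            throw new IllegalArgumentException("Message too short for Confluent wire format");
        }
        ByteBuffer buf = ByteBuffer.wrap(message);
        if (buf.get() != MAGIC_BYTE) {
            throw new IllegalArgumentException("Unknown magic byte");
        }
        return buf; // now positioned at the schema ID
    }
}
```

Because the ID identifies the writer's schema only, a deserializer that wants every record in the latest shape has to supply a reader schema separately, which is what this PR adds.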







[GitHub] [incubator-hudi] vinothchandar commented on a change in pull request #1562: [HUDI-837]: implemented custom deserializer for AvroKafkaSource

Posted by GitBox <gi...@apache.org>.
vinothchandar commented on a change in pull request #1562:
URL: https://github.com/apache/incubator-hudi/pull/1562#discussion_r425429344



##########
File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/serde/HoodieAvroKafkaDeserializer.java
##########
@@ -0,0 +1,78 @@
+
+package org.apache.hudi.utilities.sources.serde;
+
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.utilities.UtilHelpers;
+import org.apache.hudi.utilities.schema.SchemaProvider;
+
+import io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer;
+import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;
+import kafka.serializer.Decoder;
+import kafka.utils.VerifiableProperties;
+import org.apache.avro.Schema;
+import org.apache.kafka.common.errors.SerializationException;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Properties;
+
+/**
+ * This is a custom implementation of kafka.serializer.Decoder<T> which aims at deserializing all the incoming messages
+ * with same schema (which is latest).
+ */
+public class HoodieAvroKafkaDeserializer extends AbstractKafkaAvroDeserializer implements Decoder<Object> {

Review comment:
       The source itself would have to be renamed then. Let's keep this class named consistently with the source class in Hudi; that should be fine.







[GitHub] [incubator-hudi] pratyakshsharma commented on pull request #1562: [HUDI-837]: implemented custom deserializer for AvroKafkaSource

Posted by GitBox <gi...@apache.org>.
pratyakshsharma commented on pull request #1562:
URL: https://github.com/apache/incubator-hudi/pull/1562#issuecomment-629701009


   @n3nash please take a look. It is good to merge now; all the comments have been addressed.





[GitHub] [incubator-hudi] pratyakshsharma commented on a change in pull request #1562: [HUDI-837]: implemented custom deserializer for AvroKafkaSource

Posted by GitBox <gi...@apache.org>.
pratyakshsharma commented on a change in pull request #1562:
URL: https://github.com/apache/incubator-hudi/pull/1562#discussion_r416563158



##########
File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/serde/HoodieAvroKafkaDeserializer.java
##########
@@ -0,0 +1,78 @@
+
+package org.apache.hudi.utilities.sources.serde;
+
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.utilities.UtilHelpers;
+import org.apache.hudi.utilities.schema.SchemaProvider;
+
+import io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer;
+import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;
+import kafka.serializer.Decoder;
+import kafka.utils.VerifiableProperties;
+import org.apache.avro.Schema;
+import org.apache.kafka.common.errors.SerializationException;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Properties;
+
+/**
+ * This is a custom implementation of kafka.serializer.Decoder<T> which aims at deserializing all the incoming messages
+ * with same schema (which is latest).
+ */
+public class HoodieAvroKafkaDeserializer extends AbstractKafkaAvroDeserializer implements Decoder<Object> {
+
+  private final Schema sourceSchema;
+  private static final String SCHEMA_PROVIDER_CLASS_PROP = "hoodie.deltastreamer.schemaprovider.class";
+
+  public HoodieAvroKafkaDeserializer(VerifiableProperties properties) {
+    this.configure(new KafkaAvroDeserializerConfig(properties.props()));
+    TypedProperties typedProperties = new TypedProperties();
+    copyProperties(typedProperties, properties.props());
+    try {
+      SchemaProvider schemaProvider = UtilHelpers.createSchemaProvider(

Review comment:
       > also, in general, it is safer to deserialize avro using both writer's and reader's schema
   
   Yes, the flow here uses both of them for deserializing. :)







[GitHub] [incubator-hudi] pratyakshsharma commented on a change in pull request #1562: [HUDI-837]: implemented custom deserializer for AvroKafkaSource

Posted by GitBox <gi...@apache.org>.
pratyakshsharma commented on a change in pull request #1562:
URL: https://github.com/apache/incubator-hudi/pull/1562#discussion_r425112675



##########
File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/serde/HoodieAvroKafkaDeserializer.java
##########
@@ -0,0 +1,78 @@
+
+package org.apache.hudi.utilities.sources.serde;
+
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.utilities.UtilHelpers;
+import org.apache.hudi.utilities.schema.SchemaProvider;
+
+import io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer;
+import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;
+import kafka.serializer.Decoder;
+import kafka.utils.VerifiableProperties;
+import org.apache.avro.Schema;
+import org.apache.kafka.common.errors.SerializationException;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Properties;
+
+/**
+ * This is a custom implementation of kafka.serializer.Decoder<T> which aims at deserializing all the incoming messages
+ * with same schema (which is latest).
+ */
+public class HoodieAvroKafkaDeserializer extends AbstractKafkaAvroDeserializer implements Decoder<Object> {

Review comment:
       Do you mean the class name should include Confluent?







[GitHub] [incubator-hudi] pratyakshsharma commented on pull request #1562: [HUDI-837]: implemented custom deserializer for AvroKafkaSource

Posted by GitBox <gi...@apache.org>.
pratyakshsharma commented on pull request #1562:
URL: https://github.com/apache/incubator-hudi/pull/1562#issuecomment-619396502


   I was thinking of writing test cases for this, but I am unable to simulate it because AbstractKafkaAvroDeserializer expects a working schema registry URL. I am not sure how to mock it here, since it is a library class.
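One common workaround, sketched here with hypothetical names, is to hide the registry behind a small interface so that tests can inject an in-memory fake instead of a live schema registry URL. Confluent's client library also ships a `MockSchemaRegistryClient`, which may serve the same purpose depending on the version in use; the sketch avoids that dependency and represents schemas as JSON strings.

```java
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical seam: anything that can return a schema (JSON text here) for a registry ID. */
interface SchemaLookup {
    String schemaById(int id);
}

/** In-memory fake for unit tests; no network or running registry needed. */
final class InMemorySchemaLookup implements SchemaLookup {
    private final Map<Integer, String> schemas = new HashMap<>();

    /** Registers a schema and returns its ID (1, 2, 3, ...). */
    int register(String schemaJson) {
        int id = schemas.size() + 1;
        schemas.put(id, schemaJson);
        return id;
    }

    @Override
    public String schemaById(int id) {
        String schema = schemas.get(id);
        if (schema == null) {
            throw new IllegalStateException("Unknown schema id " + id);
        }
        return schema;
    }
}

/** Code under test depends only on the seam, so tests can pass the fake. */
final class WriterSchemaResolver {
    private final SchemaLookup lookup;

    WriterSchemaResolver(SchemaLookup lookup) {
        this.lookup = lookup;
    }

    /** Reads the 4-byte ID after the magic byte and returns the writer schema it refers to. */
    String writerSchemaOf(byte[] message) {
        if (message.length < 5 || message[0] != 0) {
            throw new IllegalArgumentException("Not in Confluent wire format");
        }
        int id = ByteBuffer.wrap(message, 1, 4).getInt();
        return lookup.schemaById(id);
    }
}
```

In production the same interface would be backed by a real registry client, so only the test wiring changes.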





[GitHub] [incubator-hudi] afilipchik commented on a change in pull request #1562: [HUDI-837]: implemented custom deserializer for AvroKafkaSource

Posted by GitBox <gi...@apache.org>.
afilipchik commented on a change in pull request #1562:
URL: https://github.com/apache/incubator-hudi/pull/1562#discussion_r416350367



##########
File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/serde/HoodieAvroKafkaDeserializer.java
##########
@@ -0,0 +1,78 @@
+
+package org.apache.hudi.utilities.sources.serde;
+
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.utilities.UtilHelpers;
+import org.apache.hudi.utilities.schema.SchemaProvider;
+
+import io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer;
+import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;
+import kafka.serializer.Decoder;
+import kafka.utils.VerifiableProperties;
+import org.apache.avro.Schema;
+import org.apache.kafka.common.errors.SerializationException;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Properties;
+
+/**
+ * This is a custom implementation of kafka.serializer.Decoder<T> which aims at deserializing all the incoming messages
+ * with same schema (which is latest).
+ */
+public class HoodieAvroKafkaDeserializer extends AbstractKafkaAvroDeserializer implements Decoder<Object> {
+
+  private final Schema sourceSchema;
+  private static final String SCHEMA_PROVIDER_CLASS_PROP = "hoodie.deltastreamer.schemaprovider.class";
+
+  public HoodieAvroKafkaDeserializer(VerifiableProperties properties) {
+    this.configure(new KafkaAvroDeserializerConfig(properties.props()));
+    TypedProperties typedProperties = new TypedProperties();
+    copyProperties(typedProperties, properties.props());
+    try {
+      SchemaProvider schemaProvider = UtilHelpers.createSchemaProvider(

Review comment:
       Also, in general, it is safer to deserialize Avro using both the writer's and the reader's schema.
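A simplified model of why both schemas matter: the writer's schema describes how the bytes were laid out, while the reader's schema fixes the shape the pipeline sees. The sketch covers only Avro's field-level rules for records (fields matched by name, writer-only fields dropped, reader-only fields filled from their defaults, and an error when such a field has no default); it ignores type promotion, aliases, and nesting, and `SchemaResolutionSketch` is a hypothetical name. With the real library, this resolution happens when `GenericDatumReader` is constructed with both the writer and reader schemas.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

/** Simplified, field-level model of Avro record schema resolution. Not the Avro API. */
final class SchemaResolutionSketch {
    /** Marker for a reader field that declares no default value. */
    static final Object NO_DEFAULT = new Object();

    private SchemaResolutionSketch() {}

    /**
     * @param written        field name -> value, as decoded with the writer's schema
     * @param readerDefaults reader schema: field name -> default (or NO_DEFAULT), in field order
     * @return the record projected onto the reader schema
     */
    static Map<String, Object> resolve(Map<String, Object> written, Map<String, Object> readerDefaults) {
        Map<String, Object> out = new LinkedHashMap<>();
        for (Map.Entry<String, Object> field : readerDefaults.entrySet()) {
            String name = field.getKey();
            if (written.containsKey(name)) {
                out.put(name, written.get(name));   // present in both: keep the writer's value
            } else if (field.getValue() != NO_DEFAULT) {
                out.put(name, field.getValue());    // reader-only field: use its default
            } else {
                throw new IllegalArgumentException(
                        "Reader field '" + name + "' is missing from the writer and has no default");
            }
        }
        // Writer-only fields are dropped simply by never being copied.
        return Collections.unmodifiableMap(out);
    }
}
```

Every record comes out with exactly the reader's fields, which is the uniformity the PR description relies on.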

##########
File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/serde/HoodieAvroKafkaDeserializer.java
##########
@@ -0,0 +1,78 @@
+
+package org.apache.hudi.utilities.sources.serde;
+
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.utilities.UtilHelpers;
+import org.apache.hudi.utilities.schema.SchemaProvider;
+
+import io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer;
+import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;
+import kafka.serializer.Decoder;
+import kafka.utils.VerifiableProperties;
+import org.apache.avro.Schema;
+import org.apache.kafka.common.errors.SerializationException;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Properties;
+
+/**
+ * This is a custom implementation of kafka.serializer.Decoder<T> which aims at deserializing all the incoming messages
+ * with same schema (which is latest).
+ */
+public class HoodieAvroKafkaDeserializer extends AbstractKafkaAvroDeserializer implements Decoder<Object> {
+
+  private final Schema sourceSchema;
+  private static final String SCHEMA_PROVIDER_CLASS_PROP = "hoodie.deltastreamer.schemaprovider.class";
+
+  public HoodieAvroKafkaDeserializer(VerifiableProperties properties) {
+    this.configure(new KafkaAvroDeserializerConfig(properties.props()));
+    TypedProperties typedProperties = new TypedProperties();
+    copyProperties(typedProperties, properties.props());
+    try {
+      SchemaProvider schemaProvider = UtilHelpers.createSchemaProvider(

Review comment:
       How will it guarantee that the schema is the latest? Is it loaded before or after the offsets are determined?







[GitHub] [incubator-hudi] pratyakshsharma commented on a change in pull request #1562: [HUDI-837]: implemented custom deserializer for AvroKafkaSource

Posted by GitBox <gi...@apache.org>.
pratyakshsharma commented on a change in pull request #1562:
URL: https://github.com/apache/incubator-hudi/pull/1562#discussion_r416527261



##########
File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/serde/HoodieAvroKafkaDeserializer.java
##########
@@ -0,0 +1,78 @@
+
+package org.apache.hudi.utilities.sources.serde;
+
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.utilities.UtilHelpers;
+import org.apache.hudi.utilities.schema.SchemaProvider;
+
+import io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer;
+import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;
+import kafka.serializer.Decoder;
+import kafka.utils.VerifiableProperties;
+import org.apache.avro.Schema;
+import org.apache.kafka.common.errors.SerializationException;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Properties;
+
+/**
+ * This is a custom implementation of kafka.serializer.Decoder<T> which aims at deserializing all the incoming messages
+ * with same schema (which is latest).
+ */
+public class HoodieAvroKafkaDeserializer extends AbstractKafkaAvroDeserializer implements Decoder<Object> {
+
+  private final Schema sourceSchema;
+  private static final String SCHEMA_PROVIDER_CLASS_PROP = "hoodie.deltastreamer.schemaprovider.class";
+
+  public HoodieAvroKafkaDeserializer(VerifiableProperties properties) {
+    this.configure(new KafkaAvroDeserializerConfig(properties.props()));
+    TypedProperties typedProperties = new TypedProperties();
+    copyProperties(typedProperties, properties.props());
+    try {
+      SchemaProvider schemaProvider = UtilHelpers.createSchemaProvider(

Review comment:
       As I understand it, the deserializer comes into the picture only after the offsets have been determined, since Kafka calls it internally. So whatever sourceSchema we get at this point is going to be the latest for the incoming batch. Please correct me if I am missing something here.
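The reasoning above hinges on when the source schema is read. The dependency-free sketch below is an illustration, not the PR's code: a hypothetical `SnapshotDeserializer` reads the schema from a hypothetical `LatestSchemaSource` exactly once, in its constructor (as the PR does with `sourceSchema`), so every record it decodes afterwards uses that snapshot even if the registry moves on. Schemas are modelled as plain strings. Whether the snapshot is "latest for the batch" therefore depends on the deserializer being constructed after the offset ranges are computed, which is the assumption being discussed.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

// Hypothetical stand-in for a schema provider: returns the current "latest" schema.
final class LatestSchemaSource implements Supplier<String> {
  private String current;

  LatestSchemaSource(String initial) {
    this.current = initial;
  }

  // Simulates a new schema version being registered.
  void evolve(String next) {
    this.current = next;
  }

  @Override
  public String get() {
    return current;
  }
}

// Hypothetical deserializer that snapshots the schema once, at construction time.
final class SnapshotDeserializer {
  private final String readerSchema;

  SnapshotDeserializer(Supplier<String> schemaSource) {
    this.readerSchema = schemaSource.get(); // fetched exactly once
  }

  // Tags each payload with the schema it would be decoded against.
  String decode(String payload) {
    return readerSchema + ":" + payload;
  }
}

final class SnapshotDemo {
  static List<String> run() {
    LatestSchemaSource source = new LatestSchemaSource("v1");
    SnapshotDeserializer batchDeserializer = new SnapshotDeserializer(source);
    List<String> out = new ArrayList<>();
    out.add(batchDeserializer.decode("a"));
    source.evolve("v2");                                   // registry moves on mid-batch
    out.add(batchDeserializer.decode("b"));                // still decoded with v1
    out.add(new SnapshotDeserializer(source).decode("c")); // a new instance sees v2
    return out;
  }
}
```

A new deserializer instance picks up the new version, so the guarantee is only as strong as the point at which instances are created relative to offset computation.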







[GitHub] [incubator-hudi] pratyakshsharma commented on pull request #1562: [HUDI-837]: implemented custom deserializer for AvroKafkaSource

Posted by GitBox <gi...@apache.org>.
pratyakshsharma commented on pull request #1562:
URL: https://github.com/apache/incubator-hudi/pull/1562#issuecomment-628625011


   @vinothchandar @afilipchik Can we close this? :) 





[GitHub] [incubator-hudi] vinothchandar commented on pull request #1562: [HUDI-837]: implemented custom deserializer for AvroKafkaSource

Posted by GitBox <gi...@apache.org>.
vinothchandar commented on pull request #1562:
URL: https://github.com/apache/incubator-hudi/pull/1562#issuecomment-620199838


   @afilipchik interested in reviewing this?





[GitHub] [incubator-hudi] pratyakshsharma commented on a change in pull request #1562: [HUDI-837]: implemented custom deserializer for AvroKafkaSource

Posted by GitBox <gi...@apache.org>.
pratyakshsharma commented on a change in pull request #1562:
URL: https://github.com/apache/incubator-hudi/pull/1562#discussion_r426142657



##########
File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java
##########
@@ -106,6 +106,16 @@ public static SchemaProvider createSchemaProvider(String schemaProviderClass, Ty
     }
   }
 
+  public static SchemaProvider createSchemaProvider(String schemaProviderClass,
+                                                    TypedProperties cfg) throws IOException {
+    try {
+      return schemaProviderClass == null ? null :
+        (SchemaProvider) ReflectionUtils.loadClass(schemaProviderClass, cfg);
+    } catch (Throwable e) {
+      throw new IOException("Could not load schema provider class " + schemaProviderClass, e);

Review comment:
       Done. 
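The new `createSchemaProvider` overload follows a common pattern: return null when no class is configured, otherwise instantiate the class reflectively and wrap any failure in a checked `IOException` that names the class. Below is a self-contained sketch of that pattern using plain Java reflection rather than Hudi's `ReflectionUtils`; `ProviderLoader`, `loadProvider` and `DemoProvider` are hypothetical names, and `java.util.Properties` stands in for `TypedProperties`.

```java
import java.io.IOException;
import java.lang.reflect.Constructor;
import java.util.Properties;

final class ProviderLoader {
  private ProviderLoader() {}

  // Returns null if no class is configured; otherwise instantiates the class via its
  // (Properties) constructor, wrapping any failure in an IOException naming the class.
  static Object loadProvider(String className, Properties cfg) throws IOException {
    if (className == null) {
      return null;
    }
    try {
      Class<?> clazz = Class.forName(className);
      Constructor<?> ctor = clazz.getConstructor(Properties.class);
      return ctor.newInstance(cfg);
    } catch (Throwable e) { // mirrors the diff, which also catches Throwable
      throw new IOException("Could not load schema provider class " + className, e);
    }
  }
}

// Hypothetical provider used to exercise the loader.
final class DemoProvider {
  final Properties cfg;

  public DemoProvider(Properties cfg) {
    this.cfg = cfg;
  }
}
```

Note that returning null pushes the null check to the caller, which is why the deserializer in this PR wraps the result in `Objects.requireNonNull`.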







[GitHub] [incubator-hudi] pratyakshsharma commented on a change in pull request #1562: [HUDI-837]: implemented custom deserializer for AvroKafkaSource

Posted by GitBox <gi...@apache.org>.
pratyakshsharma commented on a change in pull request #1562:
URL: https://github.com/apache/incubator-hudi/pull/1562#discussion_r425113606



##########
File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/AvroKafkaSource.java
##########
@@ -45,11 +46,14 @@
 
   private final KafkaOffsetGen offsetGen;
 
+  private final String useCustomDeserializerProp = "hoodie.deltastreamer.kafka.custom.avro.deserializer";
+
   public AvroKafkaSource(TypedProperties props, JavaSparkContext sparkContext, SparkSession sparkSession,
       SchemaProvider schemaProvider) {
     super(props, sparkContext, sparkSession, schemaProvider);
+    boolean useCustomDeserializer = props.getBoolean(useCustomDeserializerProp, false);
     props.put("key.deserializer", StringDeserializer.class);
-    props.put("value.deserializer", KafkaAvroDeserializer.class);
+    props.put("value.deserializer", useCustomDeserializer ? HoodieAvroKafkaDeserializer.class : KafkaAvroDeserializer.class);

Review comment:
       Yeah, historically we have used the Confluent schema-registry integration with Hudi. :)
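The flag added to `AvroKafkaSource` only switches which class is registered as `value.deserializer`, with the Confluent `KafkaAvroDeserializer` remaining the default. A dependency-free sketch of that selection: class names are written as strings so it compiles without Kafka, Confluent or Hudi on the classpath, the property key is the one from the diff, and the helper `configureDeserializers` is hypothetical. Since `Properties.getProperty` returns strings, the boolean is parsed explicitly here, unlike `TypedProperties.getBoolean` in the diff.

```java
import java.util.Properties;

final class DeserializerSelection {
  // Property key introduced by the PR (see the AvroKafkaSource diff).
  static final String USE_CUSTOM_DESERIALIZER_PROP = "hoodie.deltastreamer.kafka.custom.avro.deserializer";

  static final String STRING_DESERIALIZER = "org.apache.kafka.common.serialization.StringDeserializer";
  static final String CONFLUENT_AVRO_DESERIALIZER = "io.confluent.kafka.serializers.KafkaAvroDeserializer";
  static final String HOODIE_AVRO_DESERIALIZER =
      "org.apache.hudi.utilities.sources.serde.HoodieAvroKafkaDeserializer";

  private DeserializerSelection() {}

  // Hypothetical helper: keys are always strings; values use the custom
  // deserializer only when the flag is explicitly set to true (default false).
  static void configureDeserializers(Properties props) {
    boolean useCustom = Boolean.parseBoolean(props.getProperty(USE_CUSTOM_DESERIALIZER_PROP, "false"));
    props.put("key.deserializer", STRING_DESERIALIZER);
    props.put("value.deserializer", useCustom ? HOODIE_AVRO_DESERIALIZER : CONFLUENT_AVRO_DESERIALIZER);
  }
}
```

Keeping the default on the existing Confluent deserializer means pipelines that never set the flag behave exactly as before.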







[GitHub] [incubator-hudi] pratyakshsharma commented on a change in pull request #1562: [HUDI-837]: implemented custom deserializer for AvroKafkaSource

Posted by GitBox <gi...@apache.org>.
pratyakshsharma commented on a change in pull request #1562:
URL: https://github.com/apache/incubator-hudi/pull/1562#discussion_r425119279



##########
File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/serde/HoodieAvroKafkaDeserializer.java
##########
@@ -0,0 +1,78 @@
+
+package org.apache.hudi.utilities.sources.serde;
+
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.utilities.UtilHelpers;
+import org.apache.hudi.utilities.schema.SchemaProvider;
+
+import io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer;
+import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;
+import kafka.serializer.Decoder;
+import kafka.utils.VerifiableProperties;
+import org.apache.avro.Schema;
+import org.apache.kafka.common.errors.SerializationException;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Properties;
+
+/**
+ * This is a custom implementation of kafka.serializer.Decoder<T> which aims at deserializing all the incoming messages
+ * with same schema (which is latest).
+ */
+public class HoodieAvroKafkaDeserializer extends AbstractKafkaAvroDeserializer implements Decoder<Object> {
+
+  private final Schema sourceSchema;
+  private static final String SCHEMA_PROVIDER_CLASS_PROP = "hoodie.deltastreamer.schemaprovider.class";
+
+  public HoodieAvroKafkaDeserializer(VerifiableProperties properties) {
+    this.configure(new KafkaAvroDeserializerConfig(properties.props()));
+    TypedProperties typedProperties = new TypedProperties();
+    copyProperties(typedProperties, properties.props());
+    try {
+      SchemaProvider schemaProvider = UtilHelpers.createSchemaProvider(
+          typedProperties.getString(SCHEMA_PROVIDER_CLASS_PROP), typedProperties);
+      sourceSchema = Objects.requireNonNull(schemaProvider).getSourceSchema();
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Override
+  public Object fromBytes(byte[] bytes) {
+    return deserialize(bytes);
+  }
+
+  @Override
+  protected Object deserialize(boolean includeSchemaAndVersion, String topic, Boolean isKey, byte[] payload, Schema readerSchema) throws SerializationException {
+    if (readerSchema != null) {
+      return super.deserialize(includeSchemaAndVersion, topic, isKey, payload, readerSchema);
+    }

Review comment:
       Will change it. 







[GitHub] [incubator-hudi] vinothchandar commented on pull request #1562: [HUDI-837]: implemented custom deserializer for AvroKafkaSource

Posted by GitBox <gi...@apache.org>.
vinothchandar commented on pull request #1562:
URL: https://github.com/apache/incubator-hudi/pull/1562#issuecomment-628885456


   > Not sure how to mock it here since it is a library class.
   
   We can just feed the response it would send into a test SchemaProvider. We need not mock SchemaRegistry itself.
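A minimal sketch of this suggestion: rather than mocking the schema registry client, hand the code under test a provider that returns a canned schema. The `SchemaSource` interface, `FixedSchemaProvider` and `ReaderSchemaHolder` below are hypothetical stand-ins (Hudi's real `SchemaProvider` is a class with its own constructor), and schemas are modelled as JSON strings to keep the example free of Avro dependencies.

```java
import java.util.Objects;

// Hypothetical minimal contract: something that hands out the current source schema.
interface SchemaSource {
  String getSourceSchema();
}

// Test double returning a canned schema, standing in for what the registry would send.
final class FixedSchemaProvider implements SchemaSource {
  private final String schemaJson;

  FixedSchemaProvider(String schemaJson) {
    this.schemaJson = Objects.requireNonNull(schemaJson);
  }

  @Override
  public String getSourceSchema() {
    return schemaJson;
  }
}

// Hypothetical component under test: resolves its reader schema from the provider once.
final class ReaderSchemaHolder {
  private final String readerSchema;

  ReaderSchemaHolder(SchemaSource provider) {
    this.readerSchema = Objects.requireNonNull(provider, "schema provider").getSourceSchema();
  }

  String readerSchema() {
    return readerSchema;
  }
}
```

The test then only asserts on what the component does with the schema, so no registry client or network access is involved.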





[GitHub] [incubator-hudi] pratyakshsharma commented on pull request #1562: [HUDI-837]: implemented custom deserializer for AvroKafkaSource

Posted by GitBox <gi...@apache.org>.
pratyakshsharma commented on pull request #1562:
URL: https://github.com/apache/incubator-hudi/pull/1562#issuecomment-631763979


   @n3nash, did you get a chance to look at this? :)





[GitHub] [incubator-hudi] afilipchik commented on a change in pull request #1562: [HUDI-837]: implemented custom deserializer for AvroKafkaSource

Posted by GitBox <gi...@apache.org>.
afilipchik commented on a change in pull request #1562:
URL: https://github.com/apache/incubator-hudi/pull/1562#discussion_r419843893



##########
File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/serde/HoodieAvroKafkaDeserializer.java
##########
@@ -0,0 +1,78 @@
+
+package org.apache.hudi.utilities.sources.serde;
+
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.utilities.UtilHelpers;
+import org.apache.hudi.utilities.schema.SchemaProvider;
+
+import io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer;
+import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;
+import kafka.serializer.Decoder;
+import kafka.utils.VerifiableProperties;
+import org.apache.avro.Schema;
+import org.apache.kafka.common.errors.SerializationException;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Properties;
+
+/**
+ * This is a custom implementation of kafka.serializer.Decoder<T> which aims at deserializing all the incoming messages
+ * with same schema (which is latest).
+ */
+public class HoodieAvroKafkaDeserializer extends AbstractKafkaAvroDeserializer implements Decoder<Object> {

Review comment:
       This should probably be called Confluent..., as it assumes that the schema registry is Confluent's.

##########
File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/serde/HoodieAvroKafkaDeserializer.java
##########
@@ -0,0 +1,78 @@
+
+package org.apache.hudi.utilities.sources.serde;
+
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.utilities.UtilHelpers;
+import org.apache.hudi.utilities.schema.SchemaProvider;
+
+import io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer;
+import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;
+import kafka.serializer.Decoder;
+import kafka.utils.VerifiableProperties;
+import org.apache.avro.Schema;
+import org.apache.kafka.common.errors.SerializationException;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Properties;
+
+/**
+ * This is a custom implementation of kafka.serializer.Decoder<T> which aims at deserializing all the incoming messages
+ * with same schema (which is latest).
+ */
+public class HoodieAvroKafkaDeserializer extends AbstractKafkaAvroDeserializer implements Decoder<Object> {
+
+  private final Schema sourceSchema;
+  private static final String SCHEMA_PROVIDER_CLASS_PROP = "hoodie.deltastreamer.schemaprovider.class";
+
+  public HoodieAvroKafkaDeserializer(VerifiableProperties properties) {
+    this.configure(new KafkaAvroDeserializerConfig(properties.props()));
+    TypedProperties typedProperties = new TypedProperties();
+    copyProperties(typedProperties, properties.props());
+    try {
+      SchemaProvider schemaProvider = UtilHelpers.createSchemaProvider(

Review comment:
       Oh, right. We actually have an internal implementation that is very similar to this one, except for the reader-schema handling.

##########
File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/serde/HoodieAvroKafkaDeserializer.java
##########
@@ -0,0 +1,78 @@
+
+package org.apache.hudi.utilities.sources.serde;
+
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.utilities.UtilHelpers;
+import org.apache.hudi.utilities.schema.SchemaProvider;
+
+import io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer;
+import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;
+import kafka.serializer.Decoder;
+import kafka.utils.VerifiableProperties;
+import org.apache.avro.Schema;
+import org.apache.kafka.common.errors.SerializationException;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Properties;
+
+/**
+ * This is a custom implementation of kafka.serializer.Decoder<T> which aims at deserializing all the incoming messages
+ * with same schema (which is latest).
+ */
+public class HoodieAvroKafkaDeserializer extends AbstractKafkaAvroDeserializer implements Decoder<Object> {
+
+  private final Schema sourceSchema;
+  private static final String SCHEMA_PROVIDER_CLASS_PROP = "hoodie.deltastreamer.schemaprovider.class";
+
+  public HoodieAvroKafkaDeserializer(VerifiableProperties properties) {
+    this.configure(new KafkaAvroDeserializerConfig(properties.props()));
+    TypedProperties typedProperties = new TypedProperties();
+    copyProperties(typedProperties, properties.props());
+    try {
+      SchemaProvider schemaProvider = UtilHelpers.createSchemaProvider(
+          typedProperties.getString(SCHEMA_PROVIDER_CLASS_PROP), typedProperties);
+      sourceSchema = Objects.requireNonNull(schemaProvider).getSourceSchema();
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Override
+  public Object fromBytes(byte[] bytes) {
+    return deserialize(bytes);
+  }
+
+  @Override
+  protected Object deserialize(boolean includeSchemaAndVersion, String topic, Boolean isKey, byte[] payload, Schema readerSchema) throws SerializationException {
+    if (readerSchema != null) {
+      return super.deserialize(includeSchemaAndVersion, topic, isKey, payload, readerSchema);
+    }

Review comment:
       What does this check do? Shouldn't we always prefer sourceSchema over readerSchema when sourceSchema is not null? If I specified a source schema, I expect it to be used.
   
   In what situation would a reader schema be specified and preferred?
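The question is about precedence. In the diff, an explicit `readerSchema` short-circuits to the parent class; the branch that follows is cut off in the quoted context, so assume for illustration that it falls back to `sourceSchema`. The sketch contrasts that policy with the reviewer's proposal of always preferring a configured `sourceSchema`. `SchemaPrecedence` and its methods are hypothetical, and they are generic over the schema type so no Avro dependency is needed.

```java
final class SchemaPrecedence {
  private SchemaPrecedence() {}

  // Policy as read from the diff (fallback branch assumed): an explicit
  // reader schema wins; otherwise use the provider's source schema.
  static <S> S readerFirst(S readerSchema, S sourceSchema) {
    return readerSchema != null ? readerSchema : sourceSchema;
  }

  // Reviewer's proposal: a configured source schema always wins; the
  // caller-supplied reader schema is only a fallback.
  static <S> S sourceFirst(S readerSchema, S sourceSchema) {
    return sourceSchema != null ? sourceSchema : readerSchema;
  }
}
```

The two policies only differ when both schemas are set; both return null when neither is set, which the caller would still need to handle.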

##########
File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/AvroKafkaSource.java
##########
@@ -45,11 +46,14 @@
 
   private final KafkaOffsetGen offsetGen;
 
+  private final String useCustomDeserializerProp = "hoodie.deltastreamer.kafka.custom.avro.deserializer";
+
   public AvroKafkaSource(TypedProperties props, JavaSparkContext sparkContext, SparkSession sparkSession,
       SchemaProvider schemaProvider) {
     super(props, sparkContext, sparkSession, schemaProvider);
+    boolean useCustomDeserializer = props.getBoolean(useCustomDeserializerProp, false);
     props.put("key.deserializer", StringDeserializer.class);
-    props.put("value.deserializer", KafkaAvroDeserializer.class);
+    props.put("value.deserializer", useCustomDeserializer ? HoodieAvroKafkaDeserializer.class : KafkaAvroDeserializer.class);

Review comment:
       Is it fair to assume Confluent is the default for Avro?
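The deserializer is tied to Confluent's wire format: each serialized value starts with a magic byte `0`, followed by a 4-byte big-endian schema ID, followed by the Avro-encoded body. A payload written by a different Avro serializer will generally not carry this header, which is why "Avro" and "Confluent" are not interchangeable defaults. The sketch below only parses that header with `java.nio.ByteBuffer`; `ConfluentHeader` is a hypothetical name, and decoding the Avro body is out of scope.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

final class ConfluentHeader {
  static final byte MAGIC_BYTE = 0x0;
  static final int HEADER_SIZE = 1 + Integer.BYTES; // magic byte + schema ID

  final int schemaId;
  final byte[] avroBody;

  private ConfluentHeader(int schemaId, byte[] avroBody) {
    this.schemaId = schemaId;
    this.avroBody = avroBody;
  }

  // Splits a Confluent-framed payload into its schema ID and the remaining Avro body.
  static ConfluentHeader parse(byte[] payload) {
    if (payload == null || payload.length < HEADER_SIZE) {
      throw new IllegalArgumentException("payload too short for Confluent framing");
    }
    ByteBuffer buffer = ByteBuffer.wrap(payload); // ByteBuffer is big-endian by default
    if (buffer.get() != MAGIC_BYTE) {
      throw new IllegalArgumentException("unknown magic byte; not Confluent-framed");
    }
    int schemaId = buffer.getInt();
    byte[] body = Arrays.copyOfRange(payload, HEADER_SIZE, payload.length);
    return new ConfluentHeader(schemaId, body);
  }
}
```

The schema ID is what a registry-backed deserializer uses to look up the writer schema; a reader schema such as `sourceSchema` is then applied on top of it.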



