Posted to commits@hudi.apache.org by si...@apache.org on 2022/02/27 16:13:42 UTC

[hudi] branch master updated: [HUDI-3521] Fixing Kafka key and value serializer value type from class to string (#4919)

This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 2f99e84  [HUDI-3521] Fixing Kafka key and value serializer value type from class to string (#4919)
2f99e84 is described below

commit 2f99e8458ac3cd51000227fce90576e8e1e056be
Author: Sivabalan Narayanan <n....@gmail.com>
AuthorDate: Sun Feb 27 11:13:13 2022 -0500

    [HUDI-3521] Fixing Kafka key and value serializer value type from class to string (#4919)
---
 .../main/java/org/apache/hudi/utilities/sources/AvroKafkaSource.java  | 4 ++--
 .../main/java/org/apache/hudi/utilities/sources/JsonKafkaSource.java  | 4 ++--
 .../org/apache/hudi/utilities/sources/debezium/DebeziumSource.java    | 4 ++--
 3 files changed, 6 insertions(+), 6 deletions(-)

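For readers skimming the patch below: in all three sources, the change replaces java.lang.Class values with fully qualified class-name strings in the Kafka consumer properties. The following is a minimal standalone sketch of that pattern, not Hudi code; the broker address, group id, and class name in it are illustrative placeholders, and it assumes only the standard Apache Kafka client API.

import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class DeserializerConfigSketch {
  public static void main(String[] args) throws ClassNotFoundException {
    Properties props = new Properties();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder broker
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "demo-group");              // placeholder group id

    // Store the class *name* (a String), mirroring the change in this commit,
    // rather than the Class object itself.
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

    // A user-supplied deserializer class name can still be validated up front:
    // Class.forName(...) throws ClassNotFoundException if the class is not on
    // the classpath, and getName() turns it back into the String that the
    // config expects.
    String valueDeserializer = StringDeserializer.class.getName(); // stand-in for a configured value
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
        Class.forName(valueDeserializer).getName());

    try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
      // consumer is ready to subscribe/poll; omitted here.
    }
  }
}

Kafka's consumer config accepts either a Class or a String for these settings, so the sketch only illustrates the string-based style the commit standardizes on; the Class.forName(...).getName() step additionally fails fast if a user-configured deserializer class is missing from the classpath.
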
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/AvroKafkaSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/AvroKafkaSource.java
index 2e4caa0..84c6fd8 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/AvroKafkaSource.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/AvroKafkaSource.java
@@ -65,12 +65,12 @@ public class AvroKafkaSource extends AvroSource {
       SchemaProvider schemaProvider, HoodieDeltaStreamerMetrics metrics) {
     super(props, sparkContext, sparkSession, schemaProvider);
 
-    props.put(NATIVE_KAFKA_KEY_DESERIALIZER_PROP, StringDeserializer.class);
+    props.put(NATIVE_KAFKA_KEY_DESERIALIZER_PROP, StringDeserializer.class.getName());
     deserializerClassName = props.getString(DataSourceWriteOptions.KAFKA_AVRO_VALUE_DESERIALIZER_CLASS().key(),
             DataSourceWriteOptions.KAFKA_AVRO_VALUE_DESERIALIZER_CLASS().defaultValue());
 
     try {
-      props.put(NATIVE_KAFKA_VALUE_DESERIALIZER_PROP, Class.forName(deserializerClassName));
+      props.put(NATIVE_KAFKA_VALUE_DESERIALIZER_PROP, Class.forName(deserializerClassName).getName());
       if (deserializerClassName.equals(KafkaAvroSchemaDeserializer.class.getName())) {
         if (schemaProvider == null) {
           throw new HoodieIOException("SchemaProvider has to be set to use KafkaAvroSchemaDeserializer");
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JsonKafkaSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JsonKafkaSource.java
index e8bd577..d6152a1 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JsonKafkaSource.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JsonKafkaSource.java
@@ -52,8 +52,8 @@ public class JsonKafkaSource extends JsonSource {
                          SchemaProvider schemaProvider, HoodieDeltaStreamerMetrics metrics) {
     super(properties, sparkContext, sparkSession, schemaProvider);
     this.metrics = metrics;
-    properties.put("key.deserializer", StringDeserializer.class);
-    properties.put("value.deserializer", StringDeserializer.class);
+    properties.put("key.deserializer", StringDeserializer.class.getName());
+    properties.put("value.deserializer", StringDeserializer.class.getName());
     offsetGen = new KafkaOffsetGen(properties);
   }
 
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/debezium/DebeziumSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/debezium/DebeziumSource.java
index 7018419..d9be692 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/debezium/DebeziumSource.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/debezium/DebeziumSource.java
@@ -82,12 +82,12 @@ public abstract class DebeziumSource extends RowSource {
                         HoodieDeltaStreamerMetrics metrics) {
     super(props, sparkContext, sparkSession, schemaProvider);
 
-    props.put(NATIVE_KAFKA_KEY_DESERIALIZER_PROP, StringDeserializer.class);
+    props.put(NATIVE_KAFKA_KEY_DESERIALIZER_PROP, StringDeserializer.class.getName());
     deserializerClassName = props.getString(DataSourceWriteOptions.KAFKA_AVRO_VALUE_DESERIALIZER_CLASS().key(),
         DataSourceWriteOptions.KAFKA_AVRO_VALUE_DESERIALIZER_CLASS().defaultValue());
 
     try {
-      props.put(NATIVE_KAFKA_VALUE_DESERIALIZER_PROP, Class.forName(deserializerClassName));
+      props.put(NATIVE_KAFKA_VALUE_DESERIALIZER_PROP, Class.forName(deserializerClassName).getName());
     } catch (ClassNotFoundException e) {
       String error = "Could not load custom avro kafka deserializer: " + deserializerClassName;
       LOG.error(error);