You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by si...@apache.org on 2022/02/27 16:13:42 UTC
[hudi] branch master updated: [HUDI-3521] Fixing kakfa key and value serializer value type from class to string (#4919)
This is an automated email from the ASF dual-hosted git repository.
sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 2f99e84 [HUDI-3521] Fixing kakfa key and value serializer value type from class to string (#4919)
2f99e84 is described below
commit 2f99e8458ac3cd51000227fce90576e8e1e056be
Author: Sivabalan Narayanan <n....@gmail.com>
AuthorDate: Sun Feb 27 11:13:13 2022 -0500
[HUDI-3521] Fixing kakfa key and value serializer value type from class to string (#4919)
---
.../main/java/org/apache/hudi/utilities/sources/AvroKafkaSource.java | 4 ++--
.../main/java/org/apache/hudi/utilities/sources/JsonKafkaSource.java | 4 ++--
.../org/apache/hudi/utilities/sources/debezium/DebeziumSource.java | 4 ++--
3 files changed, 6 insertions(+), 6 deletions(-)
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/AvroKafkaSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/AvroKafkaSource.java
index 2e4caa0..84c6fd8 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/AvroKafkaSource.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/AvroKafkaSource.java
@@ -65,12 +65,12 @@ public class AvroKafkaSource extends AvroSource {
SchemaProvider schemaProvider, HoodieDeltaStreamerMetrics metrics) {
super(props, sparkContext, sparkSession, schemaProvider);
- props.put(NATIVE_KAFKA_KEY_DESERIALIZER_PROP, StringDeserializer.class);
+ props.put(NATIVE_KAFKA_KEY_DESERIALIZER_PROP, StringDeserializer.class.getName());
deserializerClassName = props.getString(DataSourceWriteOptions.KAFKA_AVRO_VALUE_DESERIALIZER_CLASS().key(),
DataSourceWriteOptions.KAFKA_AVRO_VALUE_DESERIALIZER_CLASS().defaultValue());
try {
- props.put(NATIVE_KAFKA_VALUE_DESERIALIZER_PROP, Class.forName(deserializerClassName));
+ props.put(NATIVE_KAFKA_VALUE_DESERIALIZER_PROP, Class.forName(deserializerClassName).getName());
if (deserializerClassName.equals(KafkaAvroSchemaDeserializer.class.getName())) {
if (schemaProvider == null) {
throw new HoodieIOException("SchemaProvider has to be set to use KafkaAvroSchemaDeserializer");
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JsonKafkaSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JsonKafkaSource.java
index e8bd577..d6152a1 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JsonKafkaSource.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JsonKafkaSource.java
@@ -52,8 +52,8 @@ public class JsonKafkaSource extends JsonSource {
SchemaProvider schemaProvider, HoodieDeltaStreamerMetrics metrics) {
super(properties, sparkContext, sparkSession, schemaProvider);
this.metrics = metrics;
- properties.put("key.deserializer", StringDeserializer.class);
- properties.put("value.deserializer", StringDeserializer.class);
+ properties.put("key.deserializer", StringDeserializer.class.getName());
+ properties.put("value.deserializer", StringDeserializer.class.getName());
offsetGen = new KafkaOffsetGen(properties);
}
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/debezium/DebeziumSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/debezium/DebeziumSource.java
index 7018419..d9be692 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/debezium/DebeziumSource.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/debezium/DebeziumSource.java
@@ -82,12 +82,12 @@ public abstract class DebeziumSource extends RowSource {
HoodieDeltaStreamerMetrics metrics) {
super(props, sparkContext, sparkSession, schemaProvider);
- props.put(NATIVE_KAFKA_KEY_DESERIALIZER_PROP, StringDeserializer.class);
+ props.put(NATIVE_KAFKA_KEY_DESERIALIZER_PROP, StringDeserializer.class.getName());
deserializerClassName = props.getString(DataSourceWriteOptions.KAFKA_AVRO_VALUE_DESERIALIZER_CLASS().key(),
DataSourceWriteOptions.KAFKA_AVRO_VALUE_DESERIALIZER_CLASS().defaultValue());
try {
- props.put(NATIVE_KAFKA_VALUE_DESERIALIZER_PROP, Class.forName(deserializerClassName));
+ props.put(NATIVE_KAFKA_VALUE_DESERIALIZER_PROP, Class.forName(deserializerClassName).getName());
} catch (ClassNotFoundException e) {
String error = "Could not load custom avro kafka deserializer: " + deserializerClassName;
LOG.error(error);