Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2020/08/20 15:31:54 UTC

[GitHub] [beam] iemejia commented on a change in pull request #12630: [BEAM-10759] Uses reader Avro schema to deserialize in KafkaIO

iemejia commented on a change in pull request #12630:
URL: https://github.com/apache/beam/pull/12630#discussion_r474070383



##########
File path: sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ConfluentSchemaRegistryDeserializerProvider.java
##########
@@ -91,7 +91,10 @@
             .put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl)
             .build();
     Deserializer<T> deserializer =
-        (Deserializer<T>) new KafkaAvroDeserializer(getSchemaRegistryClient());
+        (Deserializer<T>)
+            new ConfluentSchemaRegistryDeserializer(
+                getSchemaRegistryClient(),
+                new Schema.Parser().parse(getSchemaMetadata().getSchema()));

Review comment:
       The Coder part uses this same expression for a similar purpose. Can we extract it into a private method and reuse it in both places?
   ```
   private Schema getAvroSchema() { return new Schema.Parser().parse(getSchemaMetadata().getSchema()); }
   ```
   

##########
File path: sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/ConfluentSchemaRegistryDeserializerProviderTest.java
##########
@@ -45,7 +45,8 @@ public void testGetCoder() {
     assertEquals(AVRO_SCHEMA, coderV0.getSchema());
 
     try {
-      Integer version = mockRegistryClient.register(subject, AVRO_SCHEMA_V1);
+      mockRegistryClient.register(subject, AVRO_SCHEMA_V1);

Review comment:
       How is this different? Or rather, how does this test that the correct schema is chosen? Maybe we can add a specific test for that?

##########
File path: sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ConfluentSchemaRegistryDeserializerProvider.java
##########
@@ -116,3 +119,17 @@ private SchemaRegistryClient getSchemaRegistryClient() {
     return this.schemaRegistryClientProviderFn.apply(null);
   }
 }
+
+class ConfluentSchemaRegistryDeserializer extends KafkaAvroDeserializer {
+  Schema readerSchema;
+
+  ConfluentSchemaRegistryDeserializer(SchemaRegistryClient client, Schema readerSchema) {
+    this.schemaRegistry = client;

Review comment:
       Can this call `super(client);` instead of assigning the field directly?
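
To illustrate the suggestion, here is a minimal, dependency-free sketch of the constructor pattern. Every type in it is a hypothetical stand-in, not the real Confluent or Beam class: `StubRegistryClient` stands in for `SchemaRegistryClient`, `StubAvroDeserializer` for `KafkaAvroDeserializer`, and `ReaderSchemaDeserializer` for the `ConfluentSchemaRegistryDeserializer` in the diff. The reader schema is a plain `String` here rather than an Avro `Schema`, and the null check in the base constructor is invented for the sketch to show why delegating to the superclass can matter.

```java
import java.util.Objects;

/** Demo entry point; every type in this file is a hypothetical stand-in. */
class SuperConstructorSketch {
  public static void main(String[] args) {
    StubRegistryClient client = new StubRegistryClient("http://localhost:8081");
    ReaderSchemaDeserializer deserializer =
        new ReaderSchemaDeserializer(client, "{\"type\":\"string\"}");
    // The base-class constructor stored the client; the subclass only added the reader schema.
    System.out.println(deserializer.registryUrl());
    System.out.println(deserializer.readerSchema());
  }
}

/** Stand-in for SchemaRegistryClient (assumption: it only carries a URL here). */
final class StubRegistryClient {
  final String url;

  StubRegistryClient(String url) {
    this.url = Objects.requireNonNull(url, "url");
  }
}

/** Stand-in for KafkaAvroDeserializer: it owns the registry client field. */
class StubAvroDeserializer {
  protected StubRegistryClient schemaRegistry;

  StubAvroDeserializer(StubRegistryClient client) {
    // Any setup the base class does lives in one place; subclasses get it via super(client).
    this.schemaRegistry = Objects.requireNonNull(client, "client");
  }

  String registryUrl() {
    return schemaRegistry.url;
  }
}

/** Stand-in for the ConfluentSchemaRegistryDeserializer added in the diff. */
class ReaderSchemaDeserializer extends StubAvroDeserializer {
  // A String in this sketch; the pull request uses an Avro Schema.
  private final String readerSchema;

  ReaderSchemaDeserializer(StubRegistryClient client, String readerSchema) {
    super(client); // delegate instead of assigning this.schemaRegistry directly
    this.readerSchema = Objects.requireNonNull(readerSchema, "readerSchema");
  }

  String readerSchema() {
    return readerSchema;
  }
}
```

The point of the pattern is that the subclass does not need to know which fields the base class initializes: if the base constructor later gains extra setup, the subclass picks it up without changes.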



