Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2021/01/06 06:43:56 UTC

[GitHub] [flink] wuchong commented on a change in pull request #14530: [FLINK-20348][kafka] Make "schema-registry.subject" optional for Kafka sink with avro-confluent format

wuchong commented on a change in pull request #14530:
URL: https://github.com/apache/flink/pull/14530#discussion_r552393276



##########
File path: flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaOptions.java
##########
@@ -120,6 +122,12 @@ private KafkaOptions() {}
                                     + ValueFieldsStrategy.EXCEPT_KEY
                                     + "'.");
 
+    public static final ConfigOption<String> SCHEMA_REGISTRY_SUBJECT =

Review comment:
       Private scope and a plain String type are enough here.
       Please also move it below all the other options.

##########
File path: flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaOptions.java
##########
@@ -698,6 +706,49 @@ private static boolean hasKafkaClientProperties(Map<String, String> tableOptions
         throw new TableException("Unknown value fields strategy:" + strategy);
     }
 
+    public static ReadableConfig completeAvroConfluentSubject(ReadableConfig tableOptions) {
+        Configuration configuration = (Configuration) tableOptions;
+        final String format = configuration.getString(FORMAT);
+        final String valueFormat = configuration.getString(VALUE_FORMAT);
+        final String avro = "avro-confluent";
+        final String debezium = "debezium-avro-confluent";
+        if (avro.equals(format)) {
+            completeKeyAvroConfulentSubject(configuration, avro, false);
+        } else if (debezium.equals(format)) {
+            completeKeyAvroConfulentSubject(configuration, debezium, false);
+        } else if (avro.equals(valueFormat)) {
+            completeKeyAvroConfulentSubject(configuration, avro, true);
+        } else if (debezium.equals(valueFormat)) {
+            completeKeyAvroConfulentSubject(configuration, debezium, true);
+        }
+        return configuration;
+    }
+
+    private static void completeKeyAvroConfulentSubject(

Review comment:
       We can make this method more generic, e.g. `autocompleteSchemaRegistrySubject(Configuration conf, String format)`, and have it handle `format=?`, `key.format=?`, and `value.format=?`.
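
       A minimal sketch of what such a generic helper could look like. This is not the PR's code: to stay self-contained it works on a plain `Map<String, String>` rather than Flink's `Configuration`, and the class name `SubjectAutocomplete`, the helper `complete`, and the `key.` prefix for the key subject are assumptions made for illustration. The `<topic>-value` / `<topic>-key` defaults and the `value.` prefix follow the expectations in the test from this PR.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical stand-in for the helper discussed above; not Flink API.
class SubjectAutocomplete {
    private static final String AVRO_CONFLUENT = "avro-confluent";
    private static final String DEBEZIUM_AVRO_CONFLUENT = "debezium-avro-confluent";
    private static final String SCHEMA_REGISTRY_SUBJECT = "schema-registry.subject";

    // Formats that need a Schema Registry subject.
    private static final Set<String> REGISTRY_FORMATS =
            new HashSet<>(Arrays.asList(AVRO_CONFLUENT, DEBEZIUM_AVRO_CONFLUENT));

    /** Returns a copy of the options with any missing subjects filled in. */
    static Map<String, String> autocompleteSchemaRegistrySubject(Map<String, String> options) {
        final Map<String, String> completed = new HashMap<>(options);
        final String topic = options.get("topic");
        if (topic == null) {
            return completed; // nothing to derive a default subject from
        }
        // Handle format=?, value.format=? and key.format=? uniformly.
        complete(completed, "format", "", topic + "-value");
        complete(completed, "value.format", "value.", topic + "-value");
        complete(completed, "key.format", "key.", topic + "-key"); // "key." prefix is assumed
        return completed;
    }

    private static void complete(
            Map<String, String> options, String formatKey, String prefix, String defaultSubject) {
        final String format = options.get(formatKey);
        if (format == null || !REGISTRY_FORMATS.contains(format)) {
            return; // this format does not use a schema registry subject
        }
        // Only fill in the subject when the user has not set one explicitly.
        options.putIfAbsent(prefix + format + "." + SCHEMA_REGISTRY_SUBJECT, defaultSubject);
    }
}
```

       Handling the three option keys through one helper removes the `if`/`else if` chain and the `isValuePrefix` flag, and a user-supplied subject is never overwritten.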

##########
File path: flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaOptions.java
##########
@@ -698,6 +706,49 @@ private static boolean hasKafkaClientProperties(Map<String, String> tableOptions
         throw new TableException("Unknown value fields strategy:" + strategy);
     }
 
+    public static ReadableConfig completeAvroConfluentSubject(ReadableConfig tableOptions) {
+        Configuration configuration = (Configuration) tableOptions;
+        final String format = configuration.getString(FORMAT);
+        final String valueFormat = configuration.getString(VALUE_FORMAT);
+        final String avro = "avro-confluent";
+        final String debezium = "debezium-avro-confluent";

Review comment:
       Make these `static final` constants.

##########
File path: flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaOptionsTest.java
##########
@@ -173,6 +178,69 @@ public void testInvalidValueFormatProjection() {
         }
     }
 
+    @Test
+    public void testCompleteAvroConfluentSubject() {
+        final String avro = "avro-confluent";
+        final String debezium = "debezium-avro-confluent";
+
+        final Map<String, String> options = new HashMap<>();
+        options.put("connector", "kafka");
+        options.put("properties.bootstrap.servers", "localhost:9092");
+        options.put("topic", "myTopic");
+        options.put("format", avro);
+
+        options.put("key.format", avro);
+
+        String[] subject1 = getSubject(options, avro, false);
+        assertThat(subject1[0], equalTo("myTopic-value"));
+        assertThat(subject1[1], equalTo("myTopic-key"));
+
+        options.put("format", debezium);
+        options.put("key.format", debezium);
+
+        String[] subject2 = getSubject(options, debezium, false);
+        assertThat(subject2[0], equalTo("myTopic-value"));
+        assertThat(subject2[1], equalTo("myTopic-key"));
+
+        options.remove("format");
+        options.put("value.format", debezium);
+        options.put("value." + debezium + ".schema-registry.subject", "valueSubject");
+
+        String[] subject3 = getSubject(options, debezium, true);
+        assertThat(subject3[0], equalTo("valueSubject"));
+        assertThat(subject3[1], equalTo("myTopic-key"));
+
+        options.remove("key.format");
+
+        String[] subject4 = getSubject(options, debezium, true);
+        assertThat(subject4[0], equalTo("valueSubject"));
+        assertThat(subject4[1], equalTo(null));
+    }
+
+    private String[] getSubject(Map<String, String> options, String format, boolean isValuePrefix) {

Review comment:
       We shouldn't obtain the subject this way in tests, because it can't cover all the code paths. A simpler way to test is to use `KafkaDynamicTableFactoryTest#createTableSink(schema, options)` to create the sink and decoder, and then validate the decoder.

##########
File path: flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaOptions.java
##########
@@ -698,6 +706,49 @@ private static boolean hasKafkaClientProperties(Map<String, String> tableOptions
         throw new TableException("Unknown value fields strategy:" + strategy);
     }
 
+    public static ReadableConfig completeAvroConfluentSubject(ReadableConfig tableOptions) {
+        Configuration configuration = (Configuration) tableOptions;
+        final String format = configuration.getString(FORMAT);
+        final String valueFormat = configuration.getString(VALUE_FORMAT);
+        final String avro = "avro-confluent";
+        final String debezium = "debezium-avro-confluent";
+        if (avro.equals(format)) {
+            completeKeyAvroConfulentSubject(configuration, avro, false);
+        } else if (debezium.equals(format)) {
+            completeKeyAvroConfulentSubject(configuration, debezium, false);
+        } else if (avro.equals(valueFormat)) {
+            completeKeyAvroConfulentSubject(configuration, avro, true);
+        } else if (debezium.equals(valueFormat)) {
+            completeKeyAvroConfulentSubject(configuration, debezium, true);
+        }
+        return configuration;
+    }
+
+    private static void completeKeyAvroConfulentSubject(
+            Configuration configuration, String format, boolean isValuePrefix) {
+        final ConfigOption<String> valueSubject =
+                ConfigOptions.key(
+                                (isValuePrefix ? "value." : "")
+                                        + format
+                                        + "."
+                                        + SCHEMA_REGISTRY_SUBJECT.key())
+                        .stringType()
+                        .noDefaultValue()
+                        .withDescription("Subject name to write to the Schema Registry service.");

Review comment:
       No description is needed here.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org