Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/04/08 08:08:40 UTC

[GitHub] [flink] fapaul commented on a diff in pull request #19366: [FLINK-24660][Connectors / Kafka] allow setting KafkaSubscriber in KafkaSourceBuilder

fapaul commented on code in PR #19366:
URL: https://github.com/apache/flink/pull/19366#discussion_r845849609


##########
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSourceBuilder.java:
##########
@@ -184,6 +184,18 @@
         return this;
     }
 
+    /**
+     * Set the Kafka subscriber to use to discover new splits.
+     *
+     * @param kafkaSubscriber the {@link KafkaSubscriber} to use for split discovery.
+     * @return this KafkaSourceBuilder.
+     */
+    public KafkaSourceBuilder<OUT> setKafkaSubscriber(KafkaSubscriber kafkaSubscriber) {
+        ensureSubscriberIsNull("subscriber");
+        this.subscriber = kafkaSubscriber;

Review Comment:
   Nit: 
   ```suggestion
           this.subscriber = checkNotNull(kafkaSubscriber);
   ```



##########
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/subscriber/KafkaSubscriber.java:
##########
@@ -38,9 +38,11 @@
  * </ol>
  *
  * <p>The KafkaSubscriber provides a unified interface for the Kafka source to support all these
- * three types of subscribing mode.
+ * three types of subscribing mode. {@link
+ * org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator} only supports adding

Review Comment:
   Nit: Is it a good idea to link an internal class in a public docstring? I would recommend documenting the partition-removal limitation in the docs or on `KafkaSource`, so that users can find it more easily.



##########
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSourceBuilder.java:
##########
@@ -184,6 +184,18 @@
         return this;
     }
 
+    /**
+     * Set the Kafka subscriber to use to discover new splits.
+     *
+     * @param kafkaSubscriber the {@link KafkaSubscriber} to use for split discovery.
+     * @return this KafkaSourceBuilder.
+     */
+    public KafkaSourceBuilder<OUT> setKafkaSubscriber(KafkaSubscriber kafkaSubscriber) {
+        ensureSubscriberIsNull("subscriber");

Review Comment:
   It would be good to keep this check to ensure that the different subscriber setters are mutually exclusive.
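
For illustration, here is a minimal, self-contained sketch of the two checks discussed in this review: the mutual-exclusion guard and the null check on the argument. The class `SubscriberBuilderSketch`, its nested `Subscriber` interface, and the `setTopics` setter are hypothetical stand-ins and not Flink code. `Objects.requireNonNull` stands in for the `checkNotNull` call in the suggestion above, and the body of `ensureSubscriberIsNull` is an assumption about what such a guard might look like.

```java
import java.util.Objects;

/** Hypothetical stand-in for the builder under review; not Flink code. */
public class SubscriberBuilderSketch {

    /** Hypothetical placeholder for the subscriber type. */
    public interface Subscriber {}

    private Subscriber subscriber;

    /** One of several mutually exclusive ways to configure the subscriber. */
    public SubscriberBuilderSketch setTopics(String... topics) {
        ensureSubscriberIsNull("topics");
        Objects.requireNonNull(topics, "topics");
        this.subscriber = new Subscriber() {};
        return this;
    }

    /** Accepts a custom subscriber; rejects null and rejects a second setter call. */
    public SubscriberBuilderSketch setSubscriber(Subscriber custom) {
        ensureSubscriberIsNull("subscriber");
        this.subscriber = Objects.requireNonNull(custom, "subscriber");
        return this;
    }

    /** Assumed guard: fails if any subscriber setter has already succeeded. */
    private void ensureSubscriberIsNull(String attemptedMode) {
        if (subscriber != null) {
            throw new IllegalStateException(
                    "Cannot use " + attemptedMode + " because a subscriber was already set.");
        }
    }

    public static void main(String[] args) {
        SubscriberBuilderSketch builder = new SubscriberBuilderSketch().setTopics("orders");
        try {
            // Second setter call on the same builder is rejected by the guard.
            builder.setSubscriber(new Subscriber() {});
        } catch (IllegalStateException e) {
            System.out.println("Rejected: " + e.getMessage());
        }
    }
}
```

With both checks in place, a null argument fails at the setter rather than later during split discovery, and calling a second subscriber setter on the same builder fails instead of silently overwriting the first.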



##########
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/subscriber/KafkaSubscriber.java:
##########
@@ -38,9 +38,11 @@
  * </ol>
  *
  * <p>The KafkaSubscriber provides a unified interface for the Kafka source to support all these
- * three types of subscribing mode.
+ * three types of subscribing mode. {@link
+ * org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator} only supports adding
+ * new splits and not removing splits in split discovery.
  */
-@Internal
+@PublicEvolving

Review Comment:
   I agree, making it public does not seem like a bad idea.


