Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/04/06 04:51:29 UTC

[GitHub] [flink] mas-chen opened a new pull request, #19366: [FLINK-24660][Connectors / Kafka] allow setting KafkaSubscriber in KafkaSourceBuilder

mas-chen opened a new pull request, #19366:
URL: https://github.com/apache/flink/pull/19366

   ## What is the purpose of the change
   
   Allow setting KafkaSubscriber in KafkaSourceBuilder so that users can supply custom implementations.
   
   ## Brief change log
   
   * Expose a setter for the subscriber and add javadocs
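For context, a `KafkaSubscriber` decides which topic partitions the source reads, and this change lets users plug in their own policy. The sketch below is a self-contained model, not Flink code. `PartitionSubscriber` and `FilteringSubscriber` are hypothetical names, and partitions are plain `"<topic>-<partition>"` strings rather than Kafka `TopicPartition`s. The real interface receives an `AdminClient` in `getSubscribedTopicPartitions` and extends `Serializable`, so a real implementation should avoid capturing non-serializable state such as an arbitrary lambda.

```java
import java.util.Set;
import java.util.TreeSet;
import java.util.function.Predicate;

/**
 * Hypothetical stand-in for KafkaSubscriber: given the partitions the cluster
 * currently exposes, decide which ones the source should read.
 */
interface PartitionSubscriber {
    Set<String> getSubscribedTopicPartitions(Set<String> clusterPartitions);
}

/** Example custom policy: keep partitions whose topic name matches a predicate. */
final class FilteringSubscriber implements PartitionSubscriber {
    private final Predicate<String> topicFilter;

    FilteringSubscriber(Predicate<String> topicFilter) {
        this.topicFilter = topicFilter;
    }

    @Override
    public Set<String> getSubscribedTopicPartitions(Set<String> clusterPartitions) {
        Set<String> result = new TreeSet<>();
        for (String tp : clusterPartitions) {
            // Assumes ids look like "<topic>-<partition>"; the topic is everything
            // before the last '-'.
            String topic = tp.substring(0, tp.lastIndexOf('-'));
            if (topicFilter.test(topic)) {
                result.add(tp);
            }
        }
        return result;
    }
}
```

With Flink 1.16 or later, an instance of a real `KafkaSubscriber` implementation would be handed to the builder's `setKafkaSubscriber(...)` instead of calling `setTopics` or `setTopicPattern`.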
   
   ## Verifying this change
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   ## Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): no
     - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: yes
     - The serializers: no
     - The runtime per-record code paths (performance sensitive): no
     - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: yes
     - The S3 file system connector: no
   
   ## Documentation
   
     - Does this pull request introduce a new feature? no
     - If yes, how is the feature documented? JavaDocs
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] PatrickRen commented on pull request #19366: [FLINK-24660][Connectors / Kafka] allow setting KafkaSubscriber in KafkaSourceBuilder

PatrickRen commented on PR #19366:
URL: https://github.com/apache/flink/pull/19366#issuecomment-1182933364

   Squashed and merged to master c4697f12ecb097a7a1c890264d813e834b8ae44d




[GitHub] [flink] MartijnVisser commented on a diff in pull request #19366: [FLINK-24660][Connectors / Kafka] allow setting KafkaSubscriber in KafkaSourceBuilder

MartijnVisser commented on code in PR #19366:
URL: https://github.com/apache/flink/pull/19366#discussion_r845040363


##########
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/subscriber/KafkaSubscriber.java:
##########
@@ -38,9 +38,11 @@
  * </ol>
  *
  * <p>The KafkaSubscriber provides a unified interface for the Kafka source to support all these
- * three types of subscribing mode.
+ * three types of subscribing mode. {@link
+ * org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator} only supports adding
+ * new splits and not removing splits in split discovery.
  */
-@Internal
+@PublicEvolving

Review Comment:
   @PatrickRen Was it indeed intended to make this interface public? 





[GitHub] [flink] salvalcantara commented on pull request #19366: [FLINK-24660][Connectors / Kafka] allow setting KafkaSubscriber in KafkaSourceBuilder

salvalcantara commented on PR #19366:
URL: https://github.com/apache/flink/pull/19366#issuecomment-1184183742

   @mas-chen On which version(s) will this patch be available?




[GitHub] [flink] JingGe commented on a diff in pull request #19366: [FLINK-24660][Connectors / Kafka] allow setting KafkaSubscriber in KafkaSourceBuilder

JingGe commented on code in PR #19366:
URL: https://github.com/apache/flink/pull/19366#discussion_r845948429


##########
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSourceBuilder.java:
##########
@@ -184,6 +184,18 @@
         return this;
     }
 
+    /**
+     * Set the Kafka subscriber to use to discover new splits.
+     *
+     * @param kafkaSubscriber the {@link KafkaSubscriber} to use for split discovery.
+     * @return this KafkaSourceBuilder.
+     */
+    public KafkaSourceBuilder<OUT> setKafkaSubscriber(KafkaSubscriber kafkaSubscriber) {
+        ensureSubscriberIsNull("subscriber");

Review Comment:
   Could it have been set anywhere before this new setter is called? I thought the idea of this method was to force the builder to use the given `KafkaSubscriber`, even if a different subscriber had been set before.
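The question here is whether setters should follow a "last one wins" rule or be mutually exclusive. The following minimal sketch shows the exclusive variant, loosely modeled on the `ensureSubscriberIsNull` call in the diff. `SubscriberBuilder` and its message text are hypothetical; this is not Flink's actual `KafkaSourceBuilder`.

```java
/**
 * Hypothetical, stripped-down builder illustrating mutually exclusive
 * subscriber setters. Not Flink's KafkaSourceBuilder.
 */
final class SubscriberBuilder {
    // Stands in for the builder's KafkaSubscriber field.
    private Object subscriber;

    SubscriberBuilder setTopics(String... topics) {
        ensureSubscriberIsNull("topics");
        this.subscriber = "topics:" + String.join(",", topics);
        return this;
    }

    SubscriberBuilder setKafkaSubscriber(Object kafkaSubscriber) {
        ensureSubscriberIsNull("subscriber");
        this.subscriber = kafkaSubscriber;
        return this;
    }

    Object subscriber() {
        return subscriber;
    }

    // Rejects a second subscription mode instead of silently overriding the first.
    private void ensureSubscriberIsNull(String attemptingMode) {
        if (subscriber != null) {
            throw new IllegalStateException(
                    "Cannot use " + attemptingMode
                            + " for consumption because a subscriber is already set: " + subscriber);
        }
    }
}
```

With this design, `setTopics(...)` followed by `setKafkaSubscriber(...)` fails immediately instead of silently discarding the topic list. The string label passed to `ensureSubscriberIsNull` only affects the error message.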





[GitHub] [flink] mas-chen commented on a diff in pull request #19366: [FLINK-24660][Connectors / Kafka] allow setting KafkaSubscriber in KafkaSourceBuilder

mas-chen commented on code in PR #19366:
URL: https://github.com/apache/flink/pull/19366#discussion_r845808245


##########
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/subscriber/KafkaSubscriber.java:
##########
@@ -38,9 +38,11 @@
  * </ol>
  *
  * <p>The KafkaSubscriber provides a unified interface for the Kafka source to support all these
- * three types of subscribing mode.
+ * three types of subscribing mode. {@link
+ * org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator} only supports adding
+ * new splits and not removing splits in split discovery.
  */
-@Internal
+@PublicEvolving

Review Comment:
   This was the original email thread from last year: https://www.mail-archive.com/user@flink.apache.org/msg44340.html





[GitHub] [flink] xiangcao commented on a diff in pull request #19366: [FLINK-24660][Connectors / Kafka] allow setting KafkaSubscriber in KafkaSourceBuilder

xiangcao commented on code in PR #19366:
URL: https://github.com/apache/flink/pull/19366#discussion_r844545487


##########
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSourceBuilder.java:
##########
@@ -184,6 +184,18 @@
         return this;
     }
 
+    /**
+     * Set the Kafka subscriber to use to discover new splits.
+     *
+     * @param kafkaSubscriber the {@link KafkaSubscriber} to use for split discovery.
+     * @return this KafkaSourceBuilder.
+     */
+    public KafkaSourceBuilder<OUT> setKafkaSubscriber(KafkaSubscriber kafkaSubscriber) {
+        ensureSubscriberIsNull("subscriber");

Review Comment:
   This is probably not needed here.






[GitHub] [flink] MartijnVisser commented on pull request #19366: [FLINK-24660][Connectors / Kafka] allow setting KafkaSubscriber in KafkaSourceBuilder

MartijnVisser commented on PR #19366:
URL: https://github.com/apache/flink/pull/19366#issuecomment-1184259719

   > @mas-chen On which version(s) will this patch be available?
   
   Only from Flink 1.16 onwards, since this touches public interfaces, which can only be changed in minor and major releases.




[GitHub] [flink] mas-chen commented on pull request #19366: [FLINK-24660][Connectors / Kafka] allow setting KafkaSubscriber in KafkaSourceBuilder

mas-chen commented on PR #19366:
URL: https://github.com/apache/flink/pull/19366#issuecomment-1182534445

   @flinkbot run azure
   
   Not sure why the GitHub comment is not updating, but CI passes.




[GitHub] [flink] PatrickRen commented on a diff in pull request #19366: [FLINK-24660][Connectors / Kafka] allow setting KafkaSubscriber in KafkaSourceBuilder

PatrickRen commented on code in PR #19366:
URL: https://github.com/apache/flink/pull/19366#discussion_r845833503


##########
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/subscriber/KafkaSubscriber.java:
##########
@@ -38,9 +38,11 @@
  * </ol>
  *
  * <p>The KafkaSubscriber provides a unified interface for the Kafka source to support all these
- * three types of subscribing mode.
+ * three types of subscribing mode. {@link
+ * org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator} only supports adding
+ * new splits and not removing splits in split discovery.
  */
-@Internal
+@PublicEvolving

Review Comment:
   IMHO it's reasonable to make `KafkaSubscriber` public. Since we are exposing new APIs to users, what about starting a discussion on the dev and user mailing lists?





[GitHub] [flink] PatrickRen closed pull request #19366: [FLINK-24660][Connectors / Kafka] allow setting KafkaSubscriber in KafkaSourceBuilder

PatrickRen closed pull request #19366: [FLINK-24660][Connectors / Kafka] allow setting KafkaSubscriber in KafkaSourceBuilder
URL: https://github.com/apache/flink/pull/19366




[GitHub] [flink] mas-chen commented on pull request #19366: [FLINK-24660][Connectors / Kafka] allow setting KafkaSubscriber in KafkaSourceBuilder

mas-chen commented on PR #19366:
URL: https://github.com/apache/flink/pull/19366#issuecomment-1183530072

   Thanks for the collaboration as always!




[GitHub] [flink] flinkbot commented on pull request #19366: [FLINK-24660][Connectors / Kafka] allow setting KafkaSubscriber in KafkaSourceBuilder

flinkbot commented on PR #19366:
URL: https://github.com/apache/flink/pull/19366#issuecomment-1089820185

   ## CI report:
   
   * 4b990b04e506d2c5cb9837413b98d86f11dc109a UNKNOWN
   
   Bot commands: the @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure`: re-run the last Azure build




[GitHub] [flink] mas-chen commented on pull request #19366: [FLINK-24660][Connectors / Kafka] allow setting KafkaSubscriber in KafkaSourceBuilder

mas-chen commented on PR #19366:
URL: https://github.com/apache/flink/pull/19366#issuecomment-1179122465

   @flinkbot run azure




[GitHub] [flink] mas-chen closed pull request #19366: [FLINK-24660][Connectors / Kafka] allow setting KafkaSubscriber in KafkaSourceBuilder

mas-chen closed pull request #19366: [FLINK-24660][Connectors / Kafka] allow setting KafkaSubscriber in KafkaSourceBuilder
URL: https://github.com/apache/flink/pull/19366




[GitHub] [flink] mas-chen commented on pull request #19366: [FLINK-24660][Connectors / Kafka] allow setting KafkaSubscriber in KafkaSourceBuilder

mas-chen commented on PR #19366:
URL: https://github.com/apache/flink/pull/19366#issuecomment-1179434006

   @flinkbot run azure


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] salvalcantara commented on a diff in pull request #19366: [FLINK-24660][Connectors / Kafka] allow setting KafkaSubscriber in KafkaSourceBuilder

salvalcantara commented on code in PR #19366:
URL: https://github.com/apache/flink/pull/19366#discussion_r894761263


##########
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSourceBuilder.java:
##########
@@ -184,6 +184,18 @@
         return this;
     }
 
+    /**
+     * Set the Kafka subscriber to use to discover new splits.
+     *
+     * @param kafkaSubscriber the {@link KafkaSubscriber} to use for split discovery.
+     * @return this KafkaSourceBuilder.
+     */
+    public KafkaSourceBuilder<OUT> setKafkaSubscriber(KafkaSubscriber kafkaSubscriber) {
+        ensureSubscriberIsNull("subscriber");

Review Comment:
   I'd implement the same behaviour as in `setTopics` and `setTopicPattern`, which use `ensureSubscriberIsNull` too. Maybe instead of `"subscriber"` I'd use `"custom"` or something along these lines.






[GitHub] [flink] mas-chen commented on pull request #19366: [FLINK-24660][Connectors / Kafka] allow setting KafkaSubscriber in KafkaSourceBuilder

mas-chen commented on PR #19366:
URL: https://github.com/apache/flink/pull/19366#issuecomment-1178643221

   Hey @PatrickRen and others, sorry for the late response. PTAL




[GitHub] [flink] PatrickRen commented on pull request #19366: [FLINK-24660][Connectors / Kafka] allow setting KafkaSubscriber in KafkaSourceBuilder

PatrickRen commented on PR #19366:
URL: https://github.com/apache/flink/pull/19366#issuecomment-1148172752

   I think this PR is ready to merge once we resolve the minor pending comments above. @mas-chen, would you like to make the updates and rebase onto the latest master?




[GitHub] [flink] fapaul commented on a diff in pull request #19366: [FLINK-24660][Connectors / Kafka] allow setting KafkaSubscriber in KafkaSourceBuilder

fapaul commented on code in PR #19366:
URL: https://github.com/apache/flink/pull/19366#discussion_r845849609


##########
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSourceBuilder.java:
##########
@@ -184,6 +184,18 @@
         return this;
     }
 
+    /**
+     * Set the Kafka subscriber to use to discover new splits.
+     *
+     * @param kafkaSubscriber the {@link KafkaSubscriber} to use for split discovery.
+     * @return this KafkaSourceBuilder.
+     */
+    public KafkaSourceBuilder<OUT> setKafkaSubscriber(KafkaSubscriber kafkaSubscriber) {
+        ensureSubscriberIsNull("subscriber");
+        this.subscriber = kafkaSubscriber;

Review Comment:
   Nit: 
   ```suggestion
           this.subscriber = checkNotNull(kafkaSubscriber);
   ```
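The suggested `checkNotNull` makes a bad argument fail at the call site. The small sketch below contrasts the two behaviors, using the JDK's `Objects.requireNonNull` as a stand-in for Flink's `Preconditions.checkNotNull` (both throw `NullPointerException`). `NullCheckDemo` is a hypothetical class, not part of Flink.

```java
import java.util.Objects;

/** Hypothetical builder fragment contrasting plain assignment with a fail-fast null check. */
final class NullCheckDemo {
    private Object subscriber;

    // Without a null check: a null argument leaves the field unset, so the
    // mistake only surfaces later (or not at all).
    NullCheckDemo setLenient(Object s) {
        this.subscriber = s;
        return this;
    }

    // With the suggested check: the error is reported where the bad value is passed.
    NullCheckDemo setStrict(Object s) {
        this.subscriber = Objects.requireNonNull(s, "kafkaSubscriber must not be null");
        return this;
    }

    boolean hasSubscriber() {
        return subscriber != null;
    }
}
```

In a builder like the one in the diff, the lenient version has a further drawback. Because `null` leaves the field unset, a later `setTopics` call would still pass the `ensureSubscriberIsNull` check, and the mistake would go unnoticed.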



##########
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/subscriber/KafkaSubscriber.java:
##########
@@ -38,9 +38,11 @@
  * </ol>
  *
  * <p>The KafkaSubscriber provides a unified interface for the Kafka source to support all these
- * three types of subscribing mode.
+ * three types of subscribing mode. {@link
+ * org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator} only supports adding

Review Comment:
   Nit: Is it a good idea to link an internal class in a public doc string? I would recommend documenting the partition-removal limitation in the docs or in the `KafkaSource` Javadoc, so that users can find it more easily.
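According to the Javadoc added in this PR, split discovery only adds splits and never removes them. The sketch below models that behavior using a hypothetical `AddOnlyDiscovery` class with string partition ids. It is not the real `KafkaSourceEnumerator`, only an illustration of the add-only set difference.

```java
import java.util.Set;
import java.util.TreeSet;

/**
 * Hypothetical model of add-only partition discovery: each round assigns
 * partitions that are new, but never revokes ones that disappeared.
 */
final class AddOnlyDiscovery {
    private final Set<String> assigned = new TreeSet<>();

    /** Returns the partitions newly assigned in this discovery round. */
    Set<String> discover(Set<String> subscribed) {
        Set<String> added = new TreeSet<>(subscribed);
        added.removeAll(assigned);
        assigned.addAll(added);
        // Partitions in `assigned` that are missing from `subscribed` are
        // deliberately left alone: there is no removal path.
        return added;
    }

    Set<String> assigned() {
        return assigned;
    }
}
```

Under this model, a custom subscriber that stops returning a partition would not cause that partition to be unassigned. This is the caveat the comment suggests surfacing where users will see it.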



##########
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSourceBuilder.java:
##########
@@ -184,6 +184,18 @@
         return this;
     }
 
+    /**
+     * Set the Kafka subscriber to use to discover new splits.
+     *
+     * @param kafkaSubscriber the {@link KafkaSubscriber} to use for split discovery.
+     * @return this KafkaSourceBuilder.
+     */
+    public KafkaSourceBuilder<OUT> setKafkaSubscriber(KafkaSubscriber kafkaSubscriber) {
+        ensureSubscriberIsNull("subscriber");

Review Comment:
   It would be good to keep this check to ensure that the different subscriber setters are mutually exclusive.



##########
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/subscriber/KafkaSubscriber.java:
##########
@@ -38,9 +38,11 @@
  * </ol>
  *
  * <p>The KafkaSubscriber provides a unified interface for the Kafka source to support all these
- * three types of subscribing mode.
+ * three types of subscribing mode. {@link
+ * org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator} only supports adding
+ * new splits and not removing splits in split discovery.
  */
-@Internal
+@PublicEvolving

Review Comment:
   I agree; making it public does not seem like a bad idea.


