Posted to github@beam.apache.org by "sjvanrossum (via GitHub)" <gi...@apache.org> on 2023/05/31 13:36:20 UTC

[GitHub] [beam] sjvanrossum opened a new pull request, #26948: Add topicPattern property to KafkaIO.Read to match topics using a regex

sjvanrossum opened a new pull request, #26948:
URL: https://github.com/apache/beam/pull/26948

   This addresses #19217 and #21338, matching Apache Flink's KafkaSource property `topicPattern`.
   
   ------------------------
   
   Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:
   
    - [ ] Mention the appropriate issue in your description (for example: `addresses #123`), if applicable. This will automatically add a link to the pull request in the issue. If you would like the issue to automatically close on merging the pull request, comment `fixes #<ISSUE NUMBER>` instead.
    - [ ] Update `CHANGES.md` with noteworthy changes.
    - [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).
   
   See the [Contributor Guide](https://beam.apache.org/contribute) for more tips on [how to make review process smoother](https://beam.apache.org/contribute/get-started-contributing/#make-the-reviewers-job-easier).
   
   To check the build health, please visit [https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md](https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md)
   
   GitHub Actions Tests Status (on master branch)
   ------------------------------------------------------------------------------------------------
   [![Build python source distribution and wheels](https://github.com/apache/beam/workflows/Build%20python%20source%20distribution%20and%20wheels/badge.svg?branch=master&event=schedule)](https://github.com/apache/beam/actions?query=workflow%3A%22Build+python+source+distribution+and+wheels%22+branch%3Amaster+event%3Aschedule)
   [![Python tests](https://github.com/apache/beam/workflows/Python%20tests/badge.svg?branch=master&event=schedule)](https://github.com/apache/beam/actions?query=workflow%3A%22Python+Tests%22+branch%3Amaster+event%3Aschedule)
   [![Java tests](https://github.com/apache/beam/workflows/Java%20Tests/badge.svg?branch=master&event=schedule)](https://github.com/apache/beam/actions?query=workflow%3A%22Java+Tests%22+branch%3Amaster+event%3Aschedule)
   [![Go tests](https://github.com/apache/beam/workflows/Go%20tests/badge.svg?branch=master&event=schedule)](https://github.com/apache/beam/actions?query=workflow%3A%22Go+tests%22+branch%3Amaster+event%3Aschedule)
   
   See [CI.md](https://github.com/apache/beam/blob/master/CI.md) for more information about GitHub Actions CI.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] chamikaramj commented on a diff in pull request #26948: Add topicPattern property to KafkaIO.Read to match topics using a regex

Posted by "chamikaramj (via GitHub)" <gi...@apache.org>.
chamikaramj commented on code in PR #26948:
URL: https://github.com/apache/beam/pull/26948#discussion_r1247243485


##########
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java:
##########
@@ -936,11 +944,26 @@ public Read<K, V> withTopics(List<String> topics) {
      */
     public Read<K, V> withTopicPartitions(List<TopicPartition> topicPartitions) {
       checkState(
-          getTopics() == null || getTopics().isEmpty(),
-          "Only topics or topicPartitions can be set, not both");
+          (getTopics() == null || getTopics().isEmpty()) && getTopicPattern() == null,
+          "Only one of topics, topicPartitions or topicPattern can be set");
       return toBuilder().setTopicPartitions(ImmutableList.copyOf(topicPartitions)).build();
     }
 
+    /**
+     * Sets a {@link java.util.regex.Pattern} of topics to read from. All the partitions from each
+     * of the matching topics are read.
+     *
+     * <p>See {@link KafkaUnboundedSource#split(int, PipelineOptions)} for description of how the
+     * partitions are distributed among the splits.
+     */
+    public Read<K, V> withTopicPattern(Pattern topicPattern) {

Review Comment:
   Thanks. If there is no significant advantage to specifying a Pattern object, I would just support specifying a String regex to keep the API simple.





[GitHub] [beam] github-actions[bot] commented on pull request #26948: Add topicPattern property to KafkaIO.Read to match topics using a regex

Posted by "github-actions[bot] (via GitHub)" <gi...@apache.org>.
github-actions[bot] commented on PR #26948:
URL: https://github.com/apache/beam/pull/26948#issuecomment-1582475476

   Reminder, please take a look at this pr: @bvolpato @pabloem 




[GitHub] [beam] sjvanrossum commented on a diff in pull request #26948: Add topicPattern property to KafkaIO.Read to match topics using a regex

Posted by "sjvanrossum (via GitHub)" <gi...@apache.org>.
sjvanrossum commented on code in PR #26948:
URL: https://github.com/apache/beam/pull/26948#discussion_r1247754667


##########
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java:
##########
@@ -936,11 +944,26 @@ public Read<K, V> withTopics(List<String> topics) {
      */
     public Read<K, V> withTopicPartitions(List<TopicPartition> topicPartitions) {
       checkState(
-          getTopics() == null || getTopics().isEmpty(),
-          "Only topics or topicPartitions can be set, not both");
+          (getTopics() == null || getTopics().isEmpty()) && getTopicPattern() == null,
+          "Only one of topics, topicPartitions or topicPattern can be set");
       return toBuilder().setTopicPartitions(ImmutableList.copyOf(topicPartitions)).build();
     }
 
+    /**
+     * Sets a {@link java.util.regex.Pattern} of topics to read from. All the partitions from each
+     * of the matching topics are read.
+     *
+     * <p>See {@link KafkaUnboundedSource#split(int, PipelineOptions)} for description of how the
+     * partitions are distributed among the splits.
+     */
+    public Read<K, V> withTopicPattern(Pattern topicPattern) {

Review Comment:
   Done.





[GitHub] [beam] johnjcasey commented on a diff in pull request #26948: Add topicPattern property to KafkaIO.Read to match topics using a regex

Posted by "johnjcasey (via GitHub)" <gi...@apache.org>.
johnjcasey commented on code in PR #26948:
URL: https://github.com/apache/beam/pull/26948#discussion_r1246637138


##########
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java:
##########
@@ -936,11 +944,26 @@ public Read<K, V> withTopics(List<String> topics) {
      */
     public Read<K, V> withTopicPartitions(List<TopicPartition> topicPartitions) {
       checkState(
-          getTopics() == null || getTopics().isEmpty(),
-          "Only topics or topicPartitions can be set, not both");
+          (getTopics() == null || getTopics().isEmpty()) && getTopicPattern() == null,
+          "Only one of topics, topicPartitions or topicPattern can be set");
       return toBuilder().setTopicPartitions(ImmutableList.copyOf(topicPartitions)).build();
     }
 
+    /**
+     * Sets a {@link java.util.regex.Pattern} of topics to read from. All the partitions from each
+     * of the matching topics are read.
+     *
+     * <p>See {@link KafkaUnboundedSource#split(int, PipelineOptions)} for description of how the
+     * partitions are distributed among the splits.
+     */
+    public Read<K, V> withTopicPattern(Pattern topicPattern) {

Review Comment:
   Ideally, we don't want to use the external read configuration as a pattern. It makes the configurations for an IO diverge between Python and Java users.





[GitHub] [beam] sjvanrossum commented on a diff in pull request #26948: Add topicPattern property to KafkaIO.Read to match topics using a regex

Posted by "sjvanrossum (via GitHub)" <gi...@apache.org>.
sjvanrossum commented on code in PR #26948:
URL: https://github.com/apache/beam/pull/26948#discussion_r1246950699


##########
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java:
##########
@@ -936,11 +944,26 @@ public Read<K, V> withTopics(List<String> topics) {
      */
     public Read<K, V> withTopicPartitions(List<TopicPartition> topicPartitions) {
       checkState(
-          getTopics() == null || getTopics().isEmpty(),
-          "Only topics or topicPartitions can be set, not both");
+          (getTopics() == null || getTopics().isEmpty()) && getTopicPattern() == null,
+          "Only one of topics, topicPartitions or topicPattern can be set");
       return toBuilder().setTopicPartitions(ImmutableList.copyOf(topicPartitions)).build();
     }
 
+    /**
+     * Sets a {@link java.util.regex.Pattern} of topics to read from. All the partitions from each
+     * of the matching topics are read.
+     *
+     * <p>See {@link KafkaUnboundedSource#split(int, PipelineOptions)} for description of how the
+     * partitions are distributed among the splits.
+     */
+    public Read<K, V> withTopicPattern(Pattern topicPattern) {

Review Comment:
   Mostly to make sure that pattern flags can be specified by users, but nearly all (except `LITERAL` and `CANON_EQ`) can be specified in the expression as well.
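To illustrate the point with plain `java.util.regex` (a sketch; the class and method names below are hypothetical and not part of KafkaIO): passing `Pattern.CASE_INSENSITIVE` and prefixing the regex with the embedded flag `(?i)` give the same result, and although `LITERAL` has no embedded form, `Pattern.quote` produces an equivalent `\Q...\E` string. As noted, `CANON_EQ` has no embedded equivalent.

```java
import java.util.regex.Pattern;

// Hypothetical helper class, used only to compare flag styles.
class TopicPatternFlags {
  // Flag passed as an argument, as a Pattern-typed API would allow.
  static boolean matchesWithFlag(String regex, String topic) {
    return Pattern.compile(regex, Pattern.CASE_INSENSITIVE).matcher(topic).matches();
  }

  // Same behavior from a plain String: (?i) turns on CASE_INSENSITIVE inline.
  static boolean matchesWithEmbeddedFlag(String regex, String topic) {
    return Pattern.compile("(?i)" + regex).matcher(topic).matches();
  }

  // LITERAL has no embedded flag; Pattern.quote wraps the text in \Q...\E instead.
  static boolean matchesLiterally(String text, String topic) {
    return Pattern.compile(Pattern.quote(text)).matcher(topic).matches();
  }

  public static void main(String[] args) {
    System.out.println(matchesWithFlag("orders-.*", "ORDERS-eu"));         // true
    System.out.println(matchesWithEmbeddedFlag("orders-.*", "ORDERS-eu")); // true
    System.out.println(matchesLiterally("metrics.v1", "metrics.v1"));      // true
    System.out.println(matchesLiterally("metrics.v1", "metricsXv1"));      // false
  }
}
```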





[GitHub] [beam] sjvanrossum commented on a diff in pull request #26948: Add topicPattern property to KafkaIO.Read to match topics using a regex

Posted by "sjvanrossum (via GitHub)" <gi...@apache.org>.
sjvanrossum commented on code in PR #26948:
URL: https://github.com/apache/beam/pull/26948#discussion_r1212769417


##########
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java:
##########
@@ -1578,15 +1604,30 @@ static class GenerateKafkaSourceDescriptor extends DoFn<byte[], KafkaSourceDescr
 
       @VisibleForTesting final @Nullable List<String> topics;
 
+      private final @Nullable Pattern topicPattern;
+
       @ProcessElement
       public void processElement(OutputReceiver<KafkaSourceDescriptor> receiver) {
         List<TopicPartition> partitions =
             new ArrayList<>(Preconditions.checkStateNotNull(topicPartitions));
         if (partitions.isEmpty()) {
           try (Consumer<?, ?> consumer = consumerFactoryFn.apply(consumerConfig)) {
-            for (String topic : Preconditions.checkStateNotNull(topics)) {
-              for (PartitionInfo p : consumer.partitionsFor(topic)) {
-                partitions.add(new TopicPartition(p.topic(), p.partition()));
+            List<String> topics = Preconditions.checkStateNotNull(this.topics);
+            if (topics.isEmpty()) {
+              Pattern pattern = Preconditions.checkStateNotNull(topicPattern);
+              for (Map.Entry<String, List<PartitionInfo>> entry :
+                  consumer.listTopics().entrySet()) {
+                if (pattern.matcher(entry.getKey()).matches()) {
+                  for (PartitionInfo p : entry.getValue()) {
+                    partitions.add(new TopicPartition(p.topic(), p.partition()));
+                  }
+                }
+              }
+            } else {
+              for (String topic : topics) {
+                for (PartitionInfo p : consumer.partitionsFor(topic)) {

Review Comment:
   Is this referring to `topics`? Both `topicPartitions` and `topics` are initialized as empty lists in the builder by default and replaced using `.withTopics()` and `.withTopicPartitions()`. The `topics` list passed to the previous `Preconditions.checkStateNotNull(topics)` expression in the for loop should therefore never be null. Special care should be taken to carry that property forward when we add support for this property in KafkaIO's ExternalTransformRegistrar, though, since the registrar doesn't guarantee the same object state that the builder does.
   Regarding `topicPattern`: if both `topicPartitions` and `topics` are empty, then `topicPattern` must be non-null, since the PTransform's expansion checks that at least one of those properties is set and each `.withX()` builder method checks that none of the others has been set previously.
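The invariant described here can be sketched as a simplified, hypothetical model (`ReadSpec` is not a Beam class, and partitions are plain strings rather than Kafka `TopicPartition` objects): each setter rejects the call if another property is already set, and a separate `validate()` step, standing in for the expansion-time check, requires at least one to be set.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.regex.Pattern;

// Hypothetical, simplified stand-in for KafkaIO.Read's topic settings; not the Beam class.
final class ReadSpec {
  final List<String> topics;          // topic names
  final List<String> topicPartitions; // "topic-partition" strings stand in for TopicPartition
  final Pattern topicPattern;         // null when unset

  private ReadSpec(List<String> topics, List<String> topicPartitions, Pattern topicPattern) {
    this.topics = topics;
    this.topicPartitions = topicPartitions;
    this.topicPattern = topicPattern;
  }

  // Defaults: both lists empty, no pattern.
  static ReadSpec create() {
    return new ReadSpec(Collections.emptyList(), Collections.emptyList(), null);
  }

  // Each setter refuses to run if either of the other two properties is already set.
  ReadSpec withTopics(List<String> newTopics) {
    checkState(topicPartitions.isEmpty() && topicPattern == null);
    return new ReadSpec(new ArrayList<>(newTopics), topicPartitions, topicPattern);
  }

  ReadSpec withTopicPartitions(List<String> newPartitions) {
    checkState(topics.isEmpty() && topicPattern == null);
    return new ReadSpec(topics, new ArrayList<>(newPartitions), topicPattern);
  }

  ReadSpec withTopicPattern(String regex) {
    checkState(topics.isEmpty() && topicPartitions.isEmpty());
    return new ReadSpec(topics, topicPartitions, Pattern.compile(regex));
  }

  // Expansion-time check: at least one property must be set. Combined with the setters,
  // this guarantees topicPattern != null whenever both lists are empty.
  void validate() {
    if (topics.isEmpty() && topicPartitions.isEmpty() && topicPattern == null) {
      throw new IllegalStateException("One of topics, topicPartitions or topicPattern must be set");
    }
  }

  private static void checkState(boolean ok) {
    if (!ok) {
      throw new IllegalStateException("Only one of topics, topicPartitions or topicPattern can be set");
    }
  }

  public static void main(String[] args) {
    ReadSpec spec = ReadSpec.create().withTopicPattern("orders-.*");
    spec.validate(); // passes: the pattern is the only property set
    try {
      spec.withTopics(Arrays.asList("payments"));
    } catch (IllegalStateException e) {
      System.out.println(e.getMessage()); // rejected: a pattern is already set
    }
  }
}
```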
   
   As far as Kafka's topic metadata goes, `.partitionsFor()` will throw an exception if an unauthorized topic is requested, and `.listTopics()` will list only authorized topics. Both methods return initialized objects and translate potential null responses from the server into empty collections (as far back as `org.apache.kafka:kafka-clients:0.11.0.3`), or throw an exception in the case of an authorization failure. I'd say that the existing check on `partitionInfoList` seems superfluous and could be considered for deletion:
   ```
   List<PartitionInfo> partitionInfoList = consumer.partitionsFor(topic);
   checkState(
       partitionInfoList != null,
       "Could not find any partitions info. Please check Kafka configuration and make sure "
           + "that provided topics exist.");
   for (PartitionInfo p : partitionInfoList) {
     partitions.add(new TopicPartition(p.topic(), p.partition()));
   }
   ```
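A stdlib-only sketch of the pattern branch of the diff above, assuming a `Map<String, List<Integer>>` of topic name to partition numbers as a stand-in for the `Map<String, List<PartitionInfo>>` returned by `Consumer#listTopics()` (the class and method names are hypothetical). Note that `Matcher#matches()` requires the whole topic name to match, so a bare `orders` pattern does not select `orders-eu`; `find()` would.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.regex.Pattern;

// Hypothetical resolver; topicMetadata stands in for Consumer#listTopics().
class TopicPatternResolver {
  // Keeps every partition of every topic whose full name matches the pattern.
  static List<String> resolve(Pattern pattern, Map<String, List<Integer>> topicMetadata) {
    List<String> partitions = new ArrayList<>();
    for (Map.Entry<String, List<Integer>> entry : topicMetadata.entrySet()) {
      // matches() anchors at both ends, unlike find().
      if (pattern.matcher(entry.getKey()).matches()) {
        for (int partition : entry.getValue()) {
          partitions.add(entry.getKey() + "-" + partition);
        }
      }
    }
    return partitions;
  }

  public static void main(String[] args) {
    Map<String, List<Integer>> metadata = new TreeMap<>(); // sorted for stable output
    metadata.put("orders-eu", Arrays.asList(0, 1));
    metadata.put("orders-us", Arrays.asList(0));
    metadata.put("payments", Arrays.asList(0, 1, 2));
    System.out.println(resolve(Pattern.compile("orders-.*"), metadata)); // [orders-eu-0, orders-eu-1, orders-us-0]
    System.out.println(resolve(Pattern.compile("orders"), metadata));    // []
  }
}
```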





[GitHub] [beam] bvolpato commented on a diff in pull request #26948: Add topicPattern property to KafkaIO.Read to match topics using a regex

Posted by "bvolpato (via GitHub)" <gi...@apache.org>.
bvolpato commented on code in PR #26948:
URL: https://github.com/apache/beam/pull/26948#discussion_r1212436172


##########
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java:
##########
@@ -1578,15 +1604,30 @@ static class GenerateKafkaSourceDescriptor extends DoFn<byte[], KafkaSourceDescr
 
       @VisibleForTesting final @Nullable List<String> topics;
 
+      private final @Nullable Pattern topicPattern;
+
       @ProcessElement
       public void processElement(OutputReceiver<KafkaSourceDescriptor> receiver) {
         List<TopicPartition> partitions =
             new ArrayList<>(Preconditions.checkStateNotNull(topicPartitions));
         if (partitions.isEmpty()) {
           try (Consumer<?, ?> consumer = consumerFactoryFn.apply(consumerConfig)) {
-            for (String topic : Preconditions.checkStateNotNull(topics)) {
-              for (PartitionInfo p : consumer.partitionsFor(topic)) {
-                partitions.add(new TopicPartition(p.topic(), p.partition()));
+            List<String> topics = Preconditions.checkStateNotNull(this.topics);
+            if (topics.isEmpty()) {
+              Pattern pattern = Preconditions.checkStateNotNull(topicPattern);
+              for (Map.Entry<String, List<PartitionInfo>> entry :
+                  consumer.listTopics().entrySet()) {
+                if (pattern.matcher(entry.getKey()).matches()) {
+                  for (PartitionInfo p : entry.getValue()) {
+                    partitions.add(new TopicPartition(p.topic(), p.partition()));
+                  }
+                }
+              }
+            } else {
+              for (String topic : topics) {
+                for (PartitionInfo p : consumer.partitionsFor(topic)) {

Review Comment:
   Any chance this is null at this point? In the split below you have null checks, but not here. 





[GitHub] [beam] github-actions[bot] commented on pull request #26948: Add topicPattern property to KafkaIO.Read to match topics using a regex

Posted by "github-actions[bot] (via GitHub)" <gi...@apache.org>.
github-actions[bot] commented on PR #26948:
URL: https://github.com/apache/beam/pull/26948#issuecomment-1609381221

   Assigning a new set of reviewers because the PR has gone too long without review. If you would like to opt out of this review, comment `assign to next reviewer`:
   
   R: @Abacn for label java.
   R: @Abacn for label io.
   
   Available commands:
   - `stop reviewer notifications` - opt out of the automated review tooling
   - `remind me after tests pass` - tag the comment author after tests pass
   - `waiting on author` - shift the attention set back to the author (any comment or push by the author will return the attention set to the reviewers)




[GitHub] [beam] sjvanrossum commented on a diff in pull request #26948: Add topicPattern property to KafkaIO.Read to match topics using a regex

Posted by "sjvanrossum (via GitHub)" <gi...@apache.org>.
sjvanrossum commented on code in PR #26948:
URL: https://github.com/apache/beam/pull/26948#discussion_r1246605302


##########
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java:
##########
@@ -936,11 +944,26 @@ public Read<K, V> withTopics(List<String> topics) {
      */
     public Read<K, V> withTopicPartitions(List<TopicPartition> topicPartitions) {
       checkState(
-          getTopics() == null || getTopics().isEmpty(),
-          "Only topics or topicPartitions can be set, not both");
+          (getTopics() == null || getTopics().isEmpty()) && getTopicPattern() == null,
+          "Only one of topics, topicPartitions or topicPattern can be set");
       return toBuilder().setTopicPartitions(ImmutableList.copyOf(topicPartitions)).build();
     }
 
+    /**
+     * Sets a {@link java.util.regex.Pattern} of topics to read from. All the partitions from each
+     * of the matching topics are read.
+     *
+     * <p>See {@link KafkaUnboundedSource#split(int, PipelineOptions)} for description of how the
+     * partitions are distributed among the splits.
+     */
+    public Read<K, V> withTopicPattern(Pattern topicPattern) {

Review Comment:
   `Read.External.Configuration` has fields for `keyDeserializer` and `valueDeserializer`, which are resolved from `String` to `Class` during external transform construction. Does it make sense to provide a mapping there instead?
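One way such a mapping could look, as a hypothetical sketch (`ExternalConfig` is illustrative, not Beam's `Read.External.Configuration`, and resolving the class name with `Class.forName` is an assumption): every field stays a portable `String`, and construction turns the class name into a `Class` and the regex into a compiled `Pattern`, failing fast with `PatternSyntaxException` on invalid syntax.

```java
import java.util.regex.Pattern;

// Hypothetical cross-language configuration: every field is a portable String.
class ExternalConfig {
  String keyDeserializer; // fully qualified class name
  String topicPattern;    // regex; null when unset

  // Assumption: the class name is resolved with Class.forName (not necessarily Beam's approach).
  Class<?> resolveKeyDeserializer() {
    try {
      return Class.forName(keyDeserializer);
    } catch (ClassNotFoundException e) {
      throw new IllegalArgumentException("Unknown class: " + keyDeserializer, e);
    }
  }

  // Same idea for the topic pattern: compile once and surface bad syntax early.
  Pattern resolveTopicPattern() {
    return topicPattern == null ? null : Pattern.compile(topicPattern);
  }

  public static void main(String[] args) {
    ExternalConfig config = new ExternalConfig();
    config.keyDeserializer = "java.lang.String"; // placeholder class for the sketch
    config.topicPattern = "orders-.*";
    System.out.println(config.resolveKeyDeserializer().getName());
    System.out.println(config.resolveTopicPattern().matcher("orders-eu").matches());
  }
}
```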





[GitHub] [beam] johnjcasey commented on a diff in pull request #26948: Add topicPattern property to KafkaIO.Read to match topics using a regex

Posted by "johnjcasey (via GitHub)" <gi...@apache.org>.
johnjcasey commented on code in PR #26948:
URL: https://github.com/apache/beam/pull/26948#discussion_r1246854669


##########
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java:
##########
@@ -936,11 +944,26 @@ public Read<K, V> withTopics(List<String> topics) {
      */
     public Read<K, V> withTopicPartitions(List<TopicPartition> topicPartitions) {
       checkState(
-          getTopics() == null || getTopics().isEmpty(),
-          "Only topics or topicPartitions can be set, not both");
+          (getTopics() == null || getTopics().isEmpty()) && getTopicPattern() == null,
+          "Only one of topics, topicPartitions or topicPattern can be set");
       return toBuilder().setTopicPartitions(ImmutableList.copyOf(topicPartitions)).build();
     }
 
+    /**
+     * Sets a {@link java.util.regex.Pattern} of topics to read from. All the partitions from each
+     * of the matching topics are read.
+     *
+     * <p>See {@link KafkaUnboundedSource#split(int, PipelineOptions)} for description of how the
+     * partitions are distributed among the splits.
+     */
+    public Read<K, V> withTopicPattern(Pattern topicPattern) {

Review Comment:
   I think we can have both






[GitHub] [beam] bvolpato commented on pull request #26948: Add topicPattern property to KafkaIO.Read to match topics using a regex

Posted by "bvolpato (via GitHub)" <gi...@apache.org>.
bvolpato commented on PR #26948:
URL: https://github.com/apache/beam/pull/26948#issuecomment-1588454936

   @pabloem @johnjcasey can you please TAL / merge? Thanks!




[GitHub] [beam] sjvanrossum commented on a diff in pull request #26948: Add topicPattern property to KafkaIO.Read to match topics using a regex

Posted by "sjvanrossum (via GitHub)" <gi...@apache.org>.
sjvanrossum commented on code in PR #26948:
URL: https://github.com/apache/beam/pull/26948#discussion_r1238583174


##########
sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java:
##########
@@ -723,6 +724,31 @@ public void testUnboundedSourceWithExplicitPartitions() {
     p.run();
   }
 
+  @Test

Review Comment:
   Added one for partial matches and one for no matches.





[GitHub] [beam] johnjcasey commented on pull request #26948: Add topicPattern property to KafkaIO.Read to match topics using a regex

Posted by "johnjcasey (via GitHub)" <gi...@apache.org>.
johnjcasey commented on PR #26948:
URL: https://github.com/apache/beam/pull/26948#issuecomment-1611249296

   Tests appear to be building fine to me




[GitHub] [beam] chamikaramj commented on a diff in pull request #26948: Add topicPattern property to KafkaIO.Read to match topics using a regex

Posted by "chamikaramj (via GitHub)" <gi...@apache.org>.
chamikaramj commented on code in PR #26948:
URL: https://github.com/apache/beam/pull/26948#discussion_r1245239927


##########
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java:
##########
@@ -936,11 +944,26 @@ public Read<K, V> withTopics(List<String> topics) {
      */
     public Read<K, V> withTopicPartitions(List<TopicPartition> topicPartitions) {
       checkState(
-          getTopics() == null || getTopics().isEmpty(),
-          "Only topics or topicPartitions can be set, not both");
+          (getTopics() == null || getTopics().isEmpty()) && getTopicPattern() == null,
+          "Only one of topics, topicPartitions or topicPattern can be set");
       return toBuilder().setTopicPartitions(ImmutableList.copyOf(topicPartitions)).build();
     }
 
+    /**
+     * Sets a {@link java.util.regex.Pattern} of topics to read from. All the partitions from each
+     * of the matching topics are read.
+     *
+     * <p>See {@link KafkaUnboundedSource#split(int, PipelineOptions)} for description of how the
+     * partitions are distributed among the splits.
+     */
+    public Read<K, V> withTopicPattern(Pattern topicPattern) {

Review Comment:
   Can we change this API to use a portable type (for example, a string regex) so that it can be supported via cross-language wrappers?





[GitHub] [beam] johnjcasey commented on pull request #26948: Add topicPattern property to KafkaIO.Read to match topics using a regex

Posted by "johnjcasey (via GitHub)" <gi...@apache.org>.
johnjcasey commented on PR #26948:
URL: https://github.com/apache/beam/pull/26948#issuecomment-1609567452

   run RAT PreCommit




[GitHub] [beam] sjvanrossum commented on a diff in pull request #26948: Add topicPattern property to KafkaIO.Read to match topics using a regex

Posted by "sjvanrossum (via GitHub)" <gi...@apache.org>.
sjvanrossum commented on code in PR #26948:
URL: https://github.com/apache/beam/pull/26948#discussion_r1246645703


##########
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java:
##########
@@ -936,11 +944,26 @@ public Read<K, V> withTopics(List<String> topics) {
      */
     public Read<K, V> withTopicPartitions(List<TopicPartition> topicPartitions) {
       checkState(
-          getTopics() == null || getTopics().isEmpty(),
-          "Only topics or topicPartitions can be set, not both");
+          (getTopics() == null || getTopics().isEmpty()) && getTopicPattern() == null,
+          "Only one of topics, topicPartitions or topicPattern can be set");
       return toBuilder().setTopicPartitions(ImmutableList.copyOf(topicPartitions)).build();
     }
 
+    /**
+     * Sets a {@link java.util.regex.Pattern} of topics to read from. All the partitions from each
+     * of the matching topics are read.
+     *
+     * <p>See {@link KafkaUnboundedSource#split(int, PipelineOptions)} for description of how the
+     * partitions are distributed among the splits.
+     */
+    public Read<K, V> withTopicPattern(Pattern topicPattern) {

Review Comment:
   Got it. Would an additional method overload with `String topicPattern` help at all?
   Otherwise I'll change the method signature.
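Here is a minimal sketch of the overload idea. It uses a hypothetical, simplified builder (`TopicSpec` is not `KafkaIO.Read`), in which the `String` variant compiles the regex once and then delegates to the `Pattern` variant:

```java
import java.util.Objects;
import java.util.regex.Pattern;

/** Hypothetical, simplified stand-in for a reader builder; not Beam's KafkaIO.Read. */
public final class TopicSpec {
  private final Pattern topicPattern;

  private TopicSpec(Pattern topicPattern) {
    this.topicPattern = topicPattern;
  }

  /** Java-friendly variant: accepts an already compiled Pattern. */
  public static TopicSpec withTopicPattern(Pattern topicPattern) {
    return new TopicSpec(Objects.requireNonNull(topicPattern, "topicPattern"));
  }

  /** Portable variant: compiles the regex string once, then delegates. */
  public static TopicSpec withTopicPattern(String topicPattern) {
    return withTopicPattern(Pattern.compile(topicPattern));
  }

  public Pattern getTopicPattern() {
    return topicPattern;
  }
}
```

Keeping both overloads has one side effect: a bare `withTopicPattern(null)` call no longer compiles, because it is ambiguous between `Pattern` and `String`.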



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] chamikaramj commented on a diff in pull request #26948: Add topicPattern property to KafkaIO.Read to match topics using a regex

Posted by "chamikaramj (via GitHub)" <gi...@apache.org>.
chamikaramj commented on code in PR #26948:
URL: https://github.com/apache/beam/pull/26948#discussion_r1246889586


##########
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java:
##########
@@ -936,11 +944,26 @@ public Read<K, V> withTopics(List<String> topics) {
      */
     public Read<K, V> withTopicPartitions(List<TopicPartition> topicPartitions) {
       checkState(
-          getTopics() == null || getTopics().isEmpty(),
-          "Only topics or topicPartitions can be set, not both");
+          (getTopics() == null || getTopics().isEmpty()) && getTopicPattern() == null,
+          "Only one of topics, topicPartitions or topicPattern can be set");
       return toBuilder().setTopicPartitions(ImmutableList.copyOf(topicPartitions)).build();
     }
 
+    /**
+     * Sets a {@link java.util.regex.Pattern} of topics to read from. All the partitions from each
+     * of the matching topics are read.
+     *
+     * <p>See {@link KafkaUnboundedSource#split(int, PipelineOptions)} for description of how the
+     * partitions are distributed among the splits.
+     */
+    public Read<K, V> withTopicPattern(Pattern topicPattern) {

Review Comment:
   Is there a significant advantage to using "Pattern" over a string regex?
   
   If it's a performance concern, we could build the Pattern object once within "withTopicPattern" and reuse it.
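Below is a sketch of the compile-once suggestion. It assumes the value is stored in its portable string form, and `CompiledTopicRegex` is a hypothetical name. Because the constructor compiles eagerly, an invalid regex fails with `PatternSyntaxException` when the object is built, not later when topics are first matched. `java.util.regex.Pattern` is itself `Serializable`, so holding a `Pattern` field directly would also work. The `transient` field here only keeps the serialized form down to the string.

```java
import java.io.Serializable;
import java.util.regex.Pattern;

/** Hypothetical holder: keeps the portable regex string and compiles it once. */
public final class CompiledTopicRegex implements Serializable {
  private static final long serialVersionUID = 1L;

  private final String regex; // portable form
  private transient volatile Pattern compiled; // not serialized; rebuilt on first use after deserialization

  public CompiledTopicRegex(String regex) {
    this.regex = regex;
    this.compiled = Pattern.compile(regex); // throws PatternSyntaxException for a bad regex
  }

  public boolean matches(String topic) {
    Pattern p = compiled;
    if (p == null) {
      // Only reached after deserialization; a racing thread may compile a duplicate, which is harmless.
      p = Pattern.compile(regex);
      compiled = p;
    }
    return p.matcher(topic).matches();
  }

  public String regex() {
    return regex;
  }
}
```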



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] chamikaramj merged pull request #26948: Add topicPattern property to KafkaIO.Read to match topics using a regex

Posted by "chamikaramj (via GitHub)" <gi...@apache.org>.
chamikaramj merged PR #26948:
URL: https://github.com/apache/beam/pull/26948


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] github-actions[bot] commented on pull request #26948: Add topicPattern property to KafkaIO.Read to match topics using a regex

Posted by "github-actions[bot] (via GitHub)" <gi...@apache.org>.
github-actions[bot] commented on PR #26948:
URL: https://github.com/apache/beam/pull/26948#issuecomment-1570306406

   Assigning reviewers. If you would like to opt out of this review, comment `assign to next reviewer`:
   
   R: @bvolpato for label java.
   R: @pabloem for label io.
   
   Available commands:
   - `stop reviewer notifications` - opt out of the automated review tooling
   - `remind me after tests pass` - tag the comment author after tests pass
   - `waiting on author` - shift the attention set back to the author (any comment or push by the author will return the attention set to the reviewers)
   
   The PR bot will only process comments in the main thread (not review comments).


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] johnjcasey commented on a diff in pull request #26948: Add topicPattern property to KafkaIO.Read to match topics using a regex

Posted by "johnjcasey (via GitHub)" <gi...@apache.org>.
johnjcasey commented on code in PR #26948:
URL: https://github.com/apache/beam/pull/26948#discussion_r1235327690


##########
sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java:
##########
@@ -723,6 +724,31 @@ public void testUnboundedSourceWithExplicitPartitions() {
     p.run();
   }
 
+  @Test

Review Comment:
   Can you add a test, or update this one, so that the pattern doesn't match all of the topics?
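The following is a Beam-free sketch of the kind of assertion being asked for. It models a fake cluster as a map from topic name to partition count, and `TopicPatternSubsetCheck` and `expandMatchingPartitions` are hypothetical names. The pattern deliberately leaves one topic (`other`) unmatched, so the check covers the Javadoc's promise that all partitions of each *matching* topic are read, and only those.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;

/** Hypothetical check: expand only topics matching the pattern into topic-partition names. */
public class TopicPatternSubsetCheck {

  public static List<String> expandMatchingPartitions(
      Map<String, Integer> partitionsPerTopic, Pattern pattern) {
    List<String> result = new ArrayList<>();
    for (Map.Entry<String, Integer> e : partitionsPerTopic.entrySet()) {
      if (pattern.matcher(e.getKey()).matches()) {
        // Every partition of a matching topic is included.
        for (int p = 0; p < e.getValue(); p++) {
          result.add(e.getKey() + "-" + p);
        }
      }
    }
    return result;
  }

  public static void main(String[] args) {
    Map<String, Integer> cluster = new LinkedHashMap<>();
    cluster.put("topic_a", 2);
    cluster.put("topic_b", 2);
    cluster.put("other", 3); // must NOT be selected by the pattern below
    List<String> got = expandMatchingPartitions(cluster, Pattern.compile("topic_.*"));
    System.out.println(got); // [topic_a-0, topic_a-1, topic_b-0, topic_b-1]
  }
}
```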



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] github-actions[bot] commented on pull request #26948: Add topicPattern property to KafkaIO.Read to match topics using a regex

Posted by "github-actions[bot] (via GitHub)" <gi...@apache.org>.
github-actions[bot] commented on PR #26948:
URL: https://github.com/apache/beam/pull/26948#issuecomment-1598659635

   Reminder, please take a look at this pr: @robertwb @chamikaramj 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] sjvanrossum commented on pull request #26948: Add topicPattern property to KafkaIO.Read to match topics using a regex

Posted by "sjvanrossum (via GitHub)" <gi...@apache.org>.
sjvanrossum commented on PR #26948:
URL: https://github.com/apache/beam/pull/26948#issuecomment-1604159868

   @johnjcasey Unit tests are failing early on a missing dependency.
   The version of the Avro Gradle plugin that Beam uses seems to have been yanked, and all published versions of the artifact com.commercehub.gradle.plugin:gradle-avro-plugin are now obsolete.
   
   It appears to have been superseded by com.github.davidmc24.gradle.plugin.avro; see the [changelog](https://github.com/davidmc24/gradle-avro-plugin/blob/master/CHANGES.md#pre-10-versions).
   I'm not sure what the compatibility story is between versions <1.0.0 and >=1.0.0, but upgrading seems prudent.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] github-actions[bot] commented on pull request #26948: Add topicPattern property to KafkaIO.Read to match topics using a regex

Posted by "github-actions[bot] (via GitHub)" <gi...@apache.org>.
github-actions[bot] commented on PR #26948:
URL: https://github.com/apache/beam/pull/26948#issuecomment-1587221367

   Assigning new set of reviewers because Pr has gone too long without review. If you would like to opt out of this review, comment `assign to next reviewer`:
   
   R: @robertwb for label java.
   R: @chamikaramj for label io.
   
   Available commands:
   - `stop reviewer notifications` - opt out of the automated review tooling
   - `remind me after tests pass` - tag the comment author after tests pass
   - `waiting on author` - shift the attention set back to the author (any comment or push by the author will return the attention set to the reviewers)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] sjvanrossum commented on a diff in pull request #26948: Add topicPattern property to KafkaIO.Read to match topics using a regex

Posted by "sjvanrossum (via GitHub)" <gi...@apache.org>.
sjvanrossum commented on code in PR #26948:
URL: https://github.com/apache/beam/pull/26948#discussion_r1211742600


##########
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java:
##########
@@ -1578,15 +1604,30 @@ static class GenerateKafkaSourceDescriptor extends DoFn<byte[], KafkaSourceDescr
 
       @VisibleForTesting final @Nullable List<String> topics;
 
+      private final @Nullable Pattern topicPattern;

Review Comment:
   I didn't annotate this with `@VisibleForTesting` because tests don't access the property directly. Then again, tests don't directly access the properties that do carry `@VisibleForTesting` either.
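Here is a hypothetical, simplified sketch of how a component holding nullable `topics` and `topicPattern` fields might decide what to read. `TopicResolver` is not Beam's `GenerateKafkaSourceDescriptor`, and it omits `topicPartitions` entirely. It assumes the caller supplies the full list of cluster topic names, for example from a Kafka consumer's `listTopics()`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;

/** Hypothetical resolver mirroring the nullable-field setup; not Beam's GenerateKafkaSourceDescriptor. */
public final class TopicResolver {
  private final List<String> topics; // explicit topic list, or null when unset
  private final Pattern topicPattern; // regex alternative, or null when unset

  public TopicResolver(List<String> topics, Pattern topicPattern) {
    boolean hasTopics = topics != null && !topics.isEmpty();
    // Simplified mutual-exclusion check: exactly one source of topics must be configured.
    if (hasTopics == (topicPattern != null)) {
      throw new IllegalStateException("Exactly one of topics or topicPattern must be set");
    }
    this.topics = topics;
    this.topicPattern = topicPattern;
  }

  /** Resolves the topics to read, given all topic names currently known to the cluster. */
  public List<String> resolve(List<String> clusterTopics) {
    if (topicPattern == null) {
      return topics;
    }
    List<String> matched = new ArrayList<>();
    for (String topic : clusterTopics) {
      if (topicPattern.matcher(topic).matches()) {
        matched.add(topic);
      }
    }
    return matched;
  }
}
```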



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org