Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2022/06/08 17:16:42 UTC

[GitHub] [beam] johnjcasey opened a new pull request, #21752: Feature/beam 13852 reimplement with dynamic read

johnjcasey opened a new pull request, #21752:
URL: https://github.com/apache/beam/pull/21752

   Reimplement dynamic read using the Watch transform.
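   
   For context, a hedged usage sketch of the feature (not part of this PR's diff; the bootstrap address and topic name below are illustrative assumptions):
   
   ```java
   import org.apache.beam.sdk.io.kafka.KafkaIO;
   import org.apache.kafka.common.serialization.IntegerDeserializer;
   import org.apache.kafka.common.serialization.StringDeserializer;
   import org.joda.time.Duration;
   
   // With dynamic read enabled, KafkaIO re-checks the cluster at the given
   // interval and starts reading any newly discovered topics/partitions.
   pipeline.apply(
       KafkaIO.<Integer, String>read()
           .withBootstrapServers("localhost:9092") // assumption: example address
           .withTopic("my-topic")                  // assumption: example topic
           .withKeyDeserializer(IntegerDeserializer.class)
           .withValueDeserializer(StringDeserializer.class)
           .withDynamicRead(Duration.standardHours(1)));
   ```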
   
   ------------------------
   
   Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:
   
    - [ ] [**Choose reviewer(s)**](https://beam.apache.org/contribute/#make-your-change) and mention them in a comment (`R: @username`).
    - [ ] Add a link to the appropriate issue in your description, if applicable. This will automatically link the pull request to the issue.
    - [ ] Update `CHANGES.md` with noteworthy changes.
    - [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).
   
   See the [Contributor Guide](https://beam.apache.org/contribute) for more tips on [how to make review process smoother](https://beam.apache.org/contribute/#make-reviewers-job-easier).
   
   To check the build health, please visit [https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md](https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md)
   
   GitHub Actions Tests Status (on master branch)
   ------------------------------------------------------------------------------------------------
   [![Build python source distribution and wheels](https://github.com/apache/beam/workflows/Build%20python%20source%20distribution%20and%20wheels/badge.svg?branch=master&event=schedule)](https://github.com/apache/beam/actions?query=workflow%3A%22Build+python+source+distribution+and+wheels%22+branch%3Amaster+event%3Aschedule)
   [![Python tests](https://github.com/apache/beam/workflows/Python%20tests/badge.svg?branch=master&event=schedule)](https://github.com/apache/beam/actions?query=workflow%3A%22Python+Tests%22+branch%3Amaster+event%3Aschedule)
   [![Java tests](https://github.com/apache/beam/workflows/Java%20Tests/badge.svg?branch=master&event=schedule)](https://github.com/apache/beam/actions?query=workflow%3A%22Java+Tests%22+branch%3Amaster+event%3Aschedule)
   
   See [CI.md](https://github.com/apache/beam/blob/master/CI.md) for more information about GitHub Actions CI.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] johnjcasey commented on pull request #21752: Feature/beam 13852 reimplement with dynamic read

Posted by GitBox <gi...@apache.org>.
johnjcasey commented on PR #21752:
URL: https://github.com/apache/beam/pull/21752#issuecomment-1152584390

   run java postcommit




[GitHub] [beam] johnjcasey commented on pull request #21752: Feature/beam 13852 reimplement with dynamic read

Posted by GitBox <gi...@apache.org>.
johnjcasey commented on PR #21752:
URL: https://github.com/apache/beam/pull/21752#issuecomment-1161841515

   Run Java PreCommit




[GitHub] [beam] johnjcasey commented on a diff in pull request #21752: Feature/beam 13852 reimplement with dynamic read

Posted by GitBox <gi...@apache.org>.
johnjcasey commented on code in PR #21752:
URL: https://github.com/apache/beam/pull/21752#discussion_r903988026


##########
sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOIT.java:
##########
@@ -259,8 +272,131 @@ public void processElement(
               return null;
             });
 
+    PipelineResult writeResult = writePipeline.run();
+    writeResult.waitUntilFinish();
+
     PipelineResult readResult = readPipeline.run();
-    readResult.waitUntilFinish();
+    PipelineResult.State readState =
+        readResult.waitUntilFinish(Duration.standardSeconds(options.getReadTimeout()));
+
+    cancelIfTimeouted(readResult, readState);
+  }
+
+  @Test
+  public void testKafkaWithDynamicPartitions() throws IOException {
+    AdminClient client =
+        AdminClient.create(
+            ImmutableMap.of("bootstrap.servers", options.getKafkaBootstrapServerAddresses()));
+    String topicName = "DynamicTopicPartition-" + UUID.randomUUID();
+    Map<Integer, String> records = new HashMap<>();
+    for (int i = 0; i < 100; i++) {
+      records.put(i, String.valueOf(i));
+    }
+    Map<Integer, String> moreRecords = new HashMap<>();
+    for (int i = 100; i < 200; i++) {
+      moreRecords.put(i, String.valueOf(i));
+    }
+    try {
+      client.createTopics(ImmutableSet.of(new NewTopic(topicName, 1, (short) 1)));
+      client.createPartitions(ImmutableMap.of(topicName, NewPartitions.increaseTo(1)));
+
+      writePipeline
+          .apply("Generate Write Elements", Create.of(records))
+          .apply(
+              "Write to Kafka",
+              KafkaIO.<Integer, String>write()
+                  .withBootstrapServers(options.getKafkaBootstrapServerAddresses())
+                  .withTopic(topicName)
+                  .withKeySerializer(IntegerSerializer.class)
+                  .withValueSerializer(StringSerializer.class));
+
+      writePipeline.run().waitUntilFinish(Duration.standardSeconds(15));
+
+      System.out.println("after write 1");
+
+      Thread delayedWriteThread =
+          new Thread(
+              () -> {
+                try {
+                  Thread.sleep(20 * 1000); // wait 20 seconds before changing kafka

Review Comment:
   The goal here is to wait some amount of time before repartitioning Kafka, and then publish more data to it. Thus far it has not been flaky.
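   
   A sketch of how the thread body plausibly continues, reusing the `client`, `topicName`, and `moreRecords` locals defined earlier in the test (the quoted diff is truncated here, so this is an assumption rather than the merged code):
   
   ```java
   Thread delayedWriteThread =
       new Thread(
           () -> {
             try {
               Thread.sleep(20 * 1000); // wait 20 seconds before changing kafka
               // Repartition the topic so the Watch-based reader has new
               // partitions to discover mid-pipeline...
               client.createPartitions(
                   ImmutableMap.of(topicName, NewPartitions.increaseTo(2)));
               // ...then publish the second batch of records (moreRecords).
             } catch (InterruptedException e) {
               Thread.currentThread().interrupt();
             }
           });
   ```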





[GitHub] [beam] johnjcasey commented on pull request #21752: Feature/beam 13852 reimplement with dynamic read

Posted by GitBox <gi...@apache.org>.
johnjcasey commented on PR #21752:
URL: https://github.com/apache/beam/pull/21752#issuecomment-1165838276

   Run Java Precommit




[GitHub] [beam] johnjcasey commented on pull request #21752: Feature/beam 13852 reimplement with dynamic read

Posted by GitBox <gi...@apache.org>.
johnjcasey commented on PR #21752:
URL: https://github.com/apache/beam/pull/21752#issuecomment-1159115046

   run java postcommit




[GitHub] [beam] asf-ci commented on pull request #21752: Feature/beam 13852 reimplement with dynamic read

Posted by GitBox <gi...@apache.org>.
asf-ci commented on PR #21752:
URL: https://github.com/apache/beam/pull/21752#issuecomment-1150184534

   Can one of the admins verify this patch?




[GitHub] [beam] pabloem commented on a diff in pull request #21752: Feature/beam 13852 reimplement with dynamic read

Posted by GitBox <gi...@apache.org>.
pabloem commented on code in PR #21752:
URL: https://github.com/apache/beam/pull/21752#discussion_r907663667


##########
sdks/java/io/kafka/build.gradle:
##########
@@ -135,7 +144,8 @@ kafkaVersions.each {kv ->
     include '**/KafkaIOIT.class'
 
     filter {
-      includeTestsMatching "*InBatch"
+      excludeTestsMatching "*InStreaming"
+      if (!(kv.key in sdfKafkaVersions)) excludeTestsMatching "*DynamicPartitions" //admin client create partitions does not exist in kafka 0.11.0.3 and kafka sdf does not appear to work for kafka versions <2.0.1

Review Comment:
   Just for my edification: how, where, and when do these tests run?





[GitHub] [beam] pabloem commented on a diff in pull request #21752: Feature/beam 13852 reimplement with dynamic read

Posted by GitBox <gi...@apache.org>.
pabloem commented on code in PR #21752:
URL: https://github.com/apache/beam/pull/21752#discussion_r908767655


##########
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/WatchForKafkaTopicPartitions.java:
##########
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.kafka;
+
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.MoreObjects.firstNonNull;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.metrics.Counter;
+import org.apache.beam.sdk.metrics.Metrics;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.Impulse;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.Watch;
+import org.apache.beam.sdk.transforms.Watch.Growth.PollFn;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+
+/**
+ * A {@link PTransform} for continuously querying Kafka for new partitions, and emitting those
+ * topics as {@link KafkaSourceDescriptor} This transform is implemented using the {@link Watch}
+ * transform, and modifications to this transform should keep that in mind.
+ *
+ * <p>Please see
+ * https://docs.google.com/document/d/1Io49s5LBs29HJyppKG3AlR-gHz5m5PC6CqO0CCoSqLs/edit?usp=sharing
+ * for design details
+ */
+@Experimental
+class WatchForKafkaTopicPartitions extends PTransform<PBegin, PCollection<KafkaSourceDescriptor>> {
+
+  private static final Duration DEFAULT_CHECK_DURATION = Duration.standardHours(1);
+  private static final String COUNTER_NAMESPACE = "watch_kafka_topic_partition";
+
+  private final Duration checkDuration;
+  private final SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>>
+      kafkaConsumerFactoryFn;
+  private final Map<String, Object> kafkaConsumerConfig;
+  private final @Nullable SerializableFunction<TopicPartition, Boolean> checkStopReadingFn;
+  private final List<String> topics;
+  private final @Nullable Instant startReadTime;
+  private final @Nullable Instant stopReadTime;
+
+  public WatchForKafkaTopicPartitions(
+      @Nullable Duration checkDuration,
+      SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>> kafkaConsumerFactoryFn,
+      Map<String, Object> kafkaConsumerConfig,
+      @Nullable SerializableFunction<TopicPartition, Boolean> checkStopReadingFn,
+      List<String> topics,
+      @Nullable Instant startReadTime,
+      @Nullable Instant stopReadTime) {
+    this.checkDuration = firstNonNull(checkDuration, DEFAULT_CHECK_DURATION);
+    this.kafkaConsumerFactoryFn = kafkaConsumerFactoryFn;
+    this.kafkaConsumerConfig = kafkaConsumerConfig;
+    this.checkStopReadingFn = checkStopReadingFn;
+    this.topics = topics;
+    this.startReadTime = startReadTime;
+    this.stopReadTime = stopReadTime;
+  }
+
+  @Override
+  public PCollection<KafkaSourceDescriptor> expand(PBegin input) {
+    return input
+        .apply(Impulse.create())
+        .apply(
+            "Match new TopicPartitions",
+            Watch.growthOf(
+                    new WatchPartitionFn(kafkaConsumerFactoryFn, kafkaConsumerConfig, topics))
+                .withPollInterval(checkDuration))
+        .apply(ParDo.of(new ConvertToDescriptor(checkStopReadingFn, startReadTime, stopReadTime)));
+  }
+
+  private static class ConvertToDescriptor
+      extends DoFn<KV<byte[], TopicPartition>, KafkaSourceDescriptor> {
+
+    private final @Nullable SerializableFunction<TopicPartition, Boolean> checkStopReadingFn;
+    private final @Nullable Instant startReadTime;
+    private final @Nullable Instant stopReadTime;
+
+    private ConvertToDescriptor(
+        @Nullable SerializableFunction<TopicPartition, Boolean> checkStopReadingFn,
+        @Nullable Instant startReadTime,
+        @Nullable Instant stopReadTime) {
+      this.checkStopReadingFn = checkStopReadingFn;
+      this.startReadTime = startReadTime;
+      this.stopReadTime = stopReadTime;
+    }
+
+    @ProcessElement
+    public void processElement(
+        @Element KV<byte[], TopicPartition> partition,
+        OutputReceiver<KafkaSourceDescriptor> receiver) {
+      TopicPartition topicPartition = Objects.requireNonNull(partition.getValue());
+      if (checkStopReadingFn == null || !checkStopReadingFn.apply(topicPartition)) {
+        Counter foundedTopicPartition =
+            Metrics.counter(COUNTER_NAMESPACE, topicPartition.toString());
+        foundedTopicPartition.inc();
+        receiver.output(
+            KafkaSourceDescriptor.of(
+                topicPartition, null, startReadTime, null, stopReadTime, null));
+      }
+    }
+  }
+
+  private static class WatchPartitionFn extends PollFn<byte[], TopicPartition> {
+
+    private final SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>>
+        kafkaConsumerFactoryFn;
+    private final Map<String, Object> kafkaConsumerConfig;
+    private final List<String> topics;
+
+    private WatchPartitionFn(
+        SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>> kafkaConsumerFactoryFn,
+        Map<String, Object> kafkaConsumerConfig,
+        List<String> topics) {
+      this.kafkaConsumerFactoryFn = kafkaConsumerFactoryFn;
+      this.kafkaConsumerConfig = kafkaConsumerConfig;
+      this.topics = topics;
+    }
+
+    @Override
+    public Watch.Growth.PollResult<TopicPartition> apply(byte[] element, Context c)
+        throws Exception {
+      Instant now = Instant.now();
+      return Watch.Growth.PollResult.incomplete(
+              now, getAllTopicPartitions(kafkaConsumerFactoryFn, kafkaConsumerConfig, topics))
+          .withWatermark(now);
+    }
+  }
+
+  @VisibleForTesting
+  static List<TopicPartition> getAllTopicPartitions(
+      SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>> kafkaConsumerFactoryFn,
+      Map<String, Object> kafkaConsumerConfig,
+      List<String> topics) {
+    List<TopicPartition> current = new ArrayList<>();
+    try (Consumer<byte[], byte[]> kafkaConsumer =
+        kafkaConsumerFactoryFn.apply(kafkaConsumerConfig)) {
+      if (topics != null && !topics.isEmpty()) {
+        for (String topic : topics) {
+          for (PartitionInfo partition : kafkaConsumer.partitionsFor(topic)) {

Review Comment:
   What would happen if the topic does not exist? Should we handle an exception?
   
   (I checked this: https://kafka.apache.org/26/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html)





[GitHub] [beam] johnjcasey commented on pull request #21752: Feature/beam 13852 reimplement with dynamic read

Posted by GitBox <gi...@apache.org>.
johnjcasey commented on PR #21752:
URL: https://github.com/apache/beam/pull/21752#issuecomment-1162330041

   R: @chamikaramj
   R: @ahmedabu98 
   R: @Abacn 
   R: @pabloem 




[GitHub] [beam] johnjcasey commented on pull request #21752: Feature/beam 13852 reimplement with dynamic read

Posted by GitBox <gi...@apache.org>.
johnjcasey commented on PR #21752:
URL: https://github.com/apache/beam/pull/21752#issuecomment-1162311669

   run Java postcommit




[GitHub] [beam] johnjcasey commented on pull request #21752: Feature/beam 13852 reimplement with dynamic read

Posted by GitBox <gi...@apache.org>.
johnjcasey commented on PR #21752:
URL: https://github.com/apache/beam/pull/21752#issuecomment-1152369768

   run java postcommit




[GitHub] [beam] johnjcasey commented on pull request #21752: Feature/beam 13852 reimplement with dynamic read

Posted by GitBox <gi...@apache.org>.
johnjcasey commented on PR #21752:
URL: https://github.com/apache/beam/pull/21752#issuecomment-1151466399

   run java postcommit




[GitHub] [beam] pabloem commented on pull request #21752: Feature/beam 13852 reimplement with dynamic read

Posted by GitBox <gi...@apache.org>.
pabloem commented on PR #21752:
URL: https://github.com/apache/beam/pull/21752#issuecomment-1170298774

   Run Java PreCommit




[GitHub] [beam] johnjcasey commented on pull request #21752: Feature/beam 13852 reimplement with dynamic read

Posted by GitBox <gi...@apache.org>.
johnjcasey commented on PR #21752:
URL: https://github.com/apache/beam/pull/21752#issuecomment-1157938381

   Run Java PreCommit




[GitHub] [beam] johnjcasey commented on a diff in pull request #21752: Feature/beam 13852 reimplement with dynamic read

Posted by GitBox <gi...@apache.org>.
johnjcasey commented on code in PR #21752:
URL: https://github.com/apache/beam/pull/21752#discussion_r909999505


##########
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/WatchForKafkaTopicPartitions.java:
##########
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.kafka;
+
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.MoreObjects.firstNonNull;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.metrics.Counter;
+import org.apache.beam.sdk.metrics.Metrics;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.Impulse;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.Watch;
+import org.apache.beam.sdk.transforms.Watch.Growth.PollFn;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+
+/**
+ * A {@link PTransform} for continuously querying Kafka for new partitions, and emitting those
+ * topics as {@link KafkaSourceDescriptor} This transform is implemented using the {@link Watch}
+ * transform, and modifications to this transform should keep that in mind.
+ *
+ * <p>Please see
+ * https://docs.google.com/document/d/1Io49s5LBs29HJyppKG3AlR-gHz5m5PC6CqO0CCoSqLs/edit?usp=sharing
+ * for design details
+ */
+@Experimental
+class WatchForKafkaTopicPartitions extends PTransform<PBegin, PCollection<KafkaSourceDescriptor>> {
+
+  private static final Duration DEFAULT_CHECK_DURATION = Duration.standardHours(1);
+  private static final String COUNTER_NAMESPACE = "watch_kafka_topic_partition";
+
+  private final Duration checkDuration;
+  private final SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>>
+      kafkaConsumerFactoryFn;
+  private final Map<String, Object> kafkaConsumerConfig;
+  private final @Nullable SerializableFunction<TopicPartition, Boolean> checkStopReadingFn;
+  private final List<String> topics;
+  private final @Nullable Instant startReadTime;
+  private final @Nullable Instant stopReadTime;
+
+  public WatchForKafkaTopicPartitions(
+      @Nullable Duration checkDuration,
+      SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>> kafkaConsumerFactoryFn,
+      Map<String, Object> kafkaConsumerConfig,
+      @Nullable SerializableFunction<TopicPartition, Boolean> checkStopReadingFn,
+      List<String> topics,
+      @Nullable Instant startReadTime,
+      @Nullable Instant stopReadTime) {
+    this.checkDuration = firstNonNull(checkDuration, DEFAULT_CHECK_DURATION);
+    this.kafkaConsumerFactoryFn = kafkaConsumerFactoryFn;
+    this.kafkaConsumerConfig = kafkaConsumerConfig;
+    this.checkStopReadingFn = checkStopReadingFn;
+    this.topics = topics;
+    this.startReadTime = startReadTime;
+    this.stopReadTime = stopReadTime;
+  }
+
+  @Override
+  public PCollection<KafkaSourceDescriptor> expand(PBegin input) {
+    return input
+        .apply(Impulse.create())
+        .apply(
+            "Match new TopicPartitions",
+            Watch.growthOf(
+                    new WatchPartitionFn(kafkaConsumerFactoryFn, kafkaConsumerConfig, topics))
+                .withPollInterval(checkDuration))
+        .apply(ParDo.of(new ConvertToDescriptor(checkStopReadingFn, startReadTime, stopReadTime)));
+  }
+
+  private static class ConvertToDescriptor
+      extends DoFn<KV<byte[], TopicPartition>, KafkaSourceDescriptor> {
+
+    private final @Nullable SerializableFunction<TopicPartition, Boolean> checkStopReadingFn;
+    private final @Nullable Instant startReadTime;
+    private final @Nullable Instant stopReadTime;
+
+    private ConvertToDescriptor(
+        @Nullable SerializableFunction<TopicPartition, Boolean> checkStopReadingFn,
+        @Nullable Instant startReadTime,
+        @Nullable Instant stopReadTime) {
+      this.checkStopReadingFn = checkStopReadingFn;
+      this.startReadTime = startReadTime;
+      this.stopReadTime = stopReadTime;
+    }
+
+    @ProcessElement
+    public void processElement(
+        @Element KV<byte[], TopicPartition> partition,
+        OutputReceiver<KafkaSourceDescriptor> receiver) {
+      TopicPartition topicPartition = Objects.requireNonNull(partition.getValue());
+      if (checkStopReadingFn == null || !checkStopReadingFn.apply(topicPartition)) {
+        Counter foundedTopicPartition =
+            Metrics.counter(COUNTER_NAMESPACE, topicPartition.toString());
+        foundedTopicPartition.inc();
+        receiver.output(
+            KafkaSourceDescriptor.of(
+                topicPartition, null, startReadTime, null, stopReadTime, null));
+      }
+    }
+  }
+
+  private static class WatchPartitionFn extends PollFn<byte[], TopicPartition> {
+
+    private final SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>>
+        kafkaConsumerFactoryFn;
+    private final Map<String, Object> kafkaConsumerConfig;
+    private final List<String> topics;
+
+    private WatchPartitionFn(
+        SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>> kafkaConsumerFactoryFn,
+        Map<String, Object> kafkaConsumerConfig,
+        List<String> topics) {
+      this.kafkaConsumerFactoryFn = kafkaConsumerFactoryFn;
+      this.kafkaConsumerConfig = kafkaConsumerConfig;
+      this.topics = topics;
+    }
+
+    @Override
+    public Watch.Growth.PollResult<TopicPartition> apply(byte[] element, Context c)
+        throws Exception {
+      Instant now = Instant.now();
+      return Watch.Growth.PollResult.incomplete(
+              now, getAllTopicPartitions(kafkaConsumerFactoryFn, kafkaConsumerConfig, topics))
+          .withWatermark(now);
+    }
+  }
+
+  @VisibleForTesting
+  static List<TopicPartition> getAllTopicPartitions(
+      SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>> kafkaConsumerFactoryFn,
+      Map<String, Object> kafkaConsumerConfig,
+      List<String> topics) {
+    List<TopicPartition> current = new ArrayList<>();
+    try (Consumer<byte[], byte[]> kafkaConsumer =
+        kafkaConsumerFactoryFn.apply(kafkaConsumerConfig)) {
+      if (topics != null && !topics.isEmpty()) {
+        for (String topic : topics) {
+          for (PartitionInfo partition : kafkaConsumer.partitionsFor(topic)) {

Review Comment:
   Based on some reading and experimentation, this won't throw an exception; it will actually create an empty single-partition topic, which appears to be the correct default Kafka behavior.
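   
   For brokers where auto-creation is disabled, a defensive variant of the lookup is sketched below; this guard is an assumption, not part of this PR, and `partitionsFor`'s behavior for a missing topic varies across Kafka client versions:
   
   ```java
   for (String topic : topics) {
     // partitionsFor may return null or an empty list when the topic is
     // absent and broker-side auto-creation is off (version-dependent).
     List<PartitionInfo> partitions = kafkaConsumer.partitionsFor(topic);
     if (partitions == null || partitions.isEmpty()) {
       continue; // nothing to read yet; the next Watch poll will retry
     }
     for (PartitionInfo partition : partitions) {
       current.add(new TopicPartition(topic, partition.partition()));
     }
   }
   ```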





[GitHub] [beam] johnjcasey commented on pull request #21752: Feature/beam 13852 reimplement with dynamic read

Posted by GitBox <gi...@apache.org>.
johnjcasey commented on PR #21752:
URL: https://github.com/apache/beam/pull/21752#issuecomment-1150271703

   Run Java Precommit




[GitHub] [beam] asf-ci commented on pull request #21752: Feature/beam 13852 reimplement with dynamic read

Posted by GitBox <gi...@apache.org>.
asf-ci commented on PR #21752:
URL: https://github.com/apache/beam/pull/21752#issuecomment-1150184526

   Can one of the admins verify this patch?




[GitHub] [beam] damccorm commented on a diff in pull request #21752: Feature/beam 13852 reimplement with dynamic read

Posted by GitBox <gi...@apache.org>.
damccorm commented on code in PR #21752:
URL: https://github.com/apache/beam/pull/21752#discussion_r905402610


##########
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/WatchForKafkaTopicPartitions.java:
##########
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.kafka;
+
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.MoreObjects.firstNonNull;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import org.apache.beam.sdk.metrics.Counter;
+import org.apache.beam.sdk.metrics.Metrics;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.Impulse;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.Watch;
+import org.apache.beam.sdk.transforms.Watch.Growth.PollFn;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+
+class WatchForKafkaTopicPartitions extends PTransform<PBegin, PCollection<KafkaSourceDescriptor>> {

Review Comment:
   Should we be including high-level class header comments? The convention seems mixed on this, so I'm asking, not requesting.





[GitHub] [beam] johnjcasey commented on a diff in pull request #21752: Feature/beam 13852 reimplement with dynamic read

Posted by GitBox <gi...@apache.org>.
johnjcasey commented on code in PR #21752:
URL: https://github.com/apache/beam/pull/21752#discussion_r906032104


##########
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/WatchForKafkaTopicPartitions.java:
##########
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.kafka;
+
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.MoreObjects.firstNonNull;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import org.apache.beam.sdk.metrics.Counter;
+import org.apache.beam.sdk.metrics.Metrics;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.Impulse;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.Watch;
+import org.apache.beam.sdk.transforms.Watch.Growth.PollFn;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+
+class WatchForKafkaTopicPartitions extends PTransform<PBegin, PCollection<KafkaSourceDescriptor>> {

Review Comment:
   agreed, I'll add some comments
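   
   The class comment added in the later revision (quoted in full above) reads:
   
   ```java
   /**
    * A {@link PTransform} for continuously querying Kafka for new partitions, and emitting those
    * topics as {@link KafkaSourceDescriptor} This transform is implemented using the {@link Watch}
    * transform, and modifications to this transform should keep that in mind.
    */
   ```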





[GitHub] [beam] johnjcasey commented on a diff in pull request #21752: Feature/beam 13852 reimplement with dynamic read

Posted by GitBox <gi...@apache.org>.
johnjcasey commented on code in PR #21752:
URL: https://github.com/apache/beam/pull/21752#discussion_r903979119


##########
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/WatchForKafkaTopicPartitions.java:
##########
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.kafka;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import org.apache.beam.sdk.metrics.Counter;
+import org.apache.beam.sdk.metrics.Metrics;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.Impulse;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.Watch;
+import org.apache.beam.sdk.transforms.Watch.Growth.PollFn;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+
+public class WatchForKafkaTopicPartitions

Review Comment:
   Good catch, I'll reduce the scope
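   
   As reflected in the revision quoted above, the declaration drops `public` and becomes package-private:
   
   ```java
   // Package-private: visible only within org.apache.beam.sdk.io.kafka.
   class WatchForKafkaTopicPartitions extends PTransform<PBegin, PCollection<KafkaSourceDescriptor>> {
   ```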





[GitHub] [beam] pabloem commented on pull request #21752: Feature/beam 13852 reimplement with dynamic read

Posted by GitBox <gi...@apache.org>.
pabloem commented on PR #21752:
URL: https://github.com/apache/beam/pull/21752#issuecomment-1170135540

   ugggghhh I don't know why this got closed just now




[GitHub] [beam] pabloem commented on a diff in pull request #21752: Feature/beam 13852 reimplement with dynamic read

Posted by GitBox <gi...@apache.org>.
pabloem commented on code in PR #21752:
URL: https://github.com/apache/beam/pull/21752#discussion_r908746952


##########
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java:
##########
@@ -1473,7 +1471,7 @@ public PCollection<KafkaRecord<K, V>> expand(PBegin input) {
         }
         PCollection<KafkaSourceDescriptor> output;
         if (kafkaRead.isDynamicRead()) {
-          Set<String> topics = new HashSet<>();
+          List<String> topics = new ArrayList<>();

Review Comment:
   Well, I just think we'll likely have duplicates if we use a List instead of a Set, because in line 1480 we're listing TopicPartitions and appending Topics to the list, no?
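   
   A minimal sketch of one way to deduplicate while keeping the List-typed field (the variable names here are illustrative, not the actual KafkaIO.java code):
   
   ```java
   // Collect topic names from the configured TopicPartitions, dropping
   // duplicates while preserving encounter order, then hand off a List.
   Set<String> uniqueTopics = new LinkedHashSet<>();
   for (TopicPartition topicPartition : kafkaRead.getTopicPartitions()) {
     uniqueTopics.add(topicPartition.topic());
   }
   List<String> topics = new ArrayList<>(uniqueTopics);
   ```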





[GitHub] [beam] johnjcasey commented on pull request #21752: Feature/beam 13852 reimplement with dynamic read

Posted by GitBox <gi...@apache.org>.
johnjcasey commented on PR #21752:
URL: https://github.com/apache/beam/pull/21752#issuecomment-1151399597

   run java postcommit




[GitHub] [beam] Abacn commented on a diff in pull request #21752: Feature/beam 13852 reimplement with dynamic read

Posted by GitBox <gi...@apache.org>.
Abacn commented on code in PR #21752:
URL: https://github.com/apache/beam/pull/21752#discussion_r904195307


##########
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/WatchForKafkaTopicPartitions.java:
##########
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.kafka;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import org.apache.beam.sdk.metrics.Counter;
+import org.apache.beam.sdk.metrics.Metrics;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.Impulse;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.Watch;
+import org.apache.beam.sdk.transforms.Watch.Growth.PollFn;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+
+public class WatchForKafkaTopicPartitions
+    extends PTransform<PBegin, PCollection<KafkaSourceDescriptor>> {
+
+  private static final Duration DEFAULT_CHECK_DURATION = Duration.standardHours(1);
+  private static final String COUNTER_NAMESPACE = "watch_kafka_topic_partition";
+
+  private final Duration checkDuration;
+  private final SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>>
+      kafkaConsumerFactoryFn;
+  private final Map<String, Object> kafkaConsumerConfig;
+  private final @Nullable SerializableFunction<TopicPartition, Boolean> checkStopReadingFn;
+  private final List<String> topics;
+  private final @Nullable Instant startReadTime;
+  private final @Nullable Instant stopReadTime;
+
+  public WatchForKafkaTopicPartitions(
+      @Nullable Duration checkDuration,
+      SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>> kafkaConsumerFactoryFn,
+      Map<String, Object> kafkaConsumerConfig,
+      @Nullable SerializableFunction<TopicPartition, Boolean> checkStopReadingFn,
+      List<String> topics,
+      @Nullable Instant startReadTime,
+      @Nullable Instant stopReadTime) {
+    this.checkDuration = checkDuration == null ? DEFAULT_CHECK_DURATION : checkDuration;

Review Comment:
   sorry, it is guava's firstNonNull: `import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.MoreObjects.firstNonNull`
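   
   The later revision (quoted in full above) uses exactly that:
   
   ```java
   import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.MoreObjects.firstNonNull;
   
   // Falls back to the one-hour default when no check interval is supplied.
   this.checkDuration = firstNonNull(checkDuration, DEFAULT_CHECK_DURATION);
   ```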





[GitHub] [beam] pabloem commented on a diff in pull request #21752: Feature/beam 13852 reimplement with dynamic read

Posted by GitBox <gi...@apache.org>.
pabloem commented on code in PR #21752:
URL: https://github.com/apache/beam/pull/21752#discussion_r907664592


##########
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java:
##########
@@ -1473,7 +1471,7 @@ public PCollection<KafkaRecord<K, V>> expand(PBegin input) {
         }
         PCollection<KafkaSourceDescriptor> output;
         if (kafkaRead.isDynamicRead()) {
-          Set<String> topics = new HashSet<>();
+          List<String> topics = new ArrayList<>();

Review Comment:
   why is this a list instead?





[GitHub] [beam] johnjcasey commented on pull request #21752: Feature/beam 13852 reimplement with dynamic read

Posted by GitBox <gi...@apache.org>.
johnjcasey commented on PR #21752:
URL: https://github.com/apache/beam/pull/21752#issuecomment-1170209813

   retest this please




[GitHub] [beam] johnjcasey commented on pull request #21752: Feature/beam 13852 reimplement with dynamic read

Posted by GitBox <gi...@apache.org>.
johnjcasey commented on PR #21752:
URL: https://github.com/apache/beam/pull/21752#issuecomment-1161842549

   Run Java_Kafka_IO_Direct PreCommit




[GitHub] [beam] Abacn commented on pull request #21752: Feature/beam 13852 reimplement with dynamic read

Posted by GitBox <gi...@apache.org>.
Abacn commented on PR #21752:
URL: https://github.com/apache/beam/pull/21752#issuecomment-1163129513

   Thanks! `./gradlew :sdks:java:io:kafka:integrationTest` failed locally with 
   ```
   java.lang.IllegalArgumentException: Missing required value for [public abstract java.lang.Integer org.apache.beam.sdk.io.kafka.KafkaIOIT$Options.getReadTimeout(), "Time to wait for the events to be processed by the read pipeline (in seconds)"]. 
   	at ...
   	at org.apache.beam.sdk.io.kafka.KafkaIOIT.setup(KafkaIOIT.java:148)
           ...
   ```
   Looking at the Jenkins build, the integration test is not executed in postcommit. Maybe we need to define a postcommit task that depends on the integration tests (like [here](https://github.com/apache/beam/blob/b3c410a54d7f4f3ebaaafb364801683563a8cbc6/sdks/java/io/google-cloud-platform/build.gradle#L233))?
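   
   A hedged sketch of such a wiring in sdks/java/io/kafka/build.gradle, loosely modeled on the linked google-cloud-platform example (the task name and option values are assumptions, not the actual Beam build code):
   
   ```groovy
   // Hypothetical post-commit hook that runs KafkaIOIT with the pipeline
   // options it requires (e.g. the readTimeout that was missing above).
   task kafkaIOPostCommit(type: Test) {
     group = "Verification"
     description = "Runs KafkaIOIT integration tests"
     outputs.upToDateWhen { false }
     include '**/KafkaIOIT.class'
     systemProperty "beamTestPipelineOptions",
         groovy.json.JsonOutput.toJson([
             "--readTimeout=120", // assumption: seconds to wait for the read
             "--kafkaBootstrapServerAddresses=localhost:9092", // assumption
         ])
   }
   ```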




[GitHub] [beam] johnjcasey commented on a diff in pull request #21752: Feature/beam 13852 reimplement with dynamic read

Posted by GitBox <gi...@apache.org>.
johnjcasey commented on code in PR #21752:
URL: https://github.com/apache/beam/pull/21752#discussion_r908529896


##########
sdks/java/io/kafka/build.gradle:
##########
@@ -135,7 +144,8 @@ kafkaVersions.each {kv ->
     include '**/KafkaIOIT.class'
 
     filter {
-      includeTestsMatching "*InBatch"
+      excludeTestsMatching "*InStreaming"
+      if (!(kv.key in sdfKafkaVersions)) excludeTestsMatching "*DynamicPartitions" //admin client create partitions does not exist in kafka 0.11.0.3 and kafka sdf does not appear to work for kafka versions <2.0.1

Review Comment:
   These get run by Run Java_Kafka_IO_Direct PreCommit





[GitHub] [beam] pabloem commented on pull request #21752: Feature/beam 13852 reimplement with dynamic read

Posted by GitBox <gi...@apache.org>.
pabloem commented on PR #21752:
URL: https://github.com/apache/beam/pull/21752#issuecomment-1170133859

   LGTM. I'll merge once we have tests passing




[GitHub] [beam] johnjcasey commented on pull request #21752: Feature/beam 13852 reimplement with dynamic read

Posted by GitBox <gi...@apache.org>.
johnjcasey commented on PR #21752:
URL: https://github.com/apache/beam/pull/21752#issuecomment-1151487919

   run java postcommit




[GitHub] [beam] johnjcasey commented on pull request #21752: Feature/beam 13852 reimplement with dynamic read

Posted by GitBox <gi...@apache.org>.
johnjcasey commented on PR #21752:
URL: https://github.com/apache/beam/pull/21752#issuecomment-1151532971

   run java postcommit




[GitHub] [beam] johnjcasey commented on pull request #21752: Feature/beam 13852 reimplement with dynamic read

Posted by GitBox <gi...@apache.org>.
johnjcasey commented on PR #21752:
URL: https://github.com/apache/beam/pull/21752#issuecomment-1162025323

   
   run java postcommit



[GitHub] [beam] Abacn commented on a diff in pull request #21752: Feature/beam 13852 reimplement with dynamic read

Posted by GitBox <gi...@apache.org>.
Abacn commented on code in PR #21752:
URL: https://github.com/apache/beam/pull/21752#discussion_r903851736


##########
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/WatchForKafkaTopicPartitions.java:
##########
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.kafka;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import org.apache.beam.sdk.metrics.Counter;
+import org.apache.beam.sdk.metrics.Metrics;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.Impulse;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.Watch;
+import org.apache.beam.sdk.transforms.Watch.Growth.PollFn;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+
+public class WatchForKafkaTopicPartitions

Review Comment:
   The original WatchKafkaTopicPartitionDoFn was package-private, but this is a public transform. Is that necessary? If we do decide to introduce a new public PTransform, the WatchForKafkaTopicPartitions class should also come with javadoc.



##########
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/WatchForKafkaTopicPartitions.java:
##########
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.kafka;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import org.apache.beam.sdk.metrics.Counter;
+import org.apache.beam.sdk.metrics.Metrics;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.Impulse;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.Watch;
+import org.apache.beam.sdk.transforms.Watch.Growth.PollFn;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+
+public class WatchForKafkaTopicPartitions
+    extends PTransform<PBegin, PCollection<KafkaSourceDescriptor>> {
+
+  private static final Duration DEFAULT_CHECK_DURATION = Duration.standardHours(1);
+  private static final String COUNTER_NAMESPACE = "watch_kafka_topic_partition";
+
+  private final Duration checkDuration;
+  private final SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>>
+      kafkaConsumerFactoryFn;
+  private final Map<String, Object> kafkaConsumerConfig;
+  private final @Nullable SerializableFunction<TopicPartition, Boolean> checkStopReadingFn;
+  private final List<String> topics;
+  private final @Nullable Instant startReadTime;
+  private final @Nullable Instant stopReadTime;
+
+  public WatchForKafkaTopicPartitions(
+      @Nullable Duration checkDuration,
+      SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>> kafkaConsumerFactoryFn,
+      Map<String, Object> kafkaConsumerConfig,
+      @Nullable SerializableFunction<TopicPartition, Boolean> checkStopReadingFn,
+      List<String> topics,
+      @Nullable Instant startReadTime,
+      @Nullable Instant stopReadTime) {
+    this.checkDuration = checkDuration == null ? DEFAULT_CHECK_DURATION : checkDuration;

Review Comment:
   minor: could use `firstNonNull`
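   
   For reference, a minimal sketch of that suggestion, using the vendored Guava helper that is already on this module's classpath:
   
   ```java
   import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.MoreObjects.firstNonNull;
   
   // Falls back to the default when the caller passed null.
   this.checkDuration = firstNonNull(checkDuration, DEFAULT_CHECK_DURATION);
   ```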



##########
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/WatchForKafkaTopicPartitions.java:
##########
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.kafka;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import org.apache.beam.sdk.metrics.Counter;
+import org.apache.beam.sdk.metrics.Metrics;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.Impulse;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.Watch;
+import org.apache.beam.sdk.transforms.Watch.Growth.PollFn;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+
+public class WatchForKafkaTopicPartitions
+    extends PTransform<PBegin, PCollection<KafkaSourceDescriptor>> {
+
+  private static final Duration DEFAULT_CHECK_DURATION = Duration.standardHours(1);
+  private static final String COUNTER_NAMESPACE = "watch_kafka_topic_partition";
+
+  private final Duration checkDuration;
+  private final SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>>
+      kafkaConsumerFactoryFn;
+  private final Map<String, Object> kafkaConsumerConfig;
+  private final @Nullable SerializableFunction<TopicPartition, Boolean> checkStopReadingFn;
+  private final List<String> topics;
+  private final @Nullable Instant startReadTime;
+  private final @Nullable Instant stopReadTime;
+
+  public WatchForKafkaTopicPartitions(
+      @Nullable Duration checkDuration,
+      SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>> kafkaConsumerFactoryFn,
+      Map<String, Object> kafkaConsumerConfig,
+      @Nullable SerializableFunction<TopicPartition, Boolean> checkStopReadingFn,
+      List<String> topics,
+      @Nullable Instant startReadTime,
+      @Nullable Instant stopReadTime) {
+    this.checkDuration = checkDuration == null ? DEFAULT_CHECK_DURATION : checkDuration;
+    this.kafkaConsumerFactoryFn = kafkaConsumerFactoryFn;
+    this.kafkaConsumerConfig = kafkaConsumerConfig;
+    this.checkStopReadingFn = checkStopReadingFn;
+    this.topics = topics;
+    this.startReadTime = startReadTime;
+    this.stopReadTime = stopReadTime;
+  }
+
+  @Override
+  public PCollection<KafkaSourceDescriptor> expand(PBegin input) {
+    return input
+        .apply(Impulse.create())
+        .apply(
+            "Match new TopicPartitions",
+            Watch.growthOf(new WatchPartitionFn()).withPollInterval(checkDuration))
+        .apply(ParDo.of(new ConvertToDescriptor()));
+  }
+
+  private class ConvertToDescriptor

Review Comment:
   Consider making the inner class static, per the DoFn guideline [here](https://github.com/apache/beam/blob/adac54e1fa4f8f5915b47b16fa7b343b29304c07/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java#L298). You may need to turn references to the outer class into constructor params.
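   
   Roughly what that refactor might look like (a sketch; only the state this DoFn reads from the outer class is shown):
   
   ```java
   private static class ConvertToDescriptor
       extends DoFn<KV<byte[], TopicPartition>, KafkaSourceDescriptor> {
   
     // State that was previously read from the enclosing transform now arrives
     // through the constructor, so the DoFn no longer captures a reference to
     // the outer class when it is serialized.
     private final @Nullable SerializableFunction<TopicPartition, Boolean> checkStopReadingFn;
     private final @Nullable Instant startReadTime;
     private final @Nullable Instant stopReadTime;
   
     private ConvertToDescriptor(
         @Nullable SerializableFunction<TopicPartition, Boolean> checkStopReadingFn,
         @Nullable Instant startReadTime,
         @Nullable Instant stopReadTime) {
       this.checkStopReadingFn = checkStopReadingFn;
       this.startReadTime = startReadTime;
       this.stopReadTime = stopReadTime;
     }
   }
   ```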



##########
sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOIT.java:
##########
@@ -114,6 +130,17 @@ public class KafkaIOIT {
 
   @Rule public TestPipeline readPipeline = TestPipeline.create();
 
+  private static ExperimentalOptions pipelineOptions;

Review Comment:
   Consider naming this `sdfPipelineOptions`, since it is used only by `sdfReadPipeline`, and moving the initialization into setUp.
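   
   For example (a sketch; `sdfPipelineOptions` is the suggested name, and the experiments to configure are whatever `sdfReadPipeline` currently needs):
   
   ```java
   private static ExperimentalOptions sdfPipelineOptions;
   
   @BeforeClass
   public static void setUp() {
     // Initialization moved out of the field declaration and into test setup.
     sdfPipelineOptions = PipelineOptionsFactory.create().as(ExperimentalOptions.class);
     // ... configure experiments here, then create sdfReadPipeline from these options.
   }
   ```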



##########
sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOIT.java:
##########
@@ -259,8 +272,131 @@ public void processElement(
               return null;
             });
 
+    PipelineResult writeResult = writePipeline.run();
+    writeResult.waitUntilFinish();
+
     PipelineResult readResult = readPipeline.run();
-    readResult.waitUntilFinish();
+    PipelineResult.State readState =
+        readResult.waitUntilFinish(Duration.standardSeconds(options.getReadTimeout()));
+
+    cancelIfTimeouted(readResult, readState);
+  }
+
+  @Test
+  public void testKafkaWithDynamicPartitions() throws IOException {
+    AdminClient client =
+        AdminClient.create(
+            ImmutableMap.of("bootstrap.servers", options.getKafkaBootstrapServerAddresses()));
+    String topicName = "DynamicTopicPartition-" + UUID.randomUUID();
+    Map<Integer, String> records = new HashMap<>();
+    for (int i = 0; i < 100; i++) {
+      records.put(i, String.valueOf(i));
+    }
+    Map<Integer, String> moreRecords = new HashMap<>();
+    for (int i = 100; i < 200; i++) {
+      moreRecords.put(i, String.valueOf(i));
+    }
+    try {
+      client.createTopics(ImmutableSet.of(new NewTopic(topicName, 1, (short) 1)));
+      client.createPartitions(ImmutableMap.of(topicName, NewPartitions.increaseTo(1)));
+
+      writePipeline
+          .apply("Generate Write Elements", Create.of(records))
+          .apply(
+              "Write to Kafka",
+              KafkaIO.<Integer, String>write()
+                  .withBootstrapServers(options.getKafkaBootstrapServerAddresses())
+                  .withTopic(topicName)
+                  .withKeySerializer(IntegerSerializer.class)
+                  .withValueSerializer(StringSerializer.class));
+
+      writePipeline.run().waitUntilFinish(Duration.standardSeconds(15));
+
+      System.out.println("after write 1");
+
+      Thread delayedWriteThread =
+          new Thread(
+              () -> {
+                try {
+                  Thread.sleep(20 * 1000); // wait 20 seconds before changing kafka

Review Comment:
   I have seen flakes from using Thread inside tests, because wait times are often out of our control on different test machines. What is the purpose of waiting 20 seconds here, and is it robust?
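   
   One possible alternative (a sketch, untested; it assumes the thread can see the read pipeline's `PipelineResult`, e.g. via a shared reference set right after `run()`) is to block on the pipeline actually running instead of sleeping a fixed time:
   
   ```java
   // Wait until the read pipeline reports RUNNING before mutating Kafka,
   // rather than sleeping a fixed 20 seconds and hoping it has started.
   // (InterruptedException handling elided, as in the existing lambda.)
   while (readResult.getState() != PipelineResult.State.RUNNING) {
     Thread.sleep(1000);
   }
   client.createPartitions(ImmutableMap.of(topicName, NewPartitions.increaseTo(2)));
   ```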



##########
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/WatchForKafkaTopicPartitions.java:
##########
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.kafka;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import org.apache.beam.sdk.metrics.Counter;
+import org.apache.beam.sdk.metrics.Metrics;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.Impulse;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.Watch;
+import org.apache.beam.sdk.transforms.Watch.Growth.PollFn;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+
+public class WatchForKafkaTopicPartitions
+    extends PTransform<PBegin, PCollection<KafkaSourceDescriptor>> {
+
+  private static final Duration DEFAULT_CHECK_DURATION = Duration.standardHours(1);
+  private static final String COUNTER_NAMESPACE = "watch_kafka_topic_partition";
+
+  private final Duration checkDuration;
+  private final SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>>
+      kafkaConsumerFactoryFn;
+  private final Map<String, Object> kafkaConsumerConfig;
+  private final @Nullable SerializableFunction<TopicPartition, Boolean> checkStopReadingFn;
+  private final List<String> topics;
+  private final @Nullable Instant startReadTime;
+  private final @Nullable Instant stopReadTime;
+
+  public WatchForKafkaTopicPartitions(
+      @Nullable Duration checkDuration,
+      SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>> kafkaConsumerFactoryFn,
+      Map<String, Object> kafkaConsumerConfig,
+      @Nullable SerializableFunction<TopicPartition, Boolean> checkStopReadingFn,
+      List<String> topics,
+      @Nullable Instant startReadTime,
+      @Nullable Instant stopReadTime) {
+    this.checkDuration = checkDuration == null ? DEFAULT_CHECK_DURATION : checkDuration;
+    this.kafkaConsumerFactoryFn = kafkaConsumerFactoryFn;
+    this.kafkaConsumerConfig = kafkaConsumerConfig;
+    this.checkStopReadingFn = checkStopReadingFn;
+    this.topics = topics;
+    this.startReadTime = startReadTime;
+    this.stopReadTime = stopReadTime;
+  }
+
+  @Override
+  public PCollection<KafkaSourceDescriptor> expand(PBegin input) {
+    return input
+        .apply(Impulse.create())
+        .apply(
+            "Match new TopicPartitions",
+            Watch.growthOf(new WatchPartitionFn()).withPollInterval(checkDuration))
+        .apply(ParDo.of(new ConvertToDescriptor()));
+  }
+
+  private class ConvertToDescriptor
+      extends DoFn<KV<byte[], TopicPartition>, KafkaSourceDescriptor> {
+    @ProcessElement
+    public void processElement(
+        @Element KV<byte[], TopicPartition> partition,
+        OutputReceiver<KafkaSourceDescriptor> receiver) {
+      TopicPartition topicPartition = Objects.requireNonNull(partition.getValue());
+      if (checkStopReadingFn == null || !checkStopReadingFn.apply(topicPartition)) {
+        Counter foundedTopicPartition =
+            Metrics.counter(COUNTER_NAMESPACE, topicPartition.toString());
+        foundedTopicPartition.inc();
+        receiver.output(
+            KafkaSourceDescriptor.of(
+                topicPartition, null, startReadTime, null, stopReadTime, null));
+      }
+    }
+  }
+
+  private class WatchPartitionFn extends PollFn<byte[], TopicPartition> {

Review Comment:
   Consider a static class here as well.





[GitHub] [beam] johnjcasey commented on pull request #21752: Feature/beam 13852 reimplement with dynamic read

Posted by GitBox <gi...@apache.org>.
johnjcasey commented on PR #21752:
URL: https://github.com/apache/beam/pull/21752#issuecomment-1157937973

   Run Java_Examples_Dataflow PreCommit




[GitHub] [beam] Abacn commented on a diff in pull request #21752: Feature/beam 13852 reimplement with dynamic read

Posted by GitBox <gi...@apache.org>.
Abacn commented on code in PR #21752:
URL: https://github.com/apache/beam/pull/21752#discussion_r905429495


##########
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/WatchForKafkaTopicPartitions.java:
##########
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.kafka;
+
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.MoreObjects.firstNonNull;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import org.apache.beam.sdk.metrics.Counter;
+import org.apache.beam.sdk.metrics.Metrics;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.Impulse;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.Watch;
+import org.apache.beam.sdk.transforms.Watch.Growth.PollFn;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+
+class WatchForKafkaTopicPartitions extends PTransform<PBegin, PCollection<KafkaSourceDescriptor>> {

Review Comment:
   Yeah, agreed. Even though this is a package-private class, it is nice to have a brief comment (like the discarded WatchKafkaTopicPartitionDoFn had).





[GitHub] [beam] johnjcasey commented on pull request #21752: Feature/beam 13852 reimplement with dynamic read

Posted by GitBox <gi...@apache.org>.
johnjcasey commented on PR #21752:
URL: https://github.com/apache/beam/pull/21752#issuecomment-1158877374

   Run Java_Kafka_IO_Direct PreCommit




[GitHub] [beam] johnjcasey commented on pull request #21752: Feature/beam 13852 reimplement with dynamic read

Posted by GitBox <gi...@apache.org>.
johnjcasey commented on PR #21752:
URL: https://github.com/apache/beam/pull/21752#issuecomment-1162047436

   Run Java_Examples_Dataflow_Java17 PreCommit




[GitHub] [beam] pabloem merged pull request #21752: Feature/beam 13852 reimplement with dynamic read

Posted by GitBox <gi...@apache.org>.
pabloem merged PR #21752:
URL: https://github.com/apache/beam/pull/21752




[GitHub] [beam] pabloem closed pull request #21752: Feature/beam 13852 reimplement with dynamic read

Posted by GitBox <gi...@apache.org>.
pabloem closed pull request #21752: Feature/beam 13852 reimplement with dynamic read
URL: https://github.com/apache/beam/pull/21752




[GitHub] [beam] johnjcasey commented on a diff in pull request #21752: Feature/beam 13852 reimplement with dynamic read

Posted by GitBox <gi...@apache.org>.
johnjcasey commented on code in PR #21752:
URL: https://github.com/apache/beam/pull/21752#discussion_r908817356


##########
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/WatchForKafkaTopicPartitions.java:
##########
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.kafka;
+
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.MoreObjects.firstNonNull;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.metrics.Counter;
+import org.apache.beam.sdk.metrics.Metrics;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.Impulse;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.Watch;
+import org.apache.beam.sdk.transforms.Watch.Growth.PollFn;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+
+/**
+ * A {@link PTransform} for continuously querying Kafka for new partitions, and emitting those
+ * topics as {@link KafkaSourceDescriptor}. This transform is implemented using the {@link Watch}
+ * transform, and modifications to this transform should keep that in mind.
+ *
+ * <p>Please see
+ * https://docs.google.com/document/d/1Io49s5LBs29HJyppKG3AlR-gHz5m5PC6CqO0CCoSqLs/edit?usp=sharing
+ * for design details
+ */
+@Experimental
+class WatchForKafkaTopicPartitions extends PTransform<PBegin, PCollection<KafkaSourceDescriptor>> {
+
+  private static final Duration DEFAULT_CHECK_DURATION = Duration.standardHours(1);
+  private static final String COUNTER_NAMESPACE = "watch_kafka_topic_partition";
+
+  private final Duration checkDuration;
+  private final SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>>
+      kafkaConsumerFactoryFn;
+  private final Map<String, Object> kafkaConsumerConfig;
+  private final @Nullable SerializableFunction<TopicPartition, Boolean> checkStopReadingFn;
+  private final List<String> topics;
+  private final @Nullable Instant startReadTime;
+  private final @Nullable Instant stopReadTime;
+
+  public WatchForKafkaTopicPartitions(
+      @Nullable Duration checkDuration,
+      SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>> kafkaConsumerFactoryFn,
+      Map<String, Object> kafkaConsumerConfig,
+      @Nullable SerializableFunction<TopicPartition, Boolean> checkStopReadingFn,
+      List<String> topics,
+      @Nullable Instant startReadTime,
+      @Nullable Instant stopReadTime) {
+    this.checkDuration = firstNonNull(checkDuration, DEFAULT_CHECK_DURATION);
+    this.kafkaConsumerFactoryFn = kafkaConsumerFactoryFn;
+    this.kafkaConsumerConfig = kafkaConsumerConfig;
+    this.checkStopReadingFn = checkStopReadingFn;
+    this.topics = topics;
+    this.startReadTime = startReadTime;
+    this.stopReadTime = stopReadTime;
+  }
+
+  @Override
+  public PCollection<KafkaSourceDescriptor> expand(PBegin input) {
+    return input
+        .apply(Impulse.create())
+        .apply(
+            "Match new TopicPartitions",
+            Watch.growthOf(
+                    new WatchPartitionFn(kafkaConsumerFactoryFn, kafkaConsumerConfig, topics))
+                .withPollInterval(checkDuration))
+        .apply(ParDo.of(new ConvertToDescriptor(checkStopReadingFn, startReadTime, stopReadTime)));
+  }
+
+  private static class ConvertToDescriptor
+      extends DoFn<KV<byte[], TopicPartition>, KafkaSourceDescriptor> {
+
+    private final @Nullable SerializableFunction<TopicPartition, Boolean> checkStopReadingFn;
+    private final @Nullable Instant startReadTime;
+    private final @Nullable Instant stopReadTime;
+
+    private ConvertToDescriptor(
+        @Nullable SerializableFunction<TopicPartition, Boolean> checkStopReadingFn,
+        @Nullable Instant startReadTime,
+        @Nullable Instant stopReadTime) {
+      this.checkStopReadingFn = checkStopReadingFn;
+      this.startReadTime = startReadTime;
+      this.stopReadTime = stopReadTime;
+    }
+
+    @ProcessElement
+    public void processElement(
+        @Element KV<byte[], TopicPartition> partition,
+        OutputReceiver<KafkaSourceDescriptor> receiver) {
+      TopicPartition topicPartition = Objects.requireNonNull(partition.getValue());
+      if (checkStopReadingFn == null || !checkStopReadingFn.apply(topicPartition)) {
+        Counter foundedTopicPartition =
+            Metrics.counter(COUNTER_NAMESPACE, topicPartition.toString());
+        foundedTopicPartition.inc();

Review Comment:
   Good to know. A few thousand should be fine for the total partition count.





[GitHub] [beam] johnjcasey commented on pull request #21752: Feature/beam 13852 reimplement with dynamic read

Posted by GitBox <gi...@apache.org>.
johnjcasey commented on PR #21752:
URL: https://github.com/apache/beam/pull/21752#issuecomment-1158049681

   run java postcommit




[GitHub] [beam] johnjcasey commented on pull request #21752: Feature/beam 13852 reimplement with dynamic read

Posted by GitBox <gi...@apache.org>.
johnjcasey commented on PR #21752:
URL: https://github.com/apache/beam/pull/21752#issuecomment-1150271927

   Run Java_Examples_Dataflow PreCommit




[GitHub] [beam] johnjcasey commented on a diff in pull request #21752: Feature/beam 13852 reimplement with dynamic read

Posted by GitBox <gi...@apache.org>.
johnjcasey commented on code in PR #21752:
URL: https://github.com/apache/beam/pull/21752#discussion_r908531791


##########
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java:
##########
@@ -1473,7 +1471,7 @@ public PCollection<KafkaRecord<K, V>> expand(PBegin input) {
         }
         PCollection<KafkaSourceDescriptor> output;
         if (kafkaRead.isDynamicRead()) {
-          Set<String> topics = new HashSet<>();
+          List<String> topics = new ArrayList<>();

Review Comment:
   In the original implementation, it was a set that got converted to a list. I just changed it to start as a list, but it could be a set just as easily.





[GitHub] [beam] johnjcasey commented on a diff in pull request #21752: Feature/beam 13852 reimplement with dynamic read

Posted by GitBox <gi...@apache.org>.
johnjcasey commented on code in PR #21752:
URL: https://github.com/apache/beam/pull/21752#discussion_r908816708


##########
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java:
##########
@@ -1473,7 +1471,7 @@ public PCollection<KafkaRecord<K, V>> expand(PBegin input) {
         }
         PCollection<KafkaSourceDescriptor> output;
         if (kafkaRead.isDynamicRead()) {
-          Set<String> topics = new HashSet<>();
+          List<String> topics = new ArrayList<>();

Review Comment:
   True. I'll change it to a set everywhere.





[GitHub] [beam] johnjcasey commented on pull request #21752: Feature/beam 13852 reimplement with dynamic read

Posted by GitBox <gi...@apache.org>.
johnjcasey commented on PR #21752:
URL: https://github.com/apache/beam/pull/21752#issuecomment-1164642173

   run java precommit




[GitHub] [beam] johnjcasey commented on pull request #21752: Feature/beam 13852 reimplement with dynamic read

Posted by GitBox <gi...@apache.org>.
johnjcasey commented on PR #21752:
URL: https://github.com/apache/beam/pull/21752#issuecomment-1162301939

   Run Java_Kafka_IO_Direct PreCommit




[GitHub] [beam] Abacn commented on pull request #21752: Feature/beam 13852 reimplement with dynamic read

Posted by GitBox <gi...@apache.org>.
Abacn commented on PR #21752:
URL: https://github.com/apache/beam/pull/21752#issuecomment-1163131011

   > Java_Kafka_IO_Direct
   
   Sorry, I see it passed in `Java_Kafka_IO_Direct`.




[GitHub] [beam] johnjcasey commented on pull request #21752: Feature/beam 13852 reimplement with dynamic read

Posted by GitBox <gi...@apache.org>.
johnjcasey commented on PR #21752:
URL: https://github.com/apache/beam/pull/21752#issuecomment-1164768459

   retest this please




[GitHub] [beam] johnjcasey commented on pull request #21752: Feature/beam 13852 reimplement with dynamic read

Posted by GitBox <gi...@apache.org>.
johnjcasey commented on PR #21752:
URL: https://github.com/apache/beam/pull/21752#issuecomment-1151191017

   Run Java_Examples_Dataflow_Java11 PreCommit




[GitHub] [beam] johnjcasey commented on pull request #21752: Feature/beam 13852 reimplement with dynamic read

Posted by GitBox <gi...@apache.org>.
johnjcasey commented on PR #21752:
URL: https://github.com/apache/beam/pull/21752#issuecomment-1155260310

   run java postcommit




[GitHub] [beam] johnjcasey commented on pull request #21752: Feature/beam 13852 reimplement with dynamic read

Posted by GitBox <gi...@apache.org>.
johnjcasey commented on PR #21752:
URL: https://github.com/apache/beam/pull/21752#issuecomment-1157735008

   run java postcommit




[GitHub] [beam] johnjcasey commented on pull request #21752: Feature/beam 13852 reimplement with dynamic read

Posted by GitBox <gi...@apache.org>.
johnjcasey commented on PR #21752:
URL: https://github.com/apache/beam/pull/21752#issuecomment-1150272201

   Run Python_PVR_Flink PreCommit




[GitHub] [beam] johnjcasey commented on pull request #21752: Feature/beam 13852 reimplement with dynamic read

Posted by GitBox <gi...@apache.org>.
johnjcasey commented on PR #21752:
URL: https://github.com/apache/beam/pull/21752#issuecomment-1151190775

   Run Java_Examples_Dataflow PreCommit




[GitHub] [beam] johnjcasey commented on pull request #21752: Feature/beam 13852 reimplement with dynamic read

Posted by GitBox <gi...@apache.org>.
johnjcasey commented on PR #21752:
URL: https://github.com/apache/beam/pull/21752#issuecomment-1151190445

   Run Java PreCommit




[GitHub] [beam] pabloem commented on a diff in pull request #21752: Feature/beam 13852 reimplement with dynamic read

Posted by GitBox <gi...@apache.org>.
pabloem commented on code in PR #21752:
URL: https://github.com/apache/beam/pull/21752#discussion_r908780163


##########
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/WatchForKafkaTopicPartitions.java:
##########
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.kafka;
+
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.MoreObjects.firstNonNull;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.metrics.Counter;
+import org.apache.beam.sdk.metrics.Metrics;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.Impulse;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.Watch;
+import org.apache.beam.sdk.transforms.Watch.Growth.PollFn;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+
+/**
+ * A {@link PTransform} for continuously querying Kafka for new partitions, and emitting those
+ * topics as {@link KafkaSourceDescriptor}. This transform is implemented using the {@link Watch}
+ * transform, and modifications to this transform should keep that in mind.
+ *
+ * <p>Please see
+ * https://docs.google.com/document/d/1Io49s5LBs29HJyppKG3AlR-gHz5m5PC6CqO0CCoSqLs/edit?usp=sharing
+ * for design details
+ */
+@Experimental
+class WatchForKafkaTopicPartitions extends PTransform<PBegin, PCollection<KafkaSourceDescriptor>> {
+
+  private static final Duration DEFAULT_CHECK_DURATION = Duration.standardHours(1);
+  private static final String COUNTER_NAMESPACE = "watch_kafka_topic_partition";
+
+  private final Duration checkDuration;
+  private final SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>>
+      kafkaConsumerFactoryFn;
+  private final Map<String, Object> kafkaConsumerConfig;
+  private final @Nullable SerializableFunction<TopicPartition, Boolean> checkStopReadingFn;
+  private final List<String> topics;
+  private final @Nullable Instant startReadTime;
+  private final @Nullable Instant stopReadTime;
+
+  public WatchForKafkaTopicPartitions(
+      @Nullable Duration checkDuration,
+      SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>> kafkaConsumerFactoryFn,
+      Map<String, Object> kafkaConsumerConfig,
+      @Nullable SerializableFunction<TopicPartition, Boolean> checkStopReadingFn,
+      List<String> topics,
+      @Nullable Instant startReadTime,
+      @Nullable Instant stopReadTime) {
+    this.checkDuration = firstNonNull(checkDuration, DEFAULT_CHECK_DURATION);
+    this.kafkaConsumerFactoryFn = kafkaConsumerFactoryFn;
+    this.kafkaConsumerConfig = kafkaConsumerConfig;
+    this.checkStopReadingFn = checkStopReadingFn;
+    this.topics = topics;
+    this.startReadTime = startReadTime;
+    this.stopReadTime = stopReadTime;
+  }
+
+  @Override
+  public PCollection<KafkaSourceDescriptor> expand(PBegin input) {
+    return input
+        .apply(Impulse.create())
+        .apply(
+            "Match new TopicPartitions",
+            Watch.growthOf(
+                    new WatchPartitionFn(kafkaConsumerFactoryFn, kafkaConsumerConfig, topics))
+                .withPollInterval(checkDuration))
+        .apply(ParDo.of(new ConvertToDescriptor(checkStopReadingFn, startReadTime, stopReadTime)));
+  }
+
+  private static class ConvertToDescriptor
+      extends DoFn<KV<byte[], TopicPartition>, KafkaSourceDescriptor> {
+
+    private final @Nullable SerializableFunction<TopicPartition, Boolean> checkStopReadingFn;
+    private final @Nullable Instant startReadTime;
+    private final @Nullable Instant stopReadTime;
+
+    private ConvertToDescriptor(
+        @Nullable SerializableFunction<TopicPartition, Boolean> checkStopReadingFn,
+        @Nullable Instant startReadTime,
+        @Nullable Instant stopReadTime) {
+      this.checkStopReadingFn = checkStopReadingFn;
+      this.startReadTime = startReadTime;
+      this.stopReadTime = stopReadTime;
+    }
+
+    @ProcessElement
+    public void processElement(
+        @Element KV<byte[], TopicPartition> partition,
+        OutputReceiver<KafkaSourceDescriptor> receiver) {
+      TopicPartition topicPartition = Objects.requireNonNull(partition.getValue());
+      if (checkStopReadingFn == null || !checkStopReadingFn.apply(topicPartition)) {
+        Counter foundedTopicPartition =
+            Metrics.counter(COUNTER_NAMESPACE, topicPartition.toString());
+        foundedTopicPartition.inc();

Review Comment:
   FYI, runner metrics systems are not designed to support 'big data': Dataflow only guarantees support for a few thousand metrics (which I guess is enough in this case?), so it's worth keeping in mind.
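   
   If partition counts ever did grow beyond that, one way to bound the cardinality (sketched below; not what this PR does) is a single aggregate counter instead of one counter per partition name:
   
   ```java
   // One counter for the whole namespace keeps metric cardinality constant,
   // at the cost of losing the per-partition breakdown.
   Counter foundTopicPartitions = Metrics.counter(COUNTER_NAMESPACE, "found_topic_partitions");
   foundTopicPartitions.inc();
   ```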





[GitHub] [beam] johnjcasey commented on a diff in pull request #21752: Feature/beam 13852 reimplement with dynamic read

Posted by GitBox <gi...@apache.org>.
johnjcasey commented on code in PR #21752:
URL: https://github.com/apache/beam/pull/21752#discussion_r907390796


##########
sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOIT.java:
##########
@@ -259,8 +270,117 @@ public void processElement(
               return null;
             });
 
+    PipelineResult writeResult = writePipeline.run();
+    writeResult.waitUntilFinish();
+
     PipelineResult readResult = readPipeline.run();
-    readResult.waitUntilFinish();
+    PipelineResult.State readState =
+        readResult.waitUntilFinish(Duration.standardSeconds(options.getReadTimeout()));
+
+    cancelIfTimeouted(readResult, readState);
+  }
+
+  @Test
+  public void testKafkaWithDynamicPartitions() throws IOException {
+    AdminClient client =
+        AdminClient.create(
+            ImmutableMap.of("bootstrap.servers", options.getKafkaBootstrapServerAddresses()));
+    String topicName = "DynamicTopicPartition-" + UUID.randomUUID();
+    Map<Integer, String> records = new HashMap<>();
+    for (int i = 0; i < 100; i++) {
+      records.put(i, String.valueOf(i));
+    }
+    Map<Integer, String> moreRecords = new HashMap<>();
+    for (int i = 100; i < 200; i++) {
+      moreRecords.put(i, String.valueOf(i));
+    }
+    try {
+      client.createTopics(ImmutableSet.of(new NewTopic(topicName, 1, (short) 1)));
+      client.createPartitions(ImmutableMap.of(topicName, NewPartitions.increaseTo(1)));
+
+      writePipeline
+          .apply("Generate Write Elements", Create.of(records))
+          .apply(
+              "Write to Kafka",
+              KafkaIO.<Integer, String>write()
+                  .withBootstrapServers(options.getKafkaBootstrapServerAddresses())
+                  .withTopic(topicName)
+                  .withKeySerializer(IntegerSerializer.class)
+                  .withValueSerializer(StringSerializer.class));
+
+      writePipeline.run().waitUntilFinish(Duration.standardSeconds(15));
+
+      System.out.println("after write 1");
+
+      Thread delayedWriteThread =
+          new Thread(
+              () -> {
+                try {
+                  Thread.sleep(20 * 1000); // wait 20 seconds before changing kafka
+                } catch (InterruptedException e) {
+                  throw new RuntimeException(e);
+                }
+
+                client.createPartitions(ImmutableMap.of(topicName, NewPartitions.increaseTo(2)));
+
+                writePipeline
+                    .apply("Second Pass generate Write Elements", Create.of(moreRecords))
+                    .apply(
+                        "Write more to Kafka",
+                        KafkaIO.<Integer, String>write()
+                            .withBootstrapServers(options.getKafkaBootstrapServerAddresses())
+                            .withTopic(topicName)
+                            .withKeySerializer(IntegerSerializer.class)
+                            .withValueSerializer(StringSerializer.class));
+
+                writePipeline.run().waitUntilFinish(Duration.standardSeconds(15));
+
+                System.out.println("after write 2");
+              });
+
+      delayedWriteThread.start();
+
+      PCollection<Integer> values =
+          sdfReadPipeline
+              .apply(
+                  "Read from Kafka",
+                  KafkaIO.<Integer, String>read()
+                      .withBootstrapServers(options.getKafkaBootstrapServerAddresses())
+                      .withConsumerConfigUpdates(ImmutableMap.of("auto.offset.reset", "earliest"))
+                      .withTopic(topicName)
+                      .withDynamicRead(Duration.standardSeconds(5))
+                      .withKeyDeserializer(IntegerDeserializer.class)
+                      .withValueDeserializer(StringDeserializer.class))
+              .apply("Key by Partition", ParDo.of(new KeyByPartition()))
+              .apply(Window.into(FixedWindows.of(Duration.standardMinutes(1))))
+              .apply("Group by Partition", GroupByKey.create())
+              .apply("Get Partitions", Keys.create());
+
+      PAssert.that(values).containsInAnyOrder(0, 1);
+
+      PipelineResult readResult = sdfReadPipeline.run();
+
+      State readState =
+          readResult.waitUntilFinish(Duration.standardSeconds(options.getReadTimeout() / 2));
+
+      System.out.println("after read");

Review Comment:
   My bad, this was for test debugging. I'll remove it.





[GitHub] [beam] pabloem commented on pull request #21752: Feature/beam 13852 reimplement with dynamic read

Posted by GitBox <gi...@apache.org>.
pabloem commented on PR #21752:
URL: https://github.com/apache/beam/pull/21752#issuecomment-1170334378

   Thanks!




[GitHub] [beam] johnjcasey commented on pull request #21752: Feature/beam 13852 reimplement with dynamic read

Posted by GitBox <gi...@apache.org>.
johnjcasey commented on PR #21752:
URL: https://github.com/apache/beam/pull/21752#issuecomment-1157937849

   Run Java_Kafka_IO_Direct PreCommit




[GitHub] [beam] Abacn commented on a diff in pull request #21752: Feature/beam 13852 reimplement with dynamic read

Posted by GitBox <gi...@apache.org>.
Abacn commented on code in PR #21752:
URL: https://github.com/apache/beam/pull/21752#discussion_r906363494


##########
sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOIT.java:
##########
@@ -259,8 +270,117 @@ public void processElement(
               return null;
             });
 
+    PipelineResult writeResult = writePipeline.run();
+    writeResult.waitUntilFinish();
+
     PipelineResult readResult = readPipeline.run();
-    readResult.waitUntilFinish();
+    PipelineResult.State readState =
+        readResult.waitUntilFinish(Duration.standardSeconds(options.getReadTimeout()));
+
+    cancelIfTimeouted(readResult, readState);
+  }
+
+  @Test
+  public void testKafkaWithDynamicPartitions() throws IOException {
+    AdminClient client =
+        AdminClient.create(
+            ImmutableMap.of("bootstrap.servers", options.getKafkaBootstrapServerAddresses()));
+    String topicName = "DynamicTopicPartition-" + UUID.randomUUID();
+    Map<Integer, String> records = new HashMap<>();
+    for (int i = 0; i < 100; i++) {
+      records.put(i, String.valueOf(i));
+    }
+    Map<Integer, String> moreRecords = new HashMap<>();
+    for (int i = 100; i < 200; i++) {
+      moreRecords.put(i, String.valueOf(i));
+    }
+    try {
+      client.createTopics(ImmutableSet.of(new NewTopic(topicName, 1, (short) 1)));
+      client.createPartitions(ImmutableMap.of(topicName, NewPartitions.increaseTo(1)));
+
+      writePipeline
+          .apply("Generate Write Elements", Create.of(records))
+          .apply(
+              "Write to Kafka",
+              KafkaIO.<Integer, String>write()
+                  .withBootstrapServers(options.getKafkaBootstrapServerAddresses())
+                  .withTopic(topicName)
+                  .withKeySerializer(IntegerSerializer.class)
+                  .withValueSerializer(StringSerializer.class));
+
+      writePipeline.run().waitUntilFinish(Duration.standardSeconds(15));
+
+      System.out.println("after write 1");
+
+      Thread delayedWriteThread =
+          new Thread(
+              () -> {
+                try {
+                  Thread.sleep(20 * 1000); // wait 20 seconds before changing kafka
+                } catch (InterruptedException e) {
+                  throw new RuntimeException(e);
+                }
+
+                client.createPartitions(ImmutableMap.of(topicName, NewPartitions.increaseTo(2)));
+
+                writePipeline
+                    .apply("Second Pass generate Write Elements", Create.of(moreRecords))
+                    .apply(
+                        "Write more to Kafka",
+                        KafkaIO.<Integer, String>write()
+                            .withBootstrapServers(options.getKafkaBootstrapServerAddresses())
+                            .withTopic(topicName)
+                            .withKeySerializer(IntegerSerializer.class)
+                            .withValueSerializer(StringSerializer.class));
+
+                writePipeline.run().waitUntilFinish(Duration.standardSeconds(15));
+
+                System.out.println("after write 2");
+              });
+
+      delayedWriteThread.start();
+
+      PCollection<Integer> values =
+          sdfReadPipeline
+              .apply(
+                  "Read from Kafka",
+                  KafkaIO.<Integer, String>read()
+                      .withBootstrapServers(options.getKafkaBootstrapServerAddresses())
+                      .withConsumerConfigUpdates(ImmutableMap.of("auto.offset.reset", "earliest"))
+                      .withTopic(topicName)
+                      .withDynamicRead(Duration.standardSeconds(5))
+                      .withKeyDeserializer(IntegerDeserializer.class)
+                      .withValueDeserializer(StringDeserializer.class))
+              .apply("Key by Partition", ParDo.of(new KeyByPartition()))
+              .apply(Window.into(FixedWindows.of(Duration.standardMinutes(1))))
+              .apply("Group by Partition", GroupByKey.create())
+              .apply("Get Partitions", Keys.create());
+
+      PAssert.that(values).containsInAnyOrder(0, 1);
+
+      PipelineResult readResult = sdfReadPipeline.run();
+
+      State readState =
+          readResult.waitUntilFinish(Duration.standardSeconds(options.getReadTimeout() / 2));
+
+      System.out.println("after read");

Review Comment:
   One last nit: you could use a logger instead of println, or drop the logging entirely since it isn't needed in this test. A sketch of the logger variant follows below.
   
   Other than that LGTM.
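   
   A minimal sketch, assuming the SLF4J setup Beam tests typically use:
   
   ```java
   import org.slf4j.Logger;
   import org.slf4j.LoggerFactory;
   
   // Class-level logger for the test.
   private static final Logger LOG = LoggerFactory.getLogger(KafkaIOIT.class);
   
   // ... inside the test, instead of System.out.println:
   LOG.info("after read");
   ```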





[GitHub] [beam] johnjcasey commented on pull request #21752: Feature/beam 13852 reimplement with dynamic read

Posted by GitBox <gi...@apache.org>.
johnjcasey commented on PR #21752:
URL: https://github.com/apache/beam/pull/21752#issuecomment-1161923110

   Run Java PreCommit

