Posted to github@beam.apache.org by "chamikaramj (via GitHub)" <gi...@apache.org> on 2023/02/01 22:52:03 UTC

[GitHub] [beam] chamikaramj commented on a diff in pull request #24799: Create an Example IO to pair with the How to write an IO guide

chamikaramj commented on code in PR #24799:
URL: https://github.com/apache/beam/pull/24799#discussion_r1092645974


##########
examples/java/src/main/java/org/apache/beam/examples/io/examplekafkaread/ExampleKafkaReadIO.java:
##########
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.examples.io.examplekafkaread;
+
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
+
+import com.google.auto.value.AutoValue;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+import org.apache.beam.sdk.transforms.Impulse;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.checkerframework.checker.nullness.qual.NonNull;
+import org.checkerframework.dataflow.qual.Pure;
+
+/**
+ * An example unbounded SDF IO, using Kafka as an underlying source. This IO deliberately has a
+ * minimal set of Kafka features, and exists as an example of how to write an unbounded SDF source.
+ *
+ * <p>This IO should not be used in pipelines, and has no guarantees for quality, correctness, or
+ * performance.
+ *
+ * <p>This IO was generated as a pared down version of KafkaIO, and should act to consume
+ * KV<byte[],byte[]> pairs from Kafka. In practice, this IO would be used as follows: pipeline
+ * .apply(ExampleKafkaReadIO.read() .withBootstrapServers("broker_1:9092,broker_2:9092")
+ * .withTopics(ImmutableList.of("my_topic"))) .apply(Some Other Transforms);
+ */
+public class ExampleKafkaReadIO {
+
+  public static Read read() {
+    return new AutoValue_ExampleKafkaReadIO_Read.Builder()
+        .setTopics(new ArrayList<>())
+        .setConsumerConfig(ExampleKafkaReadIOUtils.DEFAULT_CONSUMER_PROPERTIES)
+        .build();
+  }
+
+  @AutoValue
+  @AutoValue.CopyAnnotations
+  public abstract static class Read extends PTransform<PBegin, PCollection<KV<byte[], byte[]>>> {
+
+    @Pure
+    abstract Map<String, Object> getConsumerConfig();
+
+    @Pure
+    abstract List<String> getTopics();
+
+    abstract Read.Builder toBuilder();
+
+    @Experimental(Kind.PORTABILITY)
+    @AutoValue.Builder
+    abstract static class Builder {
+      abstract Builder setConsumerConfig(Map<String, Object> config);
+
+      abstract Builder setTopics(List<String> topics);

Review Comment:
   I suggest dropping this in favor of the PCollection<TopicPartition> input type.
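
   For reference, a rough sketch of what that could look like (hypothetical shape only, imports elided; this is not the PR's actual API):

       // Read would consume a PCollection<TopicPartition> instead of deriving partitions
       // from a topics setting, so getTopics()/setTopics() would go away.
       public abstract static class Read
           extends PTransform<PCollection<TopicPartition>, PCollection<KV<byte[], byte[]>>> {

         abstract Map<String, Object> getConsumerConfig();

         @Override
         public PCollection<KV<byte[], byte[]>> expand(PCollection<TopicPartition> input) {
           // GenerateTopicPartitions is no longer needed; partitions arrive as input elements.
           return input.apply(ParDo.of(new ReadFromKafka(getConsumerConfig())));
         }
       }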



##########
examples/java/src/main/java/org/apache/beam/examples/io/examplekafkaread/ExampleKafkaReadIO.java:
##########
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.examples.io.examplekafkaread;
+
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
+
+import com.google.auto.value.AutoValue;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+import org.apache.beam.sdk.transforms.Impulse;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.checkerframework.checker.nullness.qual.NonNull;
+import org.checkerframework.dataflow.qual.Pure;
+
+/**
+ * An example unbounded SDF IO, using Kafka as an underlying source. This IO deliberately has a
+ * minimal set of Kafka features, and exists as an example of how to write an unbounded SDF source.
+ *
+ * <p>This IO should not be used in pipelines, and has no guarantees for quality, correctness, or
+ * performance.
+ *
+ * <p>This IO was generated as a pared down version of KafkaIO, and should act to consume
+ * KV<byte[],byte[]> pairs from Kafka. In practice, this IO would be used as follows: pipeline
+ * .apply(ExampleKafkaReadIO.read() .withBootstrapServers("broker_1:9092,broker_2:9092")
+ * .withTopics(ImmutableList.of("my_topic"))) .apply(Some Other Transforms);
+ */
+public class ExampleKafkaReadIO {
+
+  public static Read read() {
+    return new AutoValue_ExampleKafkaReadIO_Read.Builder()
+        .setTopics(new ArrayList<>())
+        .setConsumerConfig(ExampleKafkaReadIOUtils.DEFAULT_CONSUMER_PROPERTIES)
+        .build();
+  }
+
+  @AutoValue
+  @AutoValue.CopyAnnotations
+  public abstract static class Read extends PTransform<PBegin, PCollection<KV<byte[], byte[]>>> {

Review Comment:
   Probably we should use a source that uses an actual input PCollection type for the example. How about a Kafka source that can take a PCollection<TopicPartition> as input?
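
   A hedged usage sketch under that design (Create and SerializableCoder are used purely for illustration, and withBootstrapServers() is assumed from the class javadoc; imports elided):

       // Hypothetical usage if read() accepted a PCollection<TopicPartition> as input.
       pipeline
           .apply(
               Create.of(new TopicPartition("my_topic", 0), new TopicPartition("my_topic", 1))
                   .withCoder(SerializableCoder.of(TopicPartition.class)))
           .apply(
               ExampleKafkaReadIO.read()
                   .withBootstrapServers("broker_1:9092,broker_2:9092"));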



##########
examples/java/src/main/java/org/apache/beam/examples/io/examplekafkaread/ExampleKafkaReadIO.java:
##########
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.examples.io.examplekafkaread;
+
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
+
+import com.google.auto.value.AutoValue;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+import org.apache.beam.sdk.transforms.Impulse;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.checkerframework.checker.nullness.qual.NonNull;
+import org.checkerframework.dataflow.qual.Pure;
+
+/**
+ * An example unbounded SDF IO, using Kafka as an underlying source. This IO deliberately has a
+ * minimal set of Kafka features, and exists as an example of how to write an unbounded SDF source.
+ *
+ * <p>This IO should not be used in pipelines, and has no guarantees for quality, correctness, or
+ * performance.
+ *
+ * <p>This IO was generated as a pared down version of KafkaIO, and should act to consume
+ * KV<byte[],byte[]> pairs from Kafka. In practice, this IO would be used as follows: pipeline
+ * .apply(ExampleKafkaReadIO.read() .withBootstrapServers("broker_1:9092,broker_2:9092")
+ * .withTopics(ImmutableList.of("my_topic"))) .apply(Some Other Transforms);
+ */
+public class ExampleKafkaReadIO {
+
+  public static Read read() {
+    return new AutoValue_ExampleKafkaReadIO_Read.Builder()
+        .setTopics(new ArrayList<>())
+        .setConsumerConfig(ExampleKafkaReadIOUtils.DEFAULT_CONSUMER_PROPERTIES)
+        .build();
+  }
+
+  @AutoValue
+  @AutoValue.CopyAnnotations
+  public abstract static class Read extends PTransform<PBegin, PCollection<KV<byte[], byte[]>>> {
+
+    @Pure
+    abstract Map<String, Object> getConsumerConfig();
+
+    @Pure
+    abstract List<String> getTopics();
+
+    abstract Read.Builder toBuilder();
+
+    @Experimental(Kind.PORTABILITY)

Review Comment:
   Probably we should discourage using such experimental tags. I don't know if this adds anything to the quality of the connector.



##########
examples/java/src/main/java/org/apache/beam/examples/io/examplekafkaread/ExampleKafkaReadIO.java:
##########
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.examples.io.examplekafkaread;
+
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
+
+import com.google.auto.value.AutoValue;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+import org.apache.beam.sdk.transforms.Impulse;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.checkerframework.checker.nullness.qual.NonNull;
+import org.checkerframework.dataflow.qual.Pure;
+
+/**
+ * An example unbounded SDF IO, using Kafka as an underlying source. This IO deliberately has a
+ * minimal set of Kafka features, and exists as an example of how to write an unbounded SDF source.
+ *
+ * <p>This IO should not be used in pipelines, and has no guarantees for quality, correctness, or
+ * performance.
+ *
+ * <p>This IO was generated as a pared down version of KafkaIO, and should act to consume
+ * KV<byte[],byte[]> pairs from Kafka. In practice, this IO would be used as follows: pipeline
+ * .apply(ExampleKafkaReadIO.read() .withBootstrapServers("broker_1:9092,broker_2:9092")
+ * .withTopics(ImmutableList.of("my_topic"))) .apply(Some Other Transforms);
+ */
+public class ExampleKafkaReadIO {
+
+  public static Read read() {
+    return new AutoValue_ExampleKafkaReadIO_Read.Builder()
+        .setTopics(new ArrayList<>())
+        .setConsumerConfig(ExampleKafkaReadIOUtils.DEFAULT_CONSUMER_PROPERTIES)
+        .build();
+  }
+
+  @AutoValue
+  @AutoValue.CopyAnnotations
+  public abstract static class Read extends PTransform<PBegin, PCollection<KV<byte[], byte[]>>> {
+
+    @Pure
+    abstract Map<String, Object> getConsumerConfig();
+
+    @Pure
+    abstract List<String> getTopics();
+
+    abstract Read.Builder toBuilder();
+
+    @Experimental(Kind.PORTABILITY)
+    @AutoValue.Builder
+    abstract static class Builder {
+      abstract Builder setConsumerConfig(Map<String, Object> config);
+
+      abstract Builder setTopics(List<String> topics);
+
+      abstract Read build();
+    }
+
+    /**
+     * This expand method is characteristic of many top level expands for IOs. It validates required
+     * parameters and configurations, and then it applies a series of DoFns to actually execute the
+     * read or write. It is typical to have multiple DoFns here, as frequently an IO needs to

Review Comment:
   How about replacing:

   "It is typical to have multiple DoFns here, as frequently an IO needs to determine what to read from the configuration before it actually reads"

   with:

   "It's typical for I/O connector transforms to be composite transforms, since I/O connectors usually require multiple computation units that have to be separated out for correctness or efficiency"



##########
examples/java/src/main/java/org/apache/beam/examples/io/examplekafkaread/ExampleKafkaReadIO.java:
##########
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.examples.io.examplekafkaread;
+
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
+
+import com.google.auto.value.AutoValue;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+import org.apache.beam.sdk.transforms.Impulse;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.checkerframework.checker.nullness.qual.NonNull;
+import org.checkerframework.dataflow.qual.Pure;
+
+/**
+ * An example unbounded SDF IO, using Kafka as an underlying source. This IO deliberately has a
+ * minimal set of Kafka features, and exists as an example of how to write an unbounded SDF source.
+ *
+ * <p>This IO should not be used in pipelines, and has no guarantees for quality, correctness, or
+ * performance.
+ *
+ * <p>This IO was generated as a pared down version of KafkaIO, and should act to consume
+ * KV<byte[],byte[]> pairs from Kafka. In practice, this IO would be used as follows: pipeline
+ * .apply(ExampleKafkaReadIO.read() .withBootstrapServers("broker_1:9092,broker_2:9092")
+ * .withTopics(ImmutableList.of("my_topic"))) .apply(Some Other Transforms);
+ */
+public class ExampleKafkaReadIO {
+
+  public static Read read() {
+    return new AutoValue_ExampleKafkaReadIO_Read.Builder()
+        .setTopics(new ArrayList<>())
+        .setConsumerConfig(ExampleKafkaReadIOUtils.DEFAULT_CONSUMER_PROPERTIES)
+        .build();
+  }
+
+  @AutoValue
+  @AutoValue.CopyAnnotations
+  public abstract static class Read extends PTransform<PBegin, PCollection<KV<byte[], byte[]>>> {
+
+    @Pure
+    abstract Map<String, Object> getConsumerConfig();
+
+    @Pure
+    abstract List<String> getTopics();
+
+    abstract Read.Builder toBuilder();
+
+    @Experimental(Kind.PORTABILITY)
+    @AutoValue.Builder
+    abstract static class Builder {
+      abstract Builder setConsumerConfig(Map<String, Object> config);
+
+      abstract Builder setTopics(List<String> topics);
+
+      abstract Read build();
+    }
+
+    /**
+     * This expand method is characteristic of many top level expands for IOs. It validates required
+     * parameters and configurations, and then it applies a series of DoFns to actually execute the
+     * read or write. It is typical to have multiple DoFns here, as frequently an IO needs to
+     * determine what to read from the configuration before it actually reads. In this case, we are
+     * provided a topic name, but Kafka further divides its topics into partitions. As such, {@link
+     * GenerateTopicPartitions} exists to query Kafka for this detail, yielding one or more {@link
+     * org.apache.kafka.common.TopicPartition}s for every topic name.
+     */
+    @Override
+    public PCollection<KV<byte[], byte[]>> expand(@NonNull PBegin input) {

Review Comment:
   Ditto regarding using an actual input type.



##########
examples/java/src/main/java/org/apache/beam/examples/io/examplekafkaread/ReadFromKafka.java:
##########
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.examples.io.examplekafkaread;
+
+import java.util.Map;
+import org.apache.beam.sdk.io.range.OffsetRange;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.DoFn.UnboundedPerElement;
+import org.apache.beam.sdk.transforms.splittabledofn.GrowableOffsetRangeTracker;
+import org.apache.beam.sdk.transforms.splittabledofn.ManualWatermarkEstimator;
+import org.apache.beam.sdk.transforms.splittabledofn.OffsetRangeTracker;
+import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
+import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimator;
+import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimators.Manual;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.TopicPartition;
+import org.checkerframework.checker.nullness.qual.NonNull;
+import org.joda.time.Instant;
+
+/**
+ * Because the element here is a topic partition, there can be any number of KV pairs per element.
+ * As such, we mark this as {@link UnboundedPerElement}.
+ *
+ * <p>Note, although base {@link org.apache.beam.sdk.io.kafka.KafkaIO} does provide an @GetSize
+ * method, it is not required for a Kafka based IO to function properly. Because both IOs use an
+ * OffsetRangeTracker, the size of work remaining can be computed using that tracker's built-in
+ * getProgress function. In KafkaIO, this is enhanced by providing a @GetSize method that improves the
+ * estimate by keeping track of the size of the Kafka elements, but that is not strictly necessary.
+ */
+@UnboundedPerElement
+public class ReadFromKafka extends DoFn<TopicPartition, KV<byte[], byte[]>> {

Review Comment:
   Probably rename to ReadFromKafkaDoFn for clarity.



##########
examples/java/src/main/java/org/apache/beam/examples/io/examplekafkaread/ExampleKafkaReadIO.java:
##########
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *

Review Comment:
   Should we make this a separate Gradle submodule with the correct set of dependencies needed for actually using this in a pipeline?



##########
examples/java/src/main/java/org/apache/beam/examples/io/examplekafkaread/ExampleKafkaReadIO.java:
##########
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.

Review Comment:
   Should we also add the Schema-Aware transform for this and recommend adding it for all new I/O connectors?



##########
examples/java/src/main/java/org/apache/beam/examples/io/examplekafkaread/ReadFromKafka.java:
##########
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.examples.io.examplekafkaread;
+
+import java.util.Map;
+import org.apache.beam.sdk.io.range.OffsetRange;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.DoFn.UnboundedPerElement;
+import org.apache.beam.sdk.transforms.splittabledofn.GrowableOffsetRangeTracker;
+import org.apache.beam.sdk.transforms.splittabledofn.ManualWatermarkEstimator;
+import org.apache.beam.sdk.transforms.splittabledofn.OffsetRangeTracker;
+import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
+import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimator;
+import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimators.Manual;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.TopicPartition;
+import org.checkerframework.checker.nullness.qual.NonNull;
+import org.joda.time.Instant;
+
+/**
+ * Because the element here is a topic partition, there can be any number of KV pairs per element.
+ * As such, we mark this as {@link UnboundedPerElement}.
+ *
+ * <p>Note, although base {@link org.apache.beam.sdk.io.kafka.KafkaIO} does provide an @GetSize
+ * method, it is not required for a Kafka based IO to function properly. Because both IOs use an
+ * OffsetRangeTracker, the size of work remaining can be computed using that tracker's built-in
+ * getProgress function. In KafkaIO, this is enhanced by providing a @GetSize method that improves the
+ * estimate by keeping track of the size of the Kafka elements, but that is not strictly necessary.
+ */
+@UnboundedPerElement
+public class ReadFromKafka extends DoFn<TopicPartition, KV<byte[], byte[]>> {
+  private static final java.time.Duration KAFKA_POLL_TIMEOUT = java.time.Duration.ofSeconds(1);
+
+  final Map<String, Object> consumerConfig;
+
+  public ReadFromKafka(@NonNull Map<String, Object> consumerConfig) {
+    this.consumerConfig = consumerConfig;
+  }
+
+  /**
+   * Our initial restriction from Kafka is the range from the current consumer position until
+   * Long.MAX_VALUE. The reason we do not go from 0 to max value, is that Kafka may have cleaned up
+   * old elements, such that the minimum offset is not 0, or is that our consumer could be using a
+   * group identifier that has already consumed from this topic partition.
+   */
+  @GetInitialRestriction
+  public OffsetRange initialRestriction(@Element TopicPartition topicPartition) {
+    try (Consumer<byte[], byte[]> offsetConsumer =
+        new KafkaConsumer<byte[], byte[]>(consumerConfig)) {
+      offsetConsumer.assign(ImmutableList.of(topicPartition));
+      long startOffset = offsetConsumer.position(topicPartition);
+
+      return new OffsetRange(startOffset, Long.MAX_VALUE);
+    }
+  }
+
+  /**
+   * Because elements can be added to Kafka continuously, we want our offset range tracker to
+   * reflect this. As such, we use a {@link GrowableOffsetRangeTracker} to poll Kafka and see how
+   * far out the end of the range is.
+   *
+   * <p>If the restriction provided by the runner has a getTo() of Long.MAX_VALUE, we don't need to
+   * query Kafka to know how far out to grab our restriction.
+   */
+  @NewTracker
+  public OffsetRangeTracker restrictionTracker(
+      @Element TopicPartition topicPartition, @Restriction OffsetRange restriction) {
+    if (restriction.getTo() < Long.MAX_VALUE) {
+      return new OffsetRangeTracker(restriction);
+    }
+    ExampleKafkaReadOffsetEstimator offsetPoller =
+        new ExampleKafkaReadOffsetEstimator(
+            new KafkaConsumer<byte[], byte[]>(
+                ExampleKafkaReadIOUtils.getOffsetConsumerConfig(
+                    "tracker-" + topicPartition, consumerConfig)),
+            topicPartition);
+    return new GrowableOffsetRangeTracker(restriction.getFrom(), offsetPoller);

Review Comment:
   Why is GrowableOffsetRangeTracker useful in this case? I.e., what would we lose if we just used OffsetRangeTracker(0, Long.MAX_VALUE) for all cases?
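
   For context, GrowableOffsetRangeTracker reports progress against an end offset supplied by a RangeEndEstimator, which is what lets the runner see a finite backlog (estimated end minus current position) on an otherwise unbounded range. A rough sketch of such an estimator follows; the PR's ExampleKafkaReadOffsetEstimator is not shown in this hunk, so the body below is an assumption (imports elided):

       // Polls Kafka for the partition's current end offset so the tracker can report
       // a finite backlog instead of an unbounded [from, Long.MAX_VALUE) range.
       static class ExampleKafkaReadOffsetEstimator
           implements GrowableOffsetRangeTracker.RangeEndEstimator {
         private final Consumer<byte[], byte[]> offsetConsumer;
         private final TopicPartition topicPartition;

         ExampleKafkaReadOffsetEstimator(
             Consumer<byte[], byte[]> offsetConsumer, TopicPartition topicPartition) {
           this.offsetConsumer = offsetConsumer;
           this.topicPartition = topicPartition;
         }

         @Override
         public long estimate() {
           // endOffsets() returns the offset of the next record that will be written.
           return offsetConsumer
               .endOffsets(ImmutableList.of(topicPartition))
               .getOrDefault(topicPartition, Long.MAX_VALUE);
         }
       }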



##########
examples/java/src/main/java/org/apache/beam/examples/io/examplekafkaread/ExampleKafkaReadIO.java:
##########
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.examples.io.examplekafkaread;
+
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
+
+import com.google.auto.value.AutoValue;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+import org.apache.beam.sdk.transforms.Impulse;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.checkerframework.checker.nullness.qual.NonNull;
+import org.checkerframework.dataflow.qual.Pure;
+
+/**
+ * An example unbounded SDF IO, using Kafka as an underlying source. This IO deliberately has a
+ * minimal set of Kafka features, and exists as an example of how to write an unbounded SDF source.
+ *
+ * <p>This IO should not be used in pipelines, and has no guarantees for quality, correctness, or
+ * performance.
+ *
+ * <p>This IO was generated as a pared down version of KafkaIO, and should act to consume
+ * KV<byte[],byte[]> pairs from Kafka. In practice, this IO would be used as follows: pipeline
+ * .apply(ExampleKafkaReadIO.read() .withBootstrapServers("broker_1:9092,broker_2:9092")
+ * .withTopics(ImmutableList.of("my_topic"))) .apply(Some Other Transforms);
+ */
+public class ExampleKafkaReadIO {
+
+  public static Read read() {
+    return new AutoValue_ExampleKafkaReadIO_Read.Builder()
+        .setTopics(new ArrayList<>())
+        .setConsumerConfig(ExampleKafkaReadIOUtils.DEFAULT_CONSUMER_PROPERTIES)
+        .build();
+  }
+
+  @AutoValue
+  @AutoValue.CopyAnnotations
+  public abstract static class Read extends PTransform<PBegin, PCollection<KV<byte[], byte[]>>> {
+
+    @Pure
+    abstract Map<String, Object> getConsumerConfig();
+
+    @Pure
+    abstract List<String> getTopics();
+
+    abstract Read.Builder toBuilder();
+
+    @Experimental(Kind.PORTABILITY)
+    @AutoValue.Builder
+    abstract static class Builder {
+      abstract Builder setConsumerConfig(Map<String, Object> config);
+
+      abstract Builder setTopics(List<String> topics);
+
+      abstract Read build();
+    }
+
+    /**
+     * This expand method is characteristic of many top level expands for IOs. It validates required
+     * parameters and configurations, and then it applies a series of DoFns to actually execute the
+     * read or write. It is typical to have multiple DoFns here, as frequently an IO needs to
+     * determine what to read from the configuration before it actually reads. In this case, we are
+     * provided a topic name, but Kafka further divides its topics into partitions. As such, {@link
+     * GenerateTopicPartitions} exists to query Kafka for this detail, yielding one or more {@link
+     * org.apache.kafka.common.TopicPartition}s for every topic name.
+     */
+    @Override
+    public PCollection<KV<byte[], byte[]>> expand(@NonNull PBegin input) {
+      checkArgument(
+          getConsumerConfig().get(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG) != null,
+          "withBootstrapServers() is required");
+      checkArgument(getTopics() != null && getTopics().size() > 0, "withTopics() is required");
+      return input
+          .apply(Impulse.create())
+          .apply(ParDo.of(new GenerateTopicPartitions(getConsumerConfig(), getTopics())))

Review Comment:
   There should be a Reshuffle between these two. Otherwise topic partition generation and reading get fused and everything will happen on a single worker.
   
   Also, a failure when reading will result in topic-partitions being regenerated.
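
   A sketch of where that Reshuffle could go; the final read ParDo is assumed from the rest of the file, since this hunk is cut off before it (imports elided):

       return input
           .apply(Impulse.create())
           .apply(ParDo.of(new GenerateTopicPartitions(getConsumerConfig(), getTopics())))
           // Break fusion so partition generation and reading can run on different workers,
           // and checkpoint the generated partitions so a read failure does not regenerate them.
           .apply(Reshuffle.viaRandomKey())
           .apply(ParDo.of(new ReadFromKafka(getConsumerConfig())));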



##########
examples/java/src/main/java/org/apache/beam/examples/io/examplekafkaread/ReadFromKafka.java:
##########
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.examples.io.examplekafkaread;
+
+import java.util.Map;
+import org.apache.beam.sdk.io.range.OffsetRange;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.DoFn.UnboundedPerElement;
+import org.apache.beam.sdk.transforms.splittabledofn.GrowableOffsetRangeTracker;
+import org.apache.beam.sdk.transforms.splittabledofn.ManualWatermarkEstimator;
+import org.apache.beam.sdk.transforms.splittabledofn.OffsetRangeTracker;
+import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
+import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimator;
+import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimators.Manual;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.TopicPartition;
+import org.checkerframework.checker.nullness.qual.NonNull;
+import org.joda.time.Instant;
+
+/**
+ * Because the element here is a topic partition, there can be any number of KV pairs per element.
+ * As such, we mark this as {@link UnboundedPerElement}.
+ *
+ * <p>Note, although base {@link org.apache.beam.sdk.io.kafka.KafkaIO} does provide an @GetSize
+ * method, it is not required for a Kafka based IO to function properly. Because both IOs use an
+ * OffsetRangeTracker, the size of work remaining can be computed using that tracker's built-in
+ * getProgress function. In KafkaIO, this is enhanced by providing a @GetSize method that improves the
+ * estimate by keeping track of the size of the Kafka elements, but that is not strictly necessary.
+ */
+@UnboundedPerElement
+public class ReadFromKafka extends DoFn<TopicPartition, KV<byte[], byte[]>> {
+  private static final java.time.Duration KAFKA_POLL_TIMEOUT = java.time.Duration.ofSeconds(1);
+
+  final Map<String, Object> consumerConfig;
+
+  public ReadFromKafka(@NonNull Map<String, Object> consumerConfig) {
+    this.consumerConfig = consumerConfig;
+  }
+
+  /**
+   * Our initial restriction from Kafka is the range from the current consumer position until
+   * Long.MAX_VALUE. The reason we do not go from 0 to max value, is that Kafka may have cleaned up
+   * old elements, such that the minimum offset is not 0, or is that our consumer could be using a
+   * group identifier that has already consumed from this topic partition.
+   */
+  @GetInitialRestriction
+  public OffsetRange initialRestriction(@Element TopicPartition topicPartition) {
+    try (Consumer<byte[], byte[]> offsetConsumer =
+        new KafkaConsumer<byte[], byte[]>(consumerConfig)) {
+      offsetConsumer.assign(ImmutableList.of(topicPartition));
+      long startOffset = offsetConsumer.position(topicPartition);
+
+      return new OffsetRange(startOffset, Long.MAX_VALUE);
+    }
+  }
+
+  /**
+   * Because elements can be added to Kafka continuously, we want our offset range tracker to
+   * reflect this. As such, we use a {@link GrowableOffsetRangeTracker} to poll Kafka and see how
+   * far out the end of the range is.
+   *
+   * <p>If the restriction provided by the runner has a getTo() of Long.MAX_VALUE, we don't need to
+   * query Kafka to know how far out to grab our restriction.
+   */
+  @NewTracker
+  public OffsetRangeTracker restrictionTracker(
+      @Element TopicPartition topicPartition, @Restriction OffsetRange restriction) {
+    if (restriction.getTo() < Long.MAX_VALUE) {
+      return new OffsetRangeTracker(restriction);
+    }
+    ExampleKafkaReadOffsetEstimator offsetPoller =
+        new ExampleKafkaReadOffsetEstimator(
+            new KafkaConsumer<byte[], byte[]>(
+                ExampleKafkaReadIOUtils.getOffsetConsumerConfig(
+                    "tracker-" + topicPartition, consumerConfig)),
+            topicPartition);
+    return new GrowableOffsetRangeTracker(restriction.getFrom(), offsetPoller);
+  }
+
+  @GetInitialWatermarkEstimatorState
+  public Instant getInitialWatermarkEstimatorState(@Timestamp Instant currentElementTimestamp) {
+    return currentElementTimestamp;
+  }
+
+  /**
+   * For this example, we estimate the watermark based on the clock time we receive elements from
+   * Kafka. As such, we use a manual estimator that we update during ProcessElement.
+   */
+  @NewWatermarkEstimator
+  public WatermarkEstimator<Instant> newWatermarkEstimator(
+      @WatermarkEstimatorState Instant watermarkEstimatorState) {
+    return new Manual(watermarkEstimatorState);
+  }
+
+  /**
+   * @param topicPartition is required to know what topic this bundle is being consumed from
+   * @param tracker is required to claim records as we receive them
+   * @param watermarkEstimator is required to be updated as we receive records
+   * @param receiver for outputting the KV Pairs
+   */
+  @ProcessElement
+  public ProcessContinuation processElement(
+      @Element TopicPartition topicPartition,
+      RestrictionTracker<OffsetRange, Long> tracker,
+      WatermarkEstimator<Instant> watermarkEstimator,
+      OutputReceiver<KV<byte[], byte[]>> receiver) {
+    try (Consumer<byte[], byte[]> consumer = new KafkaConsumer<byte[], byte[]>(consumerConfig)) {
+      consumer.assign(ImmutableList.of(topicPartition));
+      long startOffset = tracker.currentRestriction().getFrom();
+      // Seek to the lowest element for the restriction before we poll
+      consumer.seek(topicPartition, startOffset);
+      ConsumerRecords<byte[], byte[]> rawRecords = ConsumerRecords.empty();
+
+      while (true) {
+        rawRecords = consumer.poll(KAFKA_POLL_TIMEOUT);
+        // When there are no records available for the current TopicPartition, self-checkpoint
+        // and move to process the next element.
+        if (rawRecords.isEmpty()) {
+          // If we receive no records, we still should update the watermark. This ensures that
+          // watermark driven logic continues to work downstream, even if the data on Kafka is
+          // sparse
+          ((ManualWatermarkEstimator<Instant>) watermarkEstimator).setWatermark(Instant.now());
+
+          // We continue processing, as there is still work to be done in the range of the tracker
+          return ProcessContinuation.resume();
+        }
+        for (ConsumerRecord<byte[], byte[]> rawRecord : rawRecords) {
+          if (!tracker.tryClaim(rawRecord.offset())) {
+            // Because we poll from Kafka, as opposed to querying for specific rows, we may receive

Review Comment:
   > records that are beyond the scope of the tracker
   Do we expect multiple workers to process the same subscription at the same time? I thought a Kafka worker would own the full offset range for a given subscription.
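
   For context, tryClaim() can return false here even when a single reader owns the partition: once the runner checkpoints or splits the restriction, the primary range gets a finite end, and any polled record past that end must not be emitted. A minimal illustration with made-up offsets:

       // After a runner-initiated split, the primary restriction ends early.
       OffsetRange primary = new OffsetRange(100, 250);
       OffsetRangeTracker tracker = new OffsetRangeTracker(primary);
       tracker.tryClaim(200L); // true: the offset is inside the restriction
       tracker.tryClaim(260L); // false: beyond the truncated range, so stop emitting records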



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org