Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2020/06/03 17:38:38 UTC

[GitHub] [beam] aromanenko-dev commented on a change in pull request #11749: [BEAM-9977] Implement ReadFromKafkaViaSDF

aromanenko-dev commented on a change in pull request #11749:
URL: https://github.com/apache/beam/pull/11749#discussion_r434717019



##########
File path: sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
##########
@@ -817,6 +847,24 @@ public void setValueDeserializer(String valueDeserializer) {
       return toBuilder().setCommitOffsetsInFinalizeEnabled(true).build();
     }
 
+    /**
+     * The {@link Read} transform will be expanded with {@link ReadFromKafkaViaSDF} transform. While
+     * working with {@link #useSDFTransformInRead()} and {@link
+     * #withTimestampPolicyFactory(TimestampPolicyFactory)} together, only {@link
+     * TimestampPolicyFactory#withCreateTime(Duration)}, {@link
+     * TimestampPolicyFactory#withLogAppendTime()} and {@link
+     * TimestampPolicyFactory#withProcessingTime()} will be populated correctly. For other custom
+     * {@link TimestampPolicy}, the transform will use {@link
+     * TimestampPolicyFactory#withProcessingTime()} by default. It's recommended to use {@link
+     * ReadFromKafkaViaSDF} directly in that case.
+     *
+     * <p>Note that the expansion only happens when the pipeline has the "beam_fn_api" experiment
+     * enabled and "beam_fn_api_use_deprecated_read" is not set.
+     */
+    public Read<K, V> useSDFTransformInRead() {

Review comment:
       Maybe just call it `useSDF()`? It's already clear that it's a PTransform used in Read.

##########
File path: sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIOUtils.java
##########
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.kafka;
+
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Random;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+
+/**
+ * Common utility functions and default configurations for {@link KafkaIO.Read} and {@link
+ * ReadFromKafkaViaSDF}.
+ */
+final class KafkaIOUtils {
+  // A set of config defaults.

Review comment:
       I expect that all these constants and methods below were moved here unchanged, purely as a refactoring. If not, please add some comments on what changed.
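
For context on the helpers that live here: the `ReadFromKafkaViaSDF` Javadoc in this PR refers to `KafkaIOUtils.MovingAvg` for tracking the average record size. A minimal sketch of that kind of helper follows, assuming a running mean in which each update is weighted by at most a fixed window. The class name `MovingAverageSketch`, the `window` parameter, and the update rule are illustrative assumptions, not the code in the PR.

```java
// Hypothetical stand-in for a moving-average helper; not the PR's implementation.
final class MovingAverageSketch {
  private final int window; // maximum number of updates that share the weight
  private long updates; // how many values have been observed so far
  private double average; // current estimate, 0.0 before the first update

  MovingAverageSketch(int window) {
    if (window <= 0) {
      throw new IllegalArgumentException("window must be positive");
    }
    this.window = window;
  }

  /** Moves the average toward the new value by 1 / min(window, updates). */
  void update(double value) {
    updates++;
    average += (value - average) / Math.min(window, updates);
  }

  double get() {
    return average;
  }

  public static void main(String[] args) {
    MovingAverageSketch avg = new MovingAverageSketch(2);
    avg.update(10);
    avg.update(20);
    avg.update(30);
    System.out.println(avg.get());
  }
}
```

With a window of 2, feeding 10, 20, 30 moves the average to 10, then 15, then 22.5: older values fade gradually instead of being dropped outright.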

##########
File path: sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
##########
@@ -906,19 +955,110 @@ public void setValueDeserializer(String valueDeserializer) {
       Coder<K> keyCoder = getKeyCoder(coderRegistry);
       Coder<V> valueCoder = getValueCoder(coderRegistry);
 
-      // Handles unbounded source to bounded conversion if maxNumRecords or maxReadTime is set.
-      Unbounded<KafkaRecord<K, V>> unbounded =
-          org.apache.beam.sdk.io.Read.from(
-              toBuilder().setKeyCoder(keyCoder).setValueCoder(valueCoder).build().makeSource());
+      if (!isUseSDFTransform()
+          || !ExperimentalOptions.hasExperiment(input.getPipeline().getOptions(), "beam_fn_api")

Review comment:
       It looks like we depend on some specific pipeline business logic here. I'd prefer to avoid this if possible.

##########
File path: sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
##########
@@ -906,19 +955,110 @@ public void setValueDeserializer(String valueDeserializer) {
       Coder<K> keyCoder = getKeyCoder(coderRegistry);
       Coder<V> valueCoder = getValueCoder(coderRegistry);
 
-      // Handles unbounded source to bounded conversion if maxNumRecords or maxReadTime is set.
-      Unbounded<KafkaRecord<K, V>> unbounded =
-          org.apache.beam.sdk.io.Read.from(
-              toBuilder().setKeyCoder(keyCoder).setValueCoder(valueCoder).build().makeSource());
+      if (!isUseSDFTransform()
+          || !ExperimentalOptions.hasExperiment(input.getPipeline().getOptions(), "beam_fn_api")
+          || ExperimentalOptions.hasExperiment(
+              input.getPipeline().getOptions(), "beam_fn_api_use_deprecated_read")) {

Review comment:
       The same point as above.
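
One way to keep this dependency contained, sketched under assumptions: the decision quoted in the hunk above could be expressed as a single pure predicate over the experiment names, so that the rest of the expansion never inspects pipeline options. `SdfExpansionGate` and `shouldExpandWithSdf` are hypothetical names, and the experiments are modelled as a plain `Set<String>`; only the two experiment strings come from the diff.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

// Hypothetical helper, not part of the PR: isolates the expansion decision in one place.
final class SdfExpansionGate {
  private SdfExpansionGate() {}

  /**
   * Returns true when the read should expand to the SDF-based transform. This is the negation
   * of the condition quoted in the hunk, which selects the legacy source-based expansion.
   */
  static boolean shouldExpandWithSdf(boolean useSdfTransform, Set<String> experiments) {
    return useSdfTransform
        && experiments.contains("beam_fn_api")
        && !experiments.contains("beam_fn_api_use_deprecated_read");
  }

  public static void main(String[] args) {
    Set<String> portable = Collections.singleton("beam_fn_api");
    Set<String> deprecatedRead =
        new HashSet<>(Arrays.asList("beam_fn_api", "beam_fn_api_use_deprecated_read"));
    System.out.println(shouldExpandWithSdf(true, portable));
    System.out.println(shouldExpandWithSdf(true, deprecatedRead));
  }
}
```

Keeping the check in one predicate would not remove the dependency on the experiments, but it would make it explicit and easy to test or replace later.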

##########
File path: sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaViaSDF.java
##########
@@ -0,0 +1,697 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.kafka;
+
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
+
+import com.google.auto.value.AutoValue;
+import java.util.Map;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderRegistry;
+import org.apache.beam.sdk.io.range.OffsetRange;
+import org.apache.beam.sdk.options.ExperimentalOptions;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.DoFn.Element;
+import org.apache.beam.sdk.transforms.DoFn.GetRestrictionCoder;
+import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
+import org.apache.beam.sdk.transforms.DoFn.ProcessElement;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.splittabledofn.GrowableOffsetRangeTracker;
+import org.apache.beam.sdk.transforms.splittabledofn.OffsetRangeTracker;
+import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
+import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimator;
+import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimators.MonotonicallyIncreasing;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.io.Closeables;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.utils.AppInfoParser;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A {@link PTransform} that takes a PCollection of {@link KafkaSourceDescription} as input and
+ * outputs a PCollection of {@link KafkaRecord}. The core implementation is based on {@code
+ * SplittableDoFn}. For more details about the concept of {@code SplittableDoFn}, please refer to
+ * the Beam blog post: https://beam.apache.org/blog/splittable-do-fn/ and the design doc:
+ * https://s.apache.org/beam-fn-api. The major difference from {@link KafkaIO.Read} is that {@link
+ * ReadFromKafkaViaSDF} doesn't require source descriptions (e.g., {@link
+ * KafkaIO.Read#getTopicPartitions()}, {@link KafkaIO.Read#getTopics()}, {@link
+ * KafkaIO.Read#getStartReadTime()}, etc.) at pipeline construction time. Instead, the pipeline can
+ * populate these source descriptions at runtime. For example, the pipeline can query Kafka topics
+ * from a BigQuery table and read these topics via {@link ReadFromKafkaViaSDF}.
+ *
+ * <h3>Common Kafka Consumer Configurations</h3>
+ *
+ * <p>Most Kafka consumer configurations are similar to {@link KafkaIO.Read}:
+ *
+ * <ul>
+ *   <li>{@link ReadFromKafkaViaSDF#getConsumerConfig()} is the same as {@link
+ *       KafkaIO.Read#getConsumerConfig()}.
+ *   <li>{@link ReadFromKafkaViaSDF#getConsumerFactoryFn()} is the same as {@link
+ *       KafkaIO.Read#getConsumerFactoryFn()}.
+ *   <li>{@link ReadFromKafkaViaSDF#getOffsetConsumerConfig()} is the same as {@link
+ *       KafkaIO.Read#getOffsetConsumerConfig()}.
+ *   <li>{@link ReadFromKafkaViaSDF#getKeyCoder()} is the same as {@link
+ *       KafkaIO.Read#getKeyCoder()}.
+ *   <li>{@link ReadFromKafkaViaSDF#getValueCoder()} is the same as {@link
+ *       KafkaIO.Read#getValueCoder()}.
+ *   <li>{@link ReadFromKafkaViaSDF#getKeyDeserializerProvider()} is the same as {@link
+ *       KafkaIO.Read#getKeyDeserializerProvider()}.
+ *   <li>{@link ReadFromKafkaViaSDF#getValueDeserializerProvider()} is the same as {@link
+ *       KafkaIO.Read#getValueDeserializerProvider()}.
+ *   <li>{@link ReadFromKafkaViaSDF#isCommitOffsetEnabled()} means the same as {@link
+ *       KafkaIO.Read#isCommitOffsetsInFinalizeEnabled()}.
+ * </ul>
+ *
+ * <p>For example, to create a basic {@link ReadFromKafkaViaSDF} transform:
+ *
+ * <pre>{@code
+ * pipeline
+ *  .apply(Create.of(KafkaSourceDescription.of(new TopicPartition("my_topic", 1))))
+ *  .apply(ReadFromKafkaViaSDF.create()
+ *          .withBootstrapServers("broker_1:9092,broker_2:9092")
+ *          .withKeyDeserializer(LongDeserializer.class)
+ *          .withValueDeserializer(StringDeserializer.class));
+ * }</pre>
+ *
+ * <h3>Configurations of {@link ReadFromKafkaViaSDF}</h3>
+ *
+ * <p>Besides the Kafka consumer configurations, there are some other configurations related to
+ * processing records.
+ *
+ * <p>{@link ReadFromKafkaViaSDF#commitOffsets()} enables committing offset after processing the
+ * record. Note that if {@code isolation.level} is set to "read_committed" or {@link
+ * ConsumerConfig#ENABLE_AUTO_COMMIT_CONFIG} is set in the consumer config, the {@link
+ * ReadFromKafkaViaSDF#commitOffsets()} will be ignored.
+ *
+ * <p>{@link ReadFromKafkaViaSDF#withExtractOutputTimestampFn(SerializableFunction)} asks for a
+ * function which takes a {@link KafkaRecord} as input and outputs outputTimestamp. This function is
+ * used to produce output timestamp per {@link KafkaRecord}. There are three built-in types: {@link
+ * ReadFromKafkaViaSDF#withProcessingTime()}, {@link ReadFromKafkaViaSDF#withCreateTime()} and
+ * {@link ReadFromKafkaViaSDF#withLogAppendTime()}.
+ *
+ * <p>For example, to create a {@link ReadFromKafkaViaSDF} with these configurations:
+ *
+ * <pre>{@code
+ * pipeline
+ * .apply(Create.of(KafkaSourceDescription.of(new TopicPartition("my_topic", 1))))
+ * .apply(ReadFromKafkaViaSDF.create()
+ *          .withBootstrapServers("broker_1:9092,broker_2:9092")
+ *          .withKeyDeserializer(LongDeserializer.class)
+ *          .withValueDeserializer(StringDeserializer.class)
+ *          .withProcessingTime()
+ *          .commitOffsets());
+ *
+ * }</pre>
+ *
+ * <h3>Read from {@link KafkaSourceDescription}</h3>
+ *
+ * {@link ReadFromKafkaDoFn} implements the logic of reading from Kafka. The element is a {@link
+ * KafkaSourceDescription}, and the restriction is an {@link OffsetRange} which represents record
+ * offset. A {@link GrowableOffsetRangeTracker} is used to track an {@link OffsetRange} ending with
+ * {@code Long.MAX_VALUE}. For a finite range, an {@link OffsetRangeTracker} is created.
+ *
+ * <h4>Initialize Restriction</h4>
+ *
+ * {@link ReadFromKafkaDoFn#initialRestriction(KafkaSourceDescription)} creates an initial range for
+ * an input element {@link KafkaSourceDescription}. The end of the range is initialized to {@code
+ * Long.MAX_VALUE}. For the start of the range:
+ *
+ * <ul>
+ *   <li>If {@link KafkaSourceDescription#getStartOffset()} is set, use this offset as start.
+ *   <li>If {@link KafkaSourceDescription#getStartReadTime()} is set, seek the start offset based on
+ *       this time.
+ *   <li>Otherwise, the last committed offset + 1 will be returned by {@link
+ *       Consumer#position(TopicPartition)} as the start.
+ * </ul>
+ *
+ * <h4>Initial Split</h4>
+ *
+ * <p>There is no initial split for now.
+ *
+ * <h4>Checkpoint and Resume Processing</h4>
+ *
+ * <p>There are two types of checkpoint here: a self-checkpoint, which is invoked by the DoFn, and
+ * a system-checkpoint, which is issued by the runner via {@link
+ * org.apache.beam.model.fnexecution.v1.BeamFnApi.ProcessBundleSplitRequest}. Every time the
+ * consumer gets an empty response from {@link Consumer#poll(long)}, {@link ReadFromKafkaDoFn} will
+ * checkpoint the current {@link KafkaSourceDescription} and move on to process the next element.
+ * These deferred elements will be resumed by the runner as soon as possible.
+ *
+ * <h4>Progress and Size</h4>
+ *
+ * <p>The progress is provided by {@link GrowableOffsetRangeTracker} or {@link OffsetRangeTracker}
+ * per {@link KafkaSourceDescription}. For an infinite {@link OffsetRange}, a Kafka {@link Consumer}
+ * is used in the {@link GrowableOffsetRangeTracker} as the {@link
+ * GrowableOffsetRangeTracker.RangeEndEstimator} to poll the latest offset. Please refer to {@link
+ * ReadFromKafkaDoFn.KafkaLatestOffsetEstimator} for details.
+ *
+ * <p>The size is computed by {@link ReadFromKafkaDoFn#getSize(KafkaSourceDescription,
+ * OffsetRange)}. A {@link KafkaIOUtils.MovingAvg} is used to track the average size of Kafka
+ * records.
+ *
+ * <h4>Track Watermark</h4>
+ *
+ * The estimated watermark is computed by {@link MonotonicallyIncreasing} based on output timestamps
+ * per {@link KafkaSourceDescription}.
+ */
+@AutoValue
+public abstract class ReadFromKafkaViaSDF<K, V>
+    extends PTransform<PCollection<KafkaSourceDescription>, PCollection<KafkaRecord<K, V>>> {
+
+  private static final Logger LOG = LoggerFactory.getLogger(ReadFromKafkaViaSDF.class);
+
+  abstract Map<String, Object> getConsumerConfig();
+
+  @Nullable
+  abstract Map<String, Object> getOffsetConsumerConfig();
+
+  @Nullable
+  abstract DeserializerProvider getKeyDeserializerProvider();
+
+  @Nullable
+  abstract DeserializerProvider getValueDeserializerProvider();
+
+  @Nullable
+  abstract Coder<K> getKeyCoder();
+
+  @Nullable
+  abstract Coder<V> getValueCoder();
+
+  abstract SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>>
+      getConsumerFactoryFn();
+
+  abstract SerializableFunction<KafkaRecord<K, V>, Instant> getExtractOutputTimestampFn();
+
+  abstract boolean isCommitOffsetEnabled();
+
+  abstract Builder<K, V> toBuilder();
+
+  @AutoValue.Builder
+  abstract static class Builder<K, V> {
+    abstract Builder<K, V> setConsumerConfig(Map<String, Object> config);
+
+    abstract Builder<K, V> setOffsetConsumerConfig(Map<String, Object> offsetConsumerConfig);
+
+    abstract Builder<K, V> setConsumerFactoryFn(
+        SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>> consumerFactoryFn);
+
+    abstract Builder<K, V> setKeyDeserializerProvider(DeserializerProvider deserializerProvider);
+
+    abstract Builder<K, V> setValueDeserializerProvider(DeserializerProvider deserializerProvider);
+
+    abstract Builder<K, V> setKeyCoder(Coder<K> keyCoder);
+
+    abstract Builder<K, V> setValueCoder(Coder<V> valueCoder);
+
+    abstract Builder<K, V> setExtractOutputTimestampFn(
+        SerializableFunction<KafkaRecord<K, V>, Instant> fn);
+
+    abstract Builder<K, V> setCommitOffsetEnabled(boolean commitOffsetEnabled);
+
+    abstract ReadFromKafkaViaSDF<K, V> build();
+  }
+
+  public static <K, V> ReadFromKafkaViaSDF<K, V> create() {

Review comment:
       Do we need to expose this to users? Could it just be `read()`, to be consistent with `KafkaIO.Read`?
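
The start-offset rules listed under "Initialize Restriction" in the Javadoc above can be sketched as a small precedence chain. This is a sketch only: `StartOffsetSketch`, `resolveStartOffset`, and the two lookup functions are hypothetical stand-ins for the Kafka consumer calls, and the restriction is modelled as a plain `long[]` pair rather than an `OffsetRange`.

```java
import java.util.function.LongSupplier;
import java.util.function.LongUnaryOperator;

// Hypothetical illustration of the documented precedence; not the PR's DoFn.
final class StartOffsetSketch {
  private StartOffsetSketch() {}

  /**
   * Picks the start offset: an explicit offset wins, then a start read time (resolved through
   * the supplied lookup), and otherwise whatever position the consumer reports.
   */
  static long resolveStartOffset(
      Long startOffset,
      Long startReadTimeMillis,
      LongUnaryOperator offsetForTime,
      LongSupplier consumerPosition) {
    if (startOffset != null) {
      return startOffset;
    }
    if (startReadTimeMillis != null) {
      return offsetForTime.applyAsLong(startReadTimeMillis);
    }
    return consumerPosition.getAsLong();
  }

  /** The initial range is unbounded: it ends at Long.MAX_VALUE. */
  static long[] initialRestriction(long start) {
    return new long[] {start, Long.MAX_VALUE};
  }

  public static void main(String[] args) {
    LongUnaryOperator fakeOffsetForTime = millis -> millis / 1000; // made-up lookup
    LongSupplier fakePosition = () -> 42L; // made-up consumer position
    long start = resolveStartOffset(null, null, fakeOffsetForTime, fakePosition);
    long[] range = initialRestriction(start);
    System.out.println(range[0] + ".." + range[1]);
  }
}
```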

##########
File path: sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaViaSDF.java
##########
@@ -0,0 +1,697 @@
+@AutoValue
+public abstract class ReadFromKafkaViaSDF<K, V>

Review comment:
       I would shorten the name of this class to `ReadWithSDF`, since it's already clear that it's used to read from Kafka.
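
The "Track Watermark" note in the Javadoc above relies on a monotonically increasing estimate derived from output timestamps. The block below is a simplified stand-in for that idea, not Beam's `MonotonicallyIncreasing` class: the watermark only moves forward, so a late timestamp never pulls it back. `MonotonicWatermarkSketch` and its method names are hypothetical, and timestamps are plain epoch milliseconds.

```java
// Hypothetical, simplified watermark estimator; Beam's own class may behave differently.
final class MonotonicWatermarkSketch {
  private long watermarkMillis;

  MonotonicWatermarkSketch(long initialWatermarkMillis) {
    this.watermarkMillis = initialWatermarkMillis;
  }

  /** Records an output timestamp; the estimate advances only if the timestamp is newer. */
  void observeTimestamp(long timestampMillis) {
    watermarkMillis = Math.max(watermarkMillis, timestampMillis);
  }

  long currentWatermark() {
    return watermarkMillis;
  }

  public static void main(String[] args) {
    MonotonicWatermarkSketch estimator = new MonotonicWatermarkSketch(0L);
    estimator.observeTimestamp(1_000L);
    estimator.observeTimestamp(500L); // late record, ignored by the estimate
    estimator.observeTimestamp(2_000L);
    System.out.println(estimator.currentWatermark());
  }
}
```

One estimator instance per source description would match the "per `KafkaSourceDescription`" wording in the Javadoc.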

##########
File path: sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaViaSDF.java
##########
@@ -0,0 +1,697 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.kafka;
+
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
+
+import com.google.auto.value.AutoValue;
+import java.util.Map;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderRegistry;
+import org.apache.beam.sdk.io.range.OffsetRange;
+import org.apache.beam.sdk.options.ExperimentalOptions;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.DoFn.Element;
+import org.apache.beam.sdk.transforms.DoFn.GetRestrictionCoder;
+import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
+import org.apache.beam.sdk.transforms.DoFn.ProcessElement;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.splittabledofn.GrowableOffsetRangeTracker;
+import org.apache.beam.sdk.transforms.splittabledofn.OffsetRangeTracker;
+import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
+import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimator;
+import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimators.MonotonicallyIncreasing;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.io.Closeables;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.utils.AppInfoParser;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A {@link PTransform} that takes a PCollection of {@link KafkaSourceDescription} as input and
+ * outputs a PCollection of {@link KafkaRecord}. The core implementation is based on {@code
+ * SplittableDoFn}. For more details about the concept of {@code SplittableDoFn}, please refer to
+ * the Beam blog post: https://beam.apache.org/blog/splittable-do-fn/ and the design doc:
+ * https://s.apache.org/beam-fn-api. The major difference from {@link KafkaIO.Read} is that {@link
+ * ReadFromKafkaViaSDF} doesn't require source descriptions (e.g., {@link
+ * KafkaIO.Read#getTopicPartitions()}, {@link KafkaIO.Read#getTopics()}, {@link
+ * KafkaIO.Read#getStartReadTime()}, etc.) at pipeline construction time. Instead, the pipeline can
+ * populate these source descriptions at runtime. For example, the pipeline can query Kafka topics
+ * from a BigQuery table and read these topics via {@link ReadFromKafkaViaSDF}.
+ *
+ * <h3>Common Kafka Consumer Configurations</h3>
+ *
+ * <p>Most Kafka consumer configurations are similar to {@link KafkaIO.Read}:
+ *
+ * <ul>
+ *   <li>{@link ReadFromKafkaViaSDF#getConsumerConfig()} is the same as {@link
+ *       KafkaIO.Read#getConsumerConfig()}.
+ *   <li>{@link ReadFromKafkaViaSDF#getConsumerFactoryFn()} is the same as {@link
+ *       KafkaIO.Read#getConsumerFactoryFn()}.
+ *   <li>{@link ReadFromKafkaViaSDF#getOffsetConsumerConfig()} is the same as {@link
+ *       KafkaIO.Read#getOffsetConsumerConfig()}.
+ *   <li>{@link ReadFromKafkaViaSDF#getKeyCoder()} is the same as {@link
+ *       KafkaIO.Read#getKeyCoder()}.
+ *   <li>{@link ReadFromKafkaViaSDF#getValueCoder()} is the same as {@link
+ *       KafkaIO.Read#getValueCoder()}.
+ *   <li>{@link ReadFromKafkaViaSDF#getKeyDeserializerProvider()} is the same as {@link
+ *       KafkaIO.Read#getKeyDeserializerProvider()}.
+ *   <li>{@link ReadFromKafkaViaSDF#getValueDeserializerProvider()} is the same as {@link
+ *       KafkaIO.Read#getValueDeserializerProvider()}.
+ *   <li>{@link ReadFromKafkaViaSDF#isCommitOffsetEnabled()} means the same as {@link
+ *       KafkaIO.Read#isCommitOffsetsInFinalizeEnabled()}.
+ * </ul>
+ *
+ * <p>For example, to create a basic {@link ReadFromKafkaViaSDF} transform:
+ *
+ * <pre>{@code
+ * pipeline
+ *  .apply(Create.of(KafkaSourceDescription.of(new TopicPartition("my_topic", 1))))
+ *  .apply(ReadFromKafkaViaSDF.create()
+ *          .withBootstrapServers("broker_1:9092,broker_2:9092")
+ *          .withKeyDeserializer(LongDeserializer.class)
+ *          .withValueDeserializer(StringDeserializer.class));
+ * }</pre>
+ *
+ * <h3>Configurations of {@link ReadFromKafkaViaSDF}</h3>
+ *
+ * <p>Besides the Kafka consumer configurations, there are some other configurations related to
+ * processing records.
+ *
+ * <p>{@link ReadFromKafkaViaSDF#commitOffsets()} enables committing offsets after processing
+ * records. Note that if {@code isolation.level} is set to "read_committed" or {@link
+ * ConsumerConfig#ENABLE_AUTO_COMMIT_CONFIG} is set in the consumer config, {@link
+ * ReadFromKafkaViaSDF#commitOffsets()} will be ignored.
+ *
+ * <p>{@link ReadFromKafkaViaSDF#withExtractOutputTimestampFn(SerializableFunction)} asks for a
+ * function which takes a {@link KafkaRecord} as input and returns its output timestamp. This
+ * function is used to produce the output timestamp of each {@link KafkaRecord}. There are three
+ * built-in types: {@link ReadFromKafkaViaSDF#withProcessingTime()}, {@link
+ * ReadFromKafkaViaSDF#withCreateTime()} and {@link ReadFromKafkaViaSDF#withLogAppendTime()}.
+ *
+ * <p>For example, to create a {@link ReadFromKafkaViaSDF} with these configurations:
+ *
+ * <pre>{@code
+ * pipeline
+ * .apply(Create.of(KafkaSourceDescription.of(new TopicPartition("my_topic", 1))))
+ * .apply(ReadFromKafkaViaSDF.create()
+ *          .withBootstrapServers("broker_1:9092,broker_2:9092")
+ *          .withKeyDeserializer(LongDeserializer.class)
+ *          .withValueDeserializer(StringDeserializer.class)
+ *          .withProcessingTime()
+ *          .commitOffsets());
+ *
+ * }</pre>
+ *
+ * <h3>Read from {@link KafkaSourceDescription}</h3>
+ *
+ * <p>{@link ReadFromKafkaDoFn} implements the logic of reading from Kafka. The element is a {@link
+ * KafkaSourceDescription}, and the restriction is an {@link OffsetRange} which represents a range
+ * of record offsets. A {@link GrowableOffsetRangeTracker} is used to track an {@link OffsetRange}
+ * ending at {@code Long.MAX_VALUE}. For a finite range, an {@link OffsetRangeTracker} is created.
+ *
+ * <h4>Initialize Restriction</h4>
+ *
+ * <p>{@link ReadFromKafkaDoFn#initialRestriction(KafkaSourceDescription)} creates an initial range
+ * for an input element {@link KafkaSourceDescription}. The end of the range is initialized to
+ * {@code Long.MAX_VALUE}. For the start of the range:
+ *
+ * <ul>
+ *   <li>If {@link KafkaSourceDescription#getStartOffset()} is set, use this offset as start.
+ *   <li>If {@link KafkaSourceDescription#getStartReadTime()} is set, seek the start offset based on
+ *       this time.
+ *   <li>Otherwise, the last committed offset + 1 will be returned by {@link
+ *       Consumer#position(TopicPartition)} as the start.
+ * </ul>
+ *
+ * <h4>Initial Split</h4>
+ *
+ * <p>There is no initial split for now.
+ *
+ * <h4>Checkpoint and Resume Processing</h4>
+ *
+ * <p>There are 2 types of checkpoint here: a self-checkpoint, which is invoked by the DoFn, and a
+ * system-checkpoint, which is issued by the runner via {@link
+ * org.apache.beam.model.fnexecution.v1.BeamFnApi.ProcessBundleSplitRequest}. Every time the
+ * consumer gets an empty response from {@link Consumer#poll(long)}, {@link ReadFromKafkaDoFn} will
+ * checkpoint the current {@link KafkaSourceDescription} and move on to process the next element.
+ * These deferred elements will be resumed by the runner as soon as possible.
+ *
+ * <h4>Progress and Size</h4>
+ *
+ * <p>The progress is provided by {@link GrowableOffsetRangeTracker} or {@link OffsetRangeTracker}
+ * per {@link KafkaSourceDescription}. For an infinite {@link OffsetRange}, a Kafka {@link Consumer}
+ * is used in the {@link GrowableOffsetRangeTracker} as the {@link
+ * GrowableOffsetRangeTracker.RangeEndEstimator} to poll the latest offset. Please refer to {@link
+ * ReadFromKafkaDoFn.KafkaLatestOffsetEstimator} for details.
+ *
+ * <p>The size is computed by {@link ReadFromKafkaDoFn#getSize(KafkaSourceDescription,
+ * OffsetRange)}. A {@link KafkaIOUtils.MovingAvg} is used to track the average size of Kafka
+ * records.
+ *
+ * <h4>Track Watermark</h4>
+ *
+ * <p>The estimated watermark is computed by {@link MonotonicallyIncreasing} based on output
+ * timestamps per {@link KafkaSourceDescription}.
+ */
+@AutoValue
+public abstract class ReadFromKafkaViaSDF<K, V>
+    extends PTransform<PCollection<KafkaSourceDescription>, PCollection<KafkaRecord<K, V>>> {
+
+  private static final Logger LOG = LoggerFactory.getLogger(ReadFromKafkaViaSDF.class);
+
+  abstract Map<String, Object> getConsumerConfig();
+
+  @Nullable
+  abstract Map<String, Object> getOffsetConsumerConfig();
+
+  @Nullable
+  abstract DeserializerProvider getKeyDeserializerProvider();
+
+  @Nullable
+  abstract DeserializerProvider getValueDeserializerProvider();
+
+  @Nullable
+  abstract Coder<K> getKeyCoder();
+
+  @Nullable
+  abstract Coder<V> getValueCoder();
+
+  abstract SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>>
+      getConsumerFactoryFn();
+
+  abstract SerializableFunction<KafkaRecord<K, V>, Instant> getExtractOutputTimestampFn();
+
+  abstract boolean isCommitOffsetEnabled();
+
+  abstract Builder<K, V> toBuilder();
+
+  @AutoValue.Builder
+  abstract static class Builder<K, V> {
+    abstract Builder<K, V> setConsumerConfig(Map<String, Object> config);
+
+    abstract Builder<K, V> setOffsetConsumerConfig(Map<String, Object> offsetConsumerConfig);
+
+    abstract Builder<K, V> setConsumerFactoryFn(
+        SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>> consumerFactoryFn);
+
+    abstract Builder<K, V> setKeyDeserializerProvider(DeserializerProvider deserializerProvider);
+
+    abstract Builder<K, V> setValueDeserializerProvider(DeserializerProvider deserializerProvider);
+
+    abstract Builder<K, V> setKeyCoder(Coder<K> keyCoder);
+
+    abstract Builder<K, V> setValueCoder(Coder<V> valueCoder);
+
+    abstract Builder<K, V> setExtractOutputTimestampFn(
+        SerializableFunction<KafkaRecord<K, V>, Instant> fn);
+
+    abstract Builder<K, V> setCommitOffsetEnabled(boolean commitOffsetEnabled);
+
+    abstract ReadFromKafkaViaSDF<K, V> build();
+  }
+
+  public static <K, V> ReadFromKafkaViaSDF<K, V> create() {
+    return new AutoValue_ReadFromKafkaViaSDF.Builder<K, V>()
+        .setConsumerFactoryFn(KafkaIOUtils.KAFKA_CONSUMER_FACTORY_FN)
+        .setConsumerConfig(KafkaIOUtils.DEFAULT_CONSUMER_PROPERTIES)
+        .setExtractOutputTimestampFn(ExtractOutputTimestampFns.useProcessingTime())
+        .setCommitOffsetEnabled(false)
+        .build();
+  }
+

Review comment:
       Do all these configuration methods repeat the `KafkaIO.Read` methods? Can we avoid duplicating that code in the new `ReadFromKafkaViaSDF` transform?
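
       One possible direction, as a rough sketch only: keep the consumer-related settings in a single immutable holder that both transforms carry, so the `with...` methods are written once. The names `SharedConsumerSettings`, `LegacyReadSketch` and `SdfReadSketch` below are hypothetical stand-ins and are not part of this PR or of Beam; the sketch uses plain Java without AutoValue to stay self-contained.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical holder for the settings both transforms need; not an existing Beam class.
final class SharedConsumerSettings {
  private final Map<String, Object> consumerConfig;
  private final boolean commitOffsetEnabled;

  private SharedConsumerSettings(Map<String, Object> consumerConfig, boolean commitOffsetEnabled) {
    // Defensive copy keeps every instance immutable.
    this.consumerConfig = Collections.unmodifiableMap(new HashMap<>(consumerConfig));
    this.commitOffsetEnabled = commitOffsetEnabled;
  }

  static SharedConsumerSettings create() {
    return new SharedConsumerSettings(new HashMap<>(), false);
  }

  // Each "with" method returns a new instance, mirroring the builder style in the PR.
  SharedConsumerSettings withBootstrapServers(String bootstrapServers) {
    Map<String, Object> updated = new HashMap<>(consumerConfig);
    updated.put("bootstrap.servers", bootstrapServers);
    return new SharedConsumerSettings(updated, commitOffsetEnabled);
  }

  SharedConsumerSettings commitOffsets() {
    return new SharedConsumerSettings(consumerConfig, true);
  }

  Map<String, Object> getConsumerConfig() {
    return consumerConfig;
  }

  boolean isCommitOffsetEnabled() {
    return commitOffsetEnabled;
  }
}

// Stand-in for the SDF-based transform: it only stores the shared settings.
final class SdfReadSketch {
  private final SharedConsumerSettings settings;

  SdfReadSketch(SharedConsumerSettings settings) {
    this.settings = settings;
  }

  SharedConsumerSettings getSettings() {
    return settings;
  }
}

// Stand-in for KafkaIO.Read: expanding to the SDF transform hands the settings over unchanged.
final class LegacyReadSketch {
  private final SharedConsumerSettings settings;

  LegacyReadSketch(SharedConsumerSettings settings) {
    this.settings = settings;
  }

  SdfReadSketch expandToSdf() {
    return new SdfReadSketch(settings);
  }
}
```

       With something along these lines, `Read` would pass its settings object to the SDF transform during expansion instead of copying each field through a parallel set of setters. Whether this fits the AutoValue builders used here is an open question.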



