Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2020/08/05 22:30:43 UTC

[GitHub] [beam] lukecwik commented on a change in pull request #11749: [BEAM-9977] Implement ReadFromKafkaViaSDF

lukecwik commented on a change in pull request #11749:
URL: https://github.com/apache/beam/pull/11749#discussion_r466022189



##########
File path: sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
##########
@@ -198,6 +209,102 @@
  *    ...
  * }</pre>
  *
+ * <h2>Read from Kafka as a {@link DoFn}</h2>
+ *
+ * {@link ReadSourceDescriptors} is the {@link PTransform} that takes a PCollection of {@link
+ * KafkaSourceDescriptor} as input and outputs a PCollection of {@link KafkaRecord}. The core
+ * implementation is based on {@code SplittableDoFn}. For more details about the concept of {@code
+ * SplittableDoFn}, please refer to the <a
+ * href="https://beam.apache.org/blog/splittable-do-fn/">blog post</a> and <a
+ * href="https://s.apache.org/beam-fn-api">design doc</a>. The major difference from {@link
+ * KafkaIO.Read} is, {@link ReadSourceDescriptors} doesn't require source descriptions(e.g., {@link
+ * KafkaIO.Read#getTopicPartitions()}, {@link KafkaIO.Read#getTopics()}, {@link
+ * KafkaIO.Read#getStartReadTime()}, etc.) during the pipeline construction time. Instead, the
+ * pipeline can populate these source descriptions during runtime. For example, the pipeline can
+ * query Kafka topics from a BigQuery table and read these topics via {@link ReadSourceDescriptors}.
+ *
+ * <h3>Common Kafka Consumer Configurations</h3>
+ *
+ * <p>Most Kafka consumer configurations are similar to {@link KafkaIO.Read}:
+ *
+ * <ul>
+ *   <li>{@link ReadSourceDescriptors#getConsumerConfig()} is the same as {@link
+ *       KafkaIO.Read#getConsumerConfig()}.
+ *   <li>{@link ReadSourceDescriptors#getConsumerFactoryFn()} is the same as {@link
+ *       KafkaIO.Read#getConsumerFactoryFn()}.
+ *   <li>{@link ReadSourceDescriptors#getOffsetConsumerConfig()} is the same as {@link
+ *       KafkaIO.Read#getOffsetConsumerConfig()}.
+ *   <li>{@link ReadSourceDescriptors#getKeyCoder()} is the same as {@link
+ *       KafkaIO.Read#getKeyCoder()}.
+ *   <li>{@link ReadSourceDescriptors#getValueCoder()} is the same as {@link
+ *       KafkaIO.Read#getValueCoder()}.
+ *   <li>{@link ReadSourceDescriptors#getKeyDeserializerProvider()} is the same as {@link
+ *       KafkaIO.Read#getKeyDeserializerProvider()}.
+ *   <li>{@link ReadSourceDescriptors#getValueDeserializerProvider()} is the same as {@link
+ *       KafkaIO.Read#getValueDeserializerProvider()}.
+ *   <li>{@link ReadSourceDescriptors#isCommitOffsetEnabled()} has the same meaning as {@link
+ *       KafkaIO.Read#isCommitOffsetsInFinalizeEnabled()}.
+ * </ul>
+ *
+ * <p>For example, to create a basic {@link ReadSourceDescriptors} transform:
+ *
+ * <pre>{@code
+ * pipeline
+ *  .apply(Create.of(KafkaSourceDescriptor.of(new TopicPartition("topic", 1)))
+ *  .apply(KafkaIO.readAll()
+ *          .withBootstrapServers("broker_1:9092,broker_2:9092")
+ *          .withKeyDeserializer(LongDeserializer.class).
+ *          .withValueDeserializer(StringDeserializer.class));
+ * }</pre>
+ *
+ * Note that the {@code bootstrapServers} can also be populated from the {@link
+ * KafkaSourceDescriptor}:
+ *
+ * <pre>{@code
+ * pipeline
+ *  .apply(Create.of(
+ *    KafkaSourceDescriptor.of(
+ *      new TopicPartition("topic", 1),
+ *      null,
+ *      null,
+ *      ImmutableList.of("broker_1:9092", "broker_2:9092"))
+ *  .apply(KafkaIO.readAll()
+ *         .withKeyDeserializer(LongDeserializer.class).
+ *         .withValueDeserializer(StringDeserializer.class));
+ * }</pre>
+ *
+ * <h3>Configurations of {@link ReadSourceDescriptors}</h3>
+ *
+ * <p>Except configurations of Kafka Consumer, there are some other configurations which are related
+ * to processing records.
+ *
+ * <p>{@link ReadSourceDescriptors#commitOffsets()} enables committing offset after processing the
+ * record. Note that if the {@code isolation.level} is set to "read_committed" or {@link
+ * ConsumerConfig#ENABLE_AUTO_COMMIT_CONFIG} is set in the consumer config, the {@link
+ * ReadSourceDescriptors#commitOffsets()} will be ignored.
+ *
+ * <p>{@link ReadSourceDescriptors#withExtractOutputTimestampFn(SerializableFunction)} is used to
+ * compute the {@code output timestamp} for a given {@link KafkaRecord}. There are three built-in
+ * types: {@link ReadSourceDescriptors#withProcessingTime()}, {@link
+ * ReadSourceDescriptors#withCreateTime()} and {@link ReadSourceDescriptors#withLogAppendTime()}.

Review comment:
       Use an unordered list for the options: `<ul><li></ul>`

##########
File path: sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
##########
@@ -198,6 +209,102 @@
  *    ...
  * }</pre>
  *
+ * <h2>Read from Kafka as a {@link DoFn}</h2>
+ *
+ * {@link ReadSourceDescriptors} is the {@link PTransform} that takes a PCollection of {@link

Review comment:
       Somewhere in this block we should state the order of preference for where reading starts (offset / timestamp / last committed offset).
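
       To illustrate the kind of ordering the Javadoc could spell out, here is a minimal sketch. It assumes the order listed in the ReadFromKafkaDoFn Javadoc (explicit offset first, then timestamp, then the last committed position). `StartOffsetResolver` and its two lookup parameters are hypothetical stand-ins, not code from this PR; with a real client the lookups would presumably be backed by `Consumer#offsetsForTimes` and `Consumer#position`.

```java
import java.util.function.LongSupplier;
import java.util.function.LongUnaryOperator;

/** Illustrative only: a hypothetical helper, not part of the PR. */
final class StartOffsetResolver {
  private StartOffsetResolver() {}

  /**
   * Picks the offset to start reading from.
   *
   * @param startReadOffset explicit offset from the descriptor, or null if unset
   * @param startReadTimeMillis start timestamp from the descriptor, or null if unset
   * @param offsetForTime assumed lookup: first offset whose timestamp is >= the given time
   * @param committedPosition assumed lookup: position after the last committed offset
   */
  static long resolve(
      Long startReadOffset,
      Long startReadTimeMillis,
      LongUnaryOperator offsetForTime,
      LongSupplier committedPosition) {
    if (startReadOffset != null) {
      return startReadOffset; // 1. an explicit offset wins
    }
    if (startReadTimeMillis != null) {
      return offsetForTime.applyAsLong(startReadTimeMillis); // 2. then the timestamp
    }
    return committedPosition.getAsLong(); // 3. otherwise the last committed position
  }
}
```

       Passing the lookups in as functions keeps the sketch free of Kafka dependencies; the lookups only run when the higher-priority settings are unset.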

##########
File path: sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java
##########
@@ -0,0 +1,403 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.kafka;
+
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.TimeUnit;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.io.kafka.KafkaIO.ReadSourceDescriptors;
+import org.apache.beam.sdk.io.kafka.KafkaIOUtils.MovingAvg;
+import org.apache.beam.sdk.io.kafka.KafkaUnboundedReader.TimestampPolicyContext;
+import org.apache.beam.sdk.io.range.OffsetRange;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.DoFn.UnboundedPerElement;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.splittabledofn.GrowableOffsetRangeTracker;
+import org.apache.beam.sdk.transforms.splittabledofn.ManualWatermarkEstimator;
+import org.apache.beam.sdk.transforms.splittabledofn.OffsetRangeTracker;
+import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
+import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker.HasProgress;
+import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimator;
+import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimators.MonotonicallyIncreasing;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Supplier;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Suppliers;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.CacheBuilder;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.CacheLoader;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LoadingCache;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.io.Closeables;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A SplittableDoFn which reads from {@link KafkaSourceDescriptor} and outputs {@link KafkaRecord}.
+ * By default, a {@link MonotonicallyIncreasing} watermark estimator is used to track watermark.
+ *
+ * <p>{@link ReadFromKafkaDoFn} implements the logic of reading from Kafka. The element is a {@link
+ * KafkaSourceDescriptor}, and the restriction is an {@link OffsetRange} which represents record
+ * offset. A {@link GrowableOffsetRangeTracker} is used to track an {@link OffsetRange} ended with
+ * {@code Long.MAX_VALUE}. For a finite range, a {@link OffsetRangeTracker} is created.
+ *
+ * <h4>Initial Restriction</h4>
+ *
+ * <p>The initial range for a {@link KafkaSourceDescriptor } is defined by {@code [startOffset,
+ * Long.MAX_VALUE)} where {@code startOffset} is defined as:
+ *
+ * <ul>
+ *   <li>the {@code startReadOffset} if {@link KafkaSourceDescriptor#getStartReadOffset} is set.
+ *   <li>the first offset with a greater or equivalent timestamp if {@link
+ *       KafkaSourceDescriptor#getStartReadTime()} is set.
+ *   <li>the {@code last committed offset + 1} for the {@link Consumer#position(TopicPartition)
+ *       topic partition}.
+ * </ul>
+ *
+ * <h4>Splitting</h4>
+ *
+ * <p>TODO(BEAM-10319): Add support for initial splitting.
+ *
+ * <h4>Checkpoint and Resume Processing</h4>
+ *
+ * <p>There are 2 types of checkpoint here: self-checkpoint which invokes by the DoFn and
+ * system-checkpoint which is issued by the runner via {@link
+ * org.apache.beam.model.fnexecution.v1.BeamFnApi.ProcessBundleSplitRequest}. Every time the
+ * consumer gets empty response from {@link Consumer#poll(long)}, {@link ReadFromKafkaDoFn} will
+ * checkpoint at current {@link KafkaSourceDescriptor } and move to process the next element. These
+ * deferred elements will be resumed by the runner as soon as possible.
+ *
+ * <h4>Progress and Size</h4>
+ *
+ * <p>The progress is provided by {@link GrowableOffsetRangeTracker} or per {@link
+ * KafkaSourceDescriptor }. For an infinite {@link OffsetRange}, a Kafka {@link Consumer} is used in
+ * the {@link GrowableOffsetRangeTracker} as the {@link
+ * GrowableOffsetRangeTracker.RangeEndEstimator} to poll the latest offset. Please refer to {@link
+ * ReadFromKafkaDoFn#restrictionTracker(KafkaSourceDescriptor, OffsetRange)} for details.
+ *
+ * <p>The size is computed by {@link ReadFromKafkaDoFn#getSize(KafkaSourceDescriptor, OffsetRange).}

Review comment:
       ```suggestion
    * <p>The size is computed by {@link ReadFromKafkaDoFn#getSize(KafkaSourceDescriptor, OffsetRange)}.
   ```

##########
File path: sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
##########
@@ -1051,33 +1198,352 @@ public void populateDisplayData(DisplayData.Builder builder) {
     }
   }
 
-  ////////////////////////////////////////////////////////////////////////////////////////////////
-
-  private static final Logger LOG = LoggerFactory.getLogger(KafkaIO.class);
-
   /**
-   * Returns a new config map which is merge of current config and updates. Verifies the updates do
-   * not includes ignored properties.
+   * A {@link PTransform} to read from Kafka. See {@link KafkaIO} for more information on usage and
+   * configuration.
    */
-  private static Map<String, Object> updateKafkaProperties(
-      Map<String, Object> currentConfig,
-      Map<String, String> ignoredProperties,
-      Map<String, Object> updates) {
+  @Experimental(Kind.PORTABILITY)
+  @AutoValue
+  public abstract static class ReadSourceDescriptors<K, V>
+      extends PTransform<PCollection<KafkaSourceDescriptor>, PCollection<KafkaRecord<K, V>>> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(ReadSourceDescriptors.class);
+
+    abstract Map<String, Object> getConsumerConfig();

Review comment:
       It is unfortunate that Read doesn't have the documentation for these methods that we could copy.

##########
File path: sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
##########
@@ -1051,33 +1198,352 @@ public void populateDisplayData(DisplayData.Builder builder) {
     }
   }
 
-  ////////////////////////////////////////////////////////////////////////////////////////////////
-
-  private static final Logger LOG = LoggerFactory.getLogger(KafkaIO.class);
-
   /**
-   * Returns a new config map which is merge of current config and updates. Verifies the updates do
-   * not includes ignored properties.
+   * A {@link PTransform} to read from Kafka. See {@link KafkaIO} for more information on usage and

Review comment:
       In the Javadoc, we should state:
   * what the defaults are if nothing is configured
   * for all the withYYY methods, that KafkaSourceDescriptor.x takes precedence and that the value set by this method is used only if the descriptor leaves it unset
   * for all the KafkaSourceDescriptor methods, what they override in the ReadSourceDescriptors transform
   * that the watermark is controlled by one of withCreateWatermarkEstimatorFn / withWallTimeWatermarkEstimator / ...
   * that the output timestamp is controlled by one of withProcessingTime / withLogAppendTime / ...
   
   Some of this Javadoc should go on KafkaIO, while the rest makes more sense on the individual methods.
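
   As a rough sketch of the precedence rule described above (descriptor value first, transform-level value as the fallback), using bootstrap servers as the example. `BootstrapServersPrecedence` is a hypothetical helper, and failing when neither side is set is only an assumption; the actual default is exactly what the Javadoc should document.

```java
import java.util.List;

/** Illustrative only: a hypothetical helper, not part of the PR. */
final class BootstrapServersPrecedence {
  private BootstrapServersPrecedence() {}

  /**
   * Returns the servers to connect to.
   *
   * @param fromDescriptor value carried by the KafkaSourceDescriptor, or null if unset
   * @param fromTransform value configured on the transform (e.g. via a withYYY method), or null
   */
  static List<String> effective(List<String> fromDescriptor, List<String> fromTransform) {
    if (fromDescriptor != null) {
      return fromDescriptor; // the per-element descriptor takes precedence
    }
    if (fromTransform != null) {
      return fromTransform; // otherwise fall back to the transform-level setting
    }
    // Assumption for this sketch: with nothing configured we fail fast.
    throw new IllegalStateException("bootstrap servers are not configured");
  }
}
```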

##########
File path: sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java
##########
@@ -0,0 +1,403 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.kafka;
+
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.TimeUnit;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.io.kafka.KafkaIO.ReadSourceDescriptors;
+import org.apache.beam.sdk.io.kafka.KafkaIOUtils.MovingAvg;
+import org.apache.beam.sdk.io.kafka.KafkaUnboundedReader.TimestampPolicyContext;
+import org.apache.beam.sdk.io.range.OffsetRange;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.DoFn.UnboundedPerElement;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.splittabledofn.GrowableOffsetRangeTracker;
+import org.apache.beam.sdk.transforms.splittabledofn.ManualWatermarkEstimator;
+import org.apache.beam.sdk.transforms.splittabledofn.OffsetRangeTracker;
+import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
+import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker.HasProgress;
+import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimator;
+import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimators.MonotonicallyIncreasing;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Supplier;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Suppliers;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.CacheBuilder;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.CacheLoader;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LoadingCache;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.io.Closeables;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A SplittableDoFn which reads from {@link KafkaSourceDescriptor} and outputs {@link KafkaRecord}.

Review comment:
       Most of the Javadoc I asked for in my comments above can be copied directly from the comments you have written here.

##########
File path: sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java
##########
@@ -0,0 +1,403 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.kafka;
+
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.TimeUnit;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.io.kafka.KafkaIO.ReadSourceDescriptors;
+import org.apache.beam.sdk.io.kafka.KafkaIOUtils.MovingAvg;
+import org.apache.beam.sdk.io.kafka.KafkaUnboundedReader.TimestampPolicyContext;
+import org.apache.beam.sdk.io.range.OffsetRange;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.DoFn.UnboundedPerElement;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.splittabledofn.GrowableOffsetRangeTracker;
+import org.apache.beam.sdk.transforms.splittabledofn.ManualWatermarkEstimator;
+import org.apache.beam.sdk.transforms.splittabledofn.OffsetRangeTracker;
+import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
+import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker.HasProgress;
+import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimator;
+import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimators.MonotonicallyIncreasing;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Supplier;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Suppliers;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.CacheBuilder;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.CacheLoader;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LoadingCache;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.io.Closeables;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A SplittableDoFn which reads from {@link KafkaSourceDescriptor} and outputs {@link KafkaRecord}.
+ * By default, a {@link MonotonicallyIncreasing} watermark estimator is used to track watermark.
+ *
+ * <p>{@link ReadFromKafkaDoFn} implements the logic of reading from Kafka. The element is a {@link
+ * KafkaSourceDescriptor}, and the restriction is an {@link OffsetRange} which represents record
+ * offset. A {@link GrowableOffsetRangeTracker} is used to track an {@link OffsetRange} ended with
+ * {@code Long.MAX_VALUE}. For a finite range, a {@link OffsetRangeTracker} is created.
+ *
+ * <h4>Initial Restriction</h4>
+ *
+ * <p>The initial range for a {@link KafkaSourceDescriptor } is defined by {@code [startOffset,

Review comment:
       ```suggestion
    * <p>The initial range for a {@link KafkaSourceDescriptor} is defined by {@code [startOffset,
   ```

##########
File path: sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
##########
@@ -198,6 +209,102 @@
  *    ...
  * }</pre>
  *
+ * <h2>Read from Kafka as a {@link DoFn}</h2>
+ *
+ * {@link ReadSourceDescriptors} is the {@link PTransform} that takes a PCollection of {@link
+ * KafkaSourceDescriptor} as input and outputs a PCollection of {@link KafkaRecord}. The core
+ * implementation is based on {@code SplittableDoFn}. For more details about the concept of {@code
+ * SplittableDoFn}, please refer to the <a
+ * href="https://beam.apache.org/blog/splittable-do-fn/">blog post</a> and <a
+ * href="https://s.apache.org/beam-fn-api">design doc</a>. The major difference from {@link
+ * KafkaIO.Read} is, {@link ReadSourceDescriptors} doesn't require source descriptions(e.g., {@link
+ * KafkaIO.Read#getTopicPartitions()}, {@link KafkaIO.Read#getTopics()}, {@link
+ * KafkaIO.Read#getStartReadTime()}, etc.) during the pipeline construction time. Instead, the
+ * pipeline can populate these source descriptions during runtime. For example, the pipeline can
+ * query Kafka topics from a BigQuery table and read these topics via {@link ReadSourceDescriptors}.
+ *
+ * <h3>Common Kafka Consumer Configurations</h3>
+ *
+ * <p>Most Kafka consumer configurations are similar to {@link KafkaIO.Read}:
+ *
+ * <ul>
+ *   <li>{@link ReadSourceDescriptors#getConsumerConfig()} is the same as {@link
+ *       KafkaIO.Read#getConsumerConfig()}.
+ *   <li>{@link ReadSourceDescriptors#getConsumerFactoryFn()} is the same as {@link
+ *       KafkaIO.Read#getConsumerFactoryFn()}.
+ *   <li>{@link ReadSourceDescriptors#getOffsetConsumerConfig()} is the same as {@link
+ *       KafkaIO.Read#getOffsetConsumerConfig()}.
+ *   <li>{@link ReadSourceDescriptors#getKeyCoder()} is the same as {@link
+ *       KafkaIO.Read#getKeyCoder()}.
+ *   <li>{@link ReadSourceDescriptors#getValueCoder()} is the same as {@link
+ *       KafkaIO.Read#getValueCoder()}.
+ *   <li>{@link ReadSourceDescriptors#getKeyDeserializerProvider()} is the same as {@link
+ *       KafkaIO.Read#getKeyDeserializerProvider()}.
+ *   <li>{@link ReadSourceDescriptors#getValueDeserializerProvider()} is the same as {@link
+ *       KafkaIO.Read#getValueDeserializerProvider()}.
+ *   <li>{@link ReadSourceDescriptors#isCommitOffsetEnabled()} has the same meaning as {@link
+ *       KafkaIO.Read#isCommitOffsetsInFinalizeEnabled()}.
+ * </ul>
+ *
+ * <p>For example, to create a basic {@link ReadSourceDescriptors} transform:
+ *
+ * <pre>{@code
+ * pipeline
+ *  .apply(Create.of(KafkaSourceDescriptor.of(new TopicPartition("topic", 1)))
+ *  .apply(KafkaIO.readAll()
+ *          .withBootstrapServers("broker_1:9092,broker_2:9092")
+ *          .withKeyDeserializer(LongDeserializer.class).
+ *          .withValueDeserializer(StringDeserializer.class));
+ * }</pre>
+ *
+ * Note that the {@code bootstrapServers} can also be populated from the {@link
+ * KafkaSourceDescriptor}:
+ *
+ * <pre>{@code
+ * pipeline
+ *  .apply(Create.of(
+ *    KafkaSourceDescriptor.of(
+ *      new TopicPartition("topic", 1),
+ *      null,
+ *      null,
+ *      ImmutableList.of("broker_1:9092", "broker_2:9092"))
+ *  .apply(KafkaIO.readAll()
+ *         .withKeyDeserializer(LongDeserializer.class).
+ *         .withValueDeserializer(StringDeserializer.class));
+ * }</pre>
+ *
+ * <h3>Configurations of {@link ReadSourceDescriptors}</h3>
+ *
+ * <p>Except configurations of Kafka Consumer, there are some other configurations which are related
+ * to processing records.
+ *
+ * <p>{@link ReadSourceDescriptors#commitOffsets()} enables committing offset after processing the
+ * record. Note that if the {@code isolation.level} is set to "read_committed" or {@link
+ * ConsumerConfig#ENABLE_AUTO_COMMIT_CONFIG} is set in the consumer config, the {@link
+ * ReadSourceDescriptors#commitOffsets()} will be ignored.
+ *
+ * <p>{@link ReadSourceDescriptors#withExtractOutputTimestampFn(SerializableFunction)} is used to
+ * compute the {@code output timestamp} for a given {@link KafkaRecord}. There are three built-in

Review comment:
       ```suggestion
    * compute the {@code output timestamp} for a given {@link KafkaRecord} and controls the watermark advancement. There are three built-in
   ```
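
   To make the link between the output timestamp and the watermark concrete, here is a minimal sketch. It assumes a monotonically increasing estimator, as mentioned in the ReadFromKafkaDoFn Javadoc. `Rec` and `MonotonicWatermark` are hypothetical stand-ins for KafkaRecord and the Beam estimator, and `java.time.Instant` is used instead of Joda time only to keep the example dependency-free.

```java
import java.time.Instant;
import java.util.function.Function;

/** Illustrative only: hypothetical stand-ins, not the PR's classes. */
final class OutputTimestampSketch {
  private OutputTimestampSketch() {}

  /** Stand-in for KafkaRecord; carries only the fields used here. */
  static final class Rec {
    final long offset;
    final Instant createTime;

    Rec(long offset, Instant createTime) {
      this.offset = offset;
      this.createTime = createTime;
    }
  }

  /** Stand-in for a monotonically increasing watermark estimator: it never moves backwards. */
  static final class MonotonicWatermark {
    private Instant watermark;

    MonotonicWatermark(Instant initial) {
      this.watermark = initial;
    }

    void observe(Instant timestamp) {
      if (timestamp.isAfter(watermark)) {
        watermark = timestamp;
      }
    }

    Instant current() {
      return watermark;
    }
  }

  /** Extracts the output timestamp for a record and feeds it to the watermark estimator. */
  static Instant process(Rec rec, Function<Rec, Instant> extractFn, MonotonicWatermark wm) {
    Instant outputTimestamp = extractFn.apply(rec);
    wm.observe(outputTimestamp);
    return outputTimestamp;
  }
}
```

   In this sketch a late record still gets its own output timestamp, but it does not pull the watermark back.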

##########
File path: sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java
##########
@@ -0,0 +1,403 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.kafka;
+
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.TimeUnit;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.io.kafka.KafkaIO.ReadSourceDescriptors;
+import org.apache.beam.sdk.io.kafka.KafkaIOUtils.MovingAvg;
+import org.apache.beam.sdk.io.kafka.KafkaUnboundedReader.TimestampPolicyContext;
+import org.apache.beam.sdk.io.range.OffsetRange;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.DoFn.UnboundedPerElement;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.splittabledofn.GrowableOffsetRangeTracker;
+import org.apache.beam.sdk.transforms.splittabledofn.ManualWatermarkEstimator;
+import org.apache.beam.sdk.transforms.splittabledofn.OffsetRangeTracker;
+import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
+import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker.HasProgress;
+import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimator;
+import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimators.MonotonicallyIncreasing;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Supplier;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Suppliers;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.CacheBuilder;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.CacheLoader;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LoadingCache;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.io.Closeables;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A SplittableDoFn which reads from {@link KafkaSourceDescriptor} and outputs {@link KafkaRecord}.
+ * By default, a {@link MonotonicallyIncreasing} watermark estimator is used to track watermark.
+ *
+ * <p>{@link ReadFromKafkaDoFn} implements the logic of reading from Kafka. The element is a {@link
+ * KafkaSourceDescriptor}, and the restriction is an {@link OffsetRange} which represents record
+ * offset. A {@link GrowableOffsetRangeTracker} is used to track an {@link OffsetRange} ended with
+ * {@code Long.MAX_VALUE}. For a finite range, a {@link OffsetRangeTracker} is created.
+ *
+ * <h4>Initial Restriction</h4>
+ *
+ * <p>The initial range for a {@link KafkaSourceDescriptor } is defined by {@code [startOffset,
+ * Long.MAX_VALUE)} where {@code startOffset} is defined as:
+ *
+ * <ul>
+ *   <li>the {@code startReadOffset} if {@link KafkaSourceDescriptor#getStartReadOffset} is set.
+ *   <li>the first offset with a greater or equivalent timestamp if {@link
+ *       KafkaSourceDescriptor#getStartReadTime()} is set.
+ *   <li>the {@code last committed offset + 1} for the {@link Consumer#position(TopicPartition)
+ *       topic partition}.
+ * </ul>
+ *
+ * <h4>Splitting</h4>
+ *
+ * <p>TODO(BEAM-10319): Add support for initial splitting.
+ *
+ * <h4>Checkpoint and Resume Processing</h4>
+ *
+ * <p>There are 2 types of checkpoint here: self-checkpoint which invokes by the DoFn and
+ * system-checkpoint which is issued by the runner via {@link
+ * org.apache.beam.model.fnexecution.v1.BeamFnApi.ProcessBundleSplitRequest}. Every time the
+ * consumer gets empty response from {@link Consumer#poll(long)}, {@link ReadFromKafkaDoFn} will
+ * checkpoint at current {@link KafkaSourceDescriptor } and move to process the next element. These
+ * deferred elements will be resumed by the runner as soon as possible.
+ *
+ * <h4>Progress and Size</h4>
+ *
+ * <p>The progress is provided by {@link GrowableOffsetRangeTracker} or per {@link
+ * KafkaSourceDescriptor }. For an infinite {@link OffsetRange}, a Kafka {@link Consumer} is used in

Review comment:
       ```suggestion
    * KafkaSourceDescriptor}. For an infinite {@link OffsetRange}, a Kafka {@link Consumer} is used in
   ```
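
   For readers of this Javadoc, a small sketch of how progress over an unbounded range can be derived from an end-offset estimate. This is a simplified illustration, not the tracker's actual implementation. `GrowableProgressSketch` is hypothetical, and the `LongSupplier` stands in for an estimator that would poll the latest offset from a Kafka consumer.

```java
import java.util.function.LongSupplier;

/** Illustrative only: a simplified progress calculation, not Beam's tracker code. */
final class GrowableProgressSketch {
  private GrowableProgressSketch() {}

  /**
   * Returns {completed, remaining} work measured in offsets.
   *
   * @param start first offset of the restriction
   * @param nextOffset next offset that will be read
   * @param endEstimator assumed estimator of the current end offset of the partition
   */
  static long[] progress(long start, long nextOffset, LongSupplier endEstimator) {
    long completed = nextOffset - start;
    // The estimate can lag behind what has already been read, so clamp at zero.
    long remaining = Math.max(endEstimator.getAsLong() - nextOffset, 0L);
    return new long[] {completed, remaining};
  }
}
```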

##########
File path: sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
##########
@@ -1051,33 +1198,352 @@ public void populateDisplayData(DisplayData.Builder builder) {
     }
   }
 
-  ////////////////////////////////////////////////////////////////////////////////////////////////
-
-  private static final Logger LOG = LoggerFactory.getLogger(KafkaIO.class);
-
   /**
-   * Returns a new config map which is merge of current config and updates. Verifies the updates do
-   * not includes ignored properties.
+   * A {@link PTransform} to read from Kafka. See {@link KafkaIO} for more information on usage and
+   * configuration.
    */
-  private static Map<String, Object> updateKafkaProperties(
-      Map<String, Object> currentConfig,
-      Map<String, String> ignoredProperties,
-      Map<String, Object> updates) {
+  @Experimental(Kind.PORTABILITY)
+  @AutoValue
+  public abstract static class ReadSourceDescriptors<K, V>
+      extends PTransform<PCollection<KafkaSourceDescriptor>, PCollection<KafkaRecord<K, V>>> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(ReadSourceDescriptors.class);
+
+    abstract Map<String, Object> getConsumerConfig();
+
+    @Nullable
+    abstract Map<String, Object> getOffsetConsumerConfig();
+
+    @Nullable
+    abstract DeserializerProvider getKeyDeserializerProvider();
+
+    @Nullable
+    abstract DeserializerProvider getValueDeserializerProvider();
+
+    @Nullable
+    abstract Coder<K> getKeyCoder();
+
+    @Nullable
+    abstract Coder<V> getValueCoder();
+
+    abstract SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>>
+        getConsumerFactoryFn();
+
+    @Nullable
+    abstract SerializableFunction<KafkaRecord<K, V>, Instant> getExtractOutputTimestampFn();
+
+    @Nullable
+    abstract SerializableFunction<Instant, WatermarkEstimator<Instant>>
+        getCreateWatermarkEstimatorFn();
+
+    abstract boolean isCommitOffsetEnabled();
+
+    @Nullable
+    abstract TimestampPolicyFactory<K, V> getTimestampPolicyFactory();
+
+    abstract ReadSourceDescriptors.Builder<K, V> toBuilder();
+
+    @AutoValue.Builder
+    abstract static class Builder<K, V> {
+      abstract ReadSourceDescriptors.Builder<K, V> setConsumerConfig(Map<String, Object> config);
+
+      abstract ReadSourceDescriptors.Builder<K, V> setOffsetConsumerConfig(
+          Map<String, Object> offsetConsumerConfig);
+
+      abstract ReadSourceDescriptors.Builder<K, V> setConsumerFactoryFn(
+          SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>> consumerFactoryFn);
+
+      abstract ReadSourceDescriptors.Builder<K, V> setKeyDeserializerProvider(
+          DeserializerProvider deserializerProvider);
+
+      abstract ReadSourceDescriptors.Builder<K, V> setValueDeserializerProvider(
+          DeserializerProvider deserializerProvider);
+
+      abstract ReadSourceDescriptors.Builder<K, V> setKeyCoder(Coder<K> keyCoder);
+
+      abstract ReadSourceDescriptors.Builder<K, V> setValueCoder(Coder<V> valueCoder);
+
+      abstract ReadSourceDescriptors.Builder<K, V> setExtractOutputTimestampFn(
+          SerializableFunction<KafkaRecord<K, V>, Instant> fn);
+
+      abstract ReadSourceDescriptors.Builder<K, V> setCreateWatermarkEstimatorFn(
+          SerializableFunction<Instant, WatermarkEstimator<Instant>> fn);
+
+      abstract ReadSourceDescriptors.Builder<K, V> setCommitOffsetEnabled(
+          boolean commitOffsetEnabled);
+
+      abstract ReadSourceDescriptors.Builder<K, V> setTimestampPolicyFactory(
+          TimestampPolicyFactory<K, V> policy);
+
+      abstract ReadSourceDescriptors<K, V> build();
+    }
+
+    public static <K, V> ReadSourceDescriptors<K, V> read() {
+      return new AutoValue_KafkaIO_ReadSourceDescriptors.Builder<K, V>()
+          .setConsumerFactoryFn(KafkaIOUtils.KAFKA_CONSUMER_FACTORY_FN)
+          .setConsumerConfig(KafkaIOUtils.DEFAULT_CONSUMER_PROPERTIES)
+          .setCommitOffsetEnabled(false)
+          .build()
+          .withProcessingTime()
+          .withMonotonicallyIncreasingWatermarkEstimator();
+    }
+
+    // Note that if the bootstrapServers is set here but also populated with the element, the
+    // element
+    // will override the bootstrapServers from the config.
+    public ReadSourceDescriptors<K, V> withBootstrapServers(String bootstrapServers) {
+      return withConsumerConfigUpdates(
+          ImmutableMap.of(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers));
+    }
+
+    public ReadSourceDescriptors<K, V> withKeyDeserializerProvider(
+        DeserializerProvider<K> deserializerProvider) {
+      return toBuilder().setKeyDeserializerProvider(deserializerProvider).build();
+    }
+
+    public ReadSourceDescriptors<K, V> withValueDeserializerProvider(
+        DeserializerProvider<V> deserializerProvider) {
+      return toBuilder().setValueDeserializerProvider(deserializerProvider).build();
+    }
+
+    public ReadSourceDescriptors<K, V> withKeyDeserializer(
+        Class<? extends Deserializer<K>> keyDeserializer) {
+      return withKeyDeserializerProvider(LocalDeserializerProvider.of(keyDeserializer));
+    }
+
+    public ReadSourceDescriptors<K, V> withValueDeserializer(
+        Class<? extends Deserializer<V>> valueDeserializer) {
+      return withValueDeserializerProvider(LocalDeserializerProvider.of(valueDeserializer));
+    }
+
+    public ReadSourceDescriptors<K, V> withKeyDeserializerAndCoder(
+        Class<? extends Deserializer<K>> keyDeserializer, Coder<K> keyCoder) {
+      return withKeyDeserializer(keyDeserializer).toBuilder().setKeyCoder(keyCoder).build();
+    }
+
+    public ReadSourceDescriptors<K, V> withValueDeserializerAndCoder(
+        Class<? extends Deserializer<V>> valueDeserializer, Coder<V> valueCoder) {
+      return withValueDeserializer(valueDeserializer).toBuilder().setValueCoder(valueCoder).build();
+    }
+
+    public ReadSourceDescriptors<K, V> withConsumerFactoryFn(
+        SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>> consumerFactoryFn) {
+      return toBuilder().setConsumerFactoryFn(consumerFactoryFn).build();
+    }
+
+    public ReadSourceDescriptors<K, V> withConsumerConfigUpdates(
+        Map<String, Object> configUpdates) {
+      Map<String, Object> config =
+          KafkaIOUtils.updateKafkaProperties(getConsumerConfig(), configUpdates);
+      return toBuilder().setConsumerConfig(config).build();
+    }
+
+    public ReadSourceDescriptors<K, V> withExtractOutputTimestampFn(
+        SerializableFunction<KafkaRecord<K, V>, Instant> fn) {
+      return toBuilder().setExtractOutputTimestampFn(fn).build();
+    }
+
+    public ReadSourceDescriptors<K, V> withCreatWatermarkEstimatorFn(
+        SerializableFunction<Instant, WatermarkEstimator<Instant>> fn) {
+      return toBuilder().setCreateWatermarkEstimatorFn(fn).build();
+    }
+
+    public ReadSourceDescriptors<K, V> withLogAppendTime() {
+      return withExtractOutputTimestampFn(
+          ReadSourceDescriptors.ExtractOutputTimestampFns.useLogAppendTime());
+    }
+
+    public ReadSourceDescriptors<K, V> withProcessingTime() {
+      return withExtractOutputTimestampFn(
+          ReadSourceDescriptors.ExtractOutputTimestampFns.useProcessingTime());
+    }
+
+    public ReadSourceDescriptors<K, V> withCreateTime() {
+      return withExtractOutputTimestampFn(
+          ReadSourceDescriptors.ExtractOutputTimestampFns.useCreateTime());
+    }
+
+    public ReadSourceDescriptors<K, V> withWallTimeWatermarkEstimator() {
+      return withCreatWatermarkEstimatorFn(
+          state -> {
+            return new WallTime(state);
+          });
+    }
+
+    public ReadSourceDescriptors<K, V> withMonotonicallyIncreasingWatermarkEstimator() {
+      return withCreatWatermarkEstimatorFn(
+          state -> {
+            return new MonotonicallyIncreasing(state);
+          });
+    }
+
+    public ReadSourceDescriptors<K, V> withManualWatermarkEstimator() {
+      return withCreatWatermarkEstimatorFn(
+          state -> {
+            return new Manual(state);
+          });
+    }
+
+    // If a transactional producer is used and it's desired to only read records from committed
+    // transaction, it's recommended to set read_committed. Otherwise, read_uncommitted is the
+    // default value.
+    public ReadSourceDescriptors<K, V> withReadCommitted() {
+      return withConsumerConfigUpdates(ImmutableMap.of("isolation.level", "read_committed"));
+    }
+
+    public ReadSourceDescriptors<K, V> commitOffsets() {
+      return toBuilder().setCommitOffsetEnabled(true).build();
+    }
+
+    public ReadSourceDescriptors<K, V> withOffsetConsumerConfigOverrides(
+        Map<String, Object> offsetConsumerConfig) {
+      return toBuilder().setOffsetConsumerConfig(offsetConsumerConfig).build();
+    }
+
+    public ReadSourceDescriptors<K, V> withConsumerConfigOverrides(
+        Map<String, Object> consumerConfig) {
+      return toBuilder().setConsumerConfig(consumerConfig).build();
+    }
+
+    // TODO(BEAM-10320): Create external build transform for ReadSourceDescriptors().

Review comment:
       This TODO looks like it is already done.

##########
File path: sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java
##########
@@ -0,0 +1,403 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.kafka;
+
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.TimeUnit;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.io.kafka.KafkaIO.ReadSourceDescriptors;
+import org.apache.beam.sdk.io.kafka.KafkaIOUtils.MovingAvg;
+import org.apache.beam.sdk.io.kafka.KafkaUnboundedReader.TimestampPolicyContext;
+import org.apache.beam.sdk.io.range.OffsetRange;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.DoFn.UnboundedPerElement;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.splittabledofn.GrowableOffsetRangeTracker;
+import org.apache.beam.sdk.transforms.splittabledofn.ManualWatermarkEstimator;
+import org.apache.beam.sdk.transforms.splittabledofn.OffsetRangeTracker;
+import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
+import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker.HasProgress;
+import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimator;
+import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimators.MonotonicallyIncreasing;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Supplier;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Suppliers;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.CacheBuilder;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.CacheLoader;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LoadingCache;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.io.Closeables;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A SplittableDoFn which reads from {@link KafkaSourceDescriptor} and outputs {@link KafkaRecord}.
+ * By default, a {@link MonotonicallyIncreasing} watermark estimator is used to track watermark.
+ *
+ * <p>{@link ReadFromKafkaDoFn} implements the logic of reading from Kafka. The element is a {@link
+ * KafkaSourceDescriptor}, and the restriction is an {@link OffsetRange} which represents record
+ * offset. A {@link GrowableOffsetRangeTracker} is used to track an {@link OffsetRange} ended with
+ * {@code Long.MAX_VALUE}. For a finite range, a {@link OffsetRangeTracker} is created.
+ *
+ * <h4>Initial Restriction</h4>
+ *
+ * <p>The initial range for a {@link KafkaSourceDescriptor } is defined by {@code [startOffset,
+ * Long.MAX_VALUE)} where {@code startOffset} is defined as:
+ *
+ * <ul>
+ *   <li>the {@code startReadOffset} if {@link KafkaSourceDescriptor#getStartReadOffset} is set.
+ *   <li>the first offset with a greater or equivalent timestamp if {@link
+ *       KafkaSourceDescriptor#getStartReadTime()} is set.
+ *   <li>the {@code last committed offset + 1} for the {@link Consumer#position(TopicPartition)
+ *       topic partition}.
+ * </ul>
+ *
+ * <h4>Splitting</h4>
+ *
+ * <p>TODO(BEAM-10319): Add support for initial splitting.
+ *
+ * <h4>Checkpoint and Resume Processing</h4>
+ *
+ * <p>There are 2 types of checkpoint here: self-checkpoint which invokes by the DoFn and
+ * system-checkpoint which is issued by the runner via {@link
+ * org.apache.beam.model.fnexecution.v1.BeamFnApi.ProcessBundleSplitRequest}. Every time the
+ * consumer gets empty response from {@link Consumer#poll(long)}, {@link ReadFromKafkaDoFn} will
+ * checkpoint at current {@link KafkaSourceDescriptor } and move to process the next element. These

Review comment:
       ```suggestion
    * checkpoint the current {@link KafkaSourceDescriptor} and move to process the next element. These
   ```
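
To make the self-checkpoint behaviour in the quoted Javadoc concrete, here is a small simulation of it. It is a sketch under assumptions, not the PR's `ReadFromKafkaDoFn`: `SelfCheckpointSketch`, `Result`, and `process` are hypothetical names, and the iterator of lists stands in for successive `Consumer#poll` responses.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;

/** Hypothetical simulation; not the ReadFromKafkaDoFn implementation. */
final class SelfCheckpointSketch {

  /** Outcome of one processing call. */
  static final class Result {
    final boolean resume; // true: defer the remainder, false: restriction finished
    final long nextOffset; // first offset that has not been emitted
    final List<Long> emitted; // offsets output during this call

    Result(boolean resume, long nextOffset, List<Long> emitted) {
      this.resume = resume;
      this.nextOffset = nextOffset;
      this.emitted = emitted;
    }
  }

  /** Processes [from, to), taking each simulated poll response from the iterator. */
  static Result process(long from, long to, Iterator<List<Long>> polls) {
    List<Long> emitted = new ArrayList<>();
    long next = from;
    while (polls.hasNext()) {
      List<Long> batch = polls.next();
      if (batch.isEmpty()) {
        // Empty poll: self-checkpoint so the remainder [next, to) can be resumed later.
        return new Result(true, next, emitted);
      }
      for (long offset : batch) {
        if (offset >= to) {
          // The offset lies outside the restriction, so processing stops here.
          return new Result(false, next, emitted);
        }
        emitted.add(offset);
        next = offset + 1;
      }
    }
    // The simulated source ran dry; treat it like an empty poll.
    return new Result(true, next, emitted);
  }

  public static void main(String[] args) {
    List<List<Long>> polls =
        Arrays.asList(Arrays.asList(0L, 1L, 2L), Collections.<Long>emptyList(), Arrays.asList(3L));
    Result r = process(0L, Long.MAX_VALUE, polls.iterator());
    System.out.println(r.resume + " " + r.nextOffset + " " + r.emitted);
  }
}
```

In the demo, the empty second response ends the call early, so offset 3 is left for the resumed invocation.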

##########
File path: sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
##########
@@ -1051,33 +1198,352 @@ public void populateDisplayData(DisplayData.Builder builder) {
     }
   }
 
-  ////////////////////////////////////////////////////////////////////////////////////////////////
-
-  private static final Logger LOG = LoggerFactory.getLogger(KafkaIO.class);
-
   /**
-   * Returns a new config map which is merge of current config and updates. Verifies the updates do
-   * not includes ignored properties.
+   * A {@link PTransform} to read from Kafka. See {@link KafkaIO} for more information on usage and
+   * configuration.
    */
-  private static Map<String, Object> updateKafkaProperties(
-      Map<String, Object> currentConfig,
-      Map<String, String> ignoredProperties,
-      Map<String, Object> updates) {
+  @Experimental(Kind.PORTABILITY)
+  @AutoValue
+  public abstract static class ReadSourceDescriptors<K, V>
+      extends PTransform<PCollection<KafkaSourceDescriptor>, PCollection<KafkaRecord<K, V>>> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(ReadSourceDescriptors.class);
+
+    abstract Map<String, Object> getConsumerConfig();
+
+    @Nullable
+    abstract Map<String, Object> getOffsetConsumerConfig();
+
+    @Nullable
+    abstract DeserializerProvider getKeyDeserializerProvider();
+
+    @Nullable
+    abstract DeserializerProvider getValueDeserializerProvider();
+
+    @Nullable
+    abstract Coder<K> getKeyCoder();
+
+    @Nullable
+    abstract Coder<V> getValueCoder();
+
+    abstract SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>>
+        getConsumerFactoryFn();
+
+    @Nullable
+    abstract SerializableFunction<KafkaRecord<K, V>, Instant> getExtractOutputTimestampFn();
+
+    @Nullable
+    abstract SerializableFunction<Instant, WatermarkEstimator<Instant>>
+        getCreateWatermarkEstimatorFn();
+
+    abstract boolean isCommitOffsetEnabled();
+
+    @Nullable
+    abstract TimestampPolicyFactory<K, V> getTimestampPolicyFactory();
+
+    abstract ReadSourceDescriptors.Builder<K, V> toBuilder();
+
+    @AutoValue.Builder
+    abstract static class Builder<K, V> {
+      abstract ReadSourceDescriptors.Builder<K, V> setConsumerConfig(Map<String, Object> config);
+
+      abstract ReadSourceDescriptors.Builder<K, V> setOffsetConsumerConfig(
+          Map<String, Object> offsetConsumerConfig);
+
+      abstract ReadSourceDescriptors.Builder<K, V> setConsumerFactoryFn(
+          SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>> consumerFactoryFn);
+
+      abstract ReadSourceDescriptors.Builder<K, V> setKeyDeserializerProvider(
+          DeserializerProvider deserializerProvider);
+
+      abstract ReadSourceDescriptors.Builder<K, V> setValueDeserializerProvider(
+          DeserializerProvider deserializerProvider);
+
+      abstract ReadSourceDescriptors.Builder<K, V> setKeyCoder(Coder<K> keyCoder);
+
+      abstract ReadSourceDescriptors.Builder<K, V> setValueCoder(Coder<V> valueCoder);
+
+      abstract ReadSourceDescriptors.Builder<K, V> setExtractOutputTimestampFn(
+          SerializableFunction<KafkaRecord<K, V>, Instant> fn);
+
+      abstract ReadSourceDescriptors.Builder<K, V> setCreateWatermarkEstimatorFn(
+          SerializableFunction<Instant, WatermarkEstimator<Instant>> fn);
+
+      abstract ReadSourceDescriptors.Builder<K, V> setCommitOffsetEnabled(
+          boolean commitOffsetEnabled);
+
+      abstract ReadSourceDescriptors.Builder<K, V> setTimestampPolicyFactory(
+          TimestampPolicyFactory<K, V> policy);
+
+      abstract ReadSourceDescriptors<K, V> build();
+    }
+
+    public static <K, V> ReadSourceDescriptors<K, V> read() {
+      return new AutoValue_KafkaIO_ReadSourceDescriptors.Builder<K, V>()
+          .setConsumerFactoryFn(KafkaIOUtils.KAFKA_CONSUMER_FACTORY_FN)
+          .setConsumerConfig(KafkaIOUtils.DEFAULT_CONSUMER_PROPERTIES)
+          .setCommitOffsetEnabled(false)
+          .build()
+          .withProcessingTime()
+          .withMonotonicallyIncreasingWatermarkEstimator();
+    }
+
+    // Note that if the bootstrapServers is set here but also populated with the element, the
+    // element
+    // will override the bootstrapServers from the config.
+    public ReadSourceDescriptors<K, V> withBootstrapServers(String bootstrapServers) {
+      return withConsumerConfigUpdates(
+          ImmutableMap.of(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers));
+    }
+
+    public ReadSourceDescriptors<K, V> withKeyDeserializerProvider(
+        DeserializerProvider<K> deserializerProvider) {
+      return toBuilder().setKeyDeserializerProvider(deserializerProvider).build();
+    }
+
+    public ReadSourceDescriptors<K, V> withValueDeserializerProvider(
+        DeserializerProvider<V> deserializerProvider) {
+      return toBuilder().setValueDeserializerProvider(deserializerProvider).build();
+    }
+
+    public ReadSourceDescriptors<K, V> withKeyDeserializer(
+        Class<? extends Deserializer<K>> keyDeserializer) {
+      return withKeyDeserializerProvider(LocalDeserializerProvider.of(keyDeserializer));
+    }
+
+    public ReadSourceDescriptors<K, V> withValueDeserializer(
+        Class<? extends Deserializer<V>> valueDeserializer) {
+      return withValueDeserializerProvider(LocalDeserializerProvider.of(valueDeserializer));
+    }
+
+    public ReadSourceDescriptors<K, V> withKeyDeserializerAndCoder(
+        Class<? extends Deserializer<K>> keyDeserializer, Coder<K> keyCoder) {
+      return withKeyDeserializer(keyDeserializer).toBuilder().setKeyCoder(keyCoder).build();
+    }
+
+    public ReadSourceDescriptors<K, V> withValueDeserializerAndCoder(
+        Class<? extends Deserializer<V>> valueDeserializer, Coder<V> valueCoder) {
+      return withValueDeserializer(valueDeserializer).toBuilder().setValueCoder(valueCoder).build();
+    }
+
+    public ReadSourceDescriptors<K, V> withConsumerFactoryFn(
+        SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>> consumerFactoryFn) {
+      return toBuilder().setConsumerFactoryFn(consumerFactoryFn).build();
+    }
+
+    public ReadSourceDescriptors<K, V> withConsumerConfigUpdates(
+        Map<String, Object> configUpdates) {
+      Map<String, Object> config =
+          KafkaIOUtils.updateKafkaProperties(getConsumerConfig(), configUpdates);
+      return toBuilder().setConsumerConfig(config).build();
+    }
+
+    public ReadSourceDescriptors<K, V> withExtractOutputTimestampFn(
+        SerializableFunction<KafkaRecord<K, V>, Instant> fn) {
+      return toBuilder().setExtractOutputTimestampFn(fn).build();
+    }
+
+    public ReadSourceDescriptors<K, V> withCreatWatermarkEstimatorFn(
+        SerializableFunction<Instant, WatermarkEstimator<Instant>> fn) {
+      return toBuilder().setCreateWatermarkEstimatorFn(fn).build();
+    }
+
+    public ReadSourceDescriptors<K, V> withLogAppendTime() {
+      return withExtractOutputTimestampFn(
+          ReadSourceDescriptors.ExtractOutputTimestampFns.useLogAppendTime());
+    }
+
+    public ReadSourceDescriptors<K, V> withProcessingTime() {
+      return withExtractOutputTimestampFn(
+          ReadSourceDescriptors.ExtractOutputTimestampFns.useProcessingTime());
+    }
+
+    public ReadSourceDescriptors<K, V> withCreateTime() {
+      return withExtractOutputTimestampFn(
+          ReadSourceDescriptors.ExtractOutputTimestampFns.useCreateTime());
+    }
+
+    public ReadSourceDescriptors<K, V> withWallTimeWatermarkEstimator() {
+      return withCreatWatermarkEstimatorFn(
+          state -> {
+            return new WallTime(state);
+          });
+    }
+
+    public ReadSourceDescriptors<K, V> withMonotonicallyIncreasingWatermarkEstimator() {
+      return withCreatWatermarkEstimatorFn(
+          state -> {
+            return new MonotonicallyIncreasing(state);
+          });
+    }
+
+    public ReadSourceDescriptors<K, V> withManualWatermarkEstimator() {
+      return withCreatWatermarkEstimatorFn(
+          state -> {
+            return new Manual(state);
+          });
+    }
+
+    // If a transactional producer is used and it's desired to only read records from committed
+    // transaction, it's recommended to set read_committed. Otherwise, read_uncommitted is the
+    // default value.
+    public ReadSourceDescriptors<K, V> withReadCommitted() {
+      return withConsumerConfigUpdates(ImmutableMap.of("isolation.level", "read_committed"));
+    }
+
+    public ReadSourceDescriptors<K, V> commitOffsets() {
+      return toBuilder().setCommitOffsetEnabled(true).build();
+    }
+
+    public ReadSourceDescriptors<K, V> withOffsetConsumerConfigOverrides(
+        Map<String, Object> offsetConsumerConfig) {
+      return toBuilder().setOffsetConsumerConfig(offsetConsumerConfig).build();
+    }
+
+    public ReadSourceDescriptors<K, V> withConsumerConfigOverrides(
+        Map<String, Object> consumerConfig) {
+      return toBuilder().setConsumerConfig(consumerConfig).build();
+    }
+
+    // TODO(BEAM-10320): Create external build transform for ReadSourceDescriptors().
+    ReadAllFromRow forExternalBuild() {
+      return new ReadAllFromRow(this);
+    }
+
+    // This transform is used in cross-language case. The input Row should be encoded with an
+    // equivalent schema as KafkaSourceDescriptor.
+    private static class ReadAllFromRow<K, V>
+        extends PTransform<PCollection<Row>, PCollection<KV<K, V>>> {
+
+      private final ReadSourceDescriptors<K, V> readViaSDF;
+
+      ReadAllFromRow(ReadSourceDescriptors read) {
+        readViaSDF = read;
+      }
+
+      @Override
+      public PCollection<KV<K, V>> expand(PCollection<Row> input) {
+        return input
+            .apply(Convert.fromRows(KafkaSourceDescriptor.class))
+            .apply(readViaSDF)
+            .apply(
+                ParDo.of(
+                    new DoFn<KafkaRecord<K, V>, KV<K, V>>() {
+                      @ProcessElement
+                      public void processElement(
+                          @Element KafkaRecord element, OutputReceiver<KV<K, V>> outputReceiver) {
+                        outputReceiver.output(element.getKV());
+                      }
+                    }))
+            .setCoder(KvCoder.<K, V>of(readViaSDF.getKeyCoder(), readViaSDF.getValueCoder()));
+      }
+    }
+
+    ReadSourceDescriptors<K, V> withTimestampPolicyFactory(
+        TimestampPolicyFactory<K, V> timestampPolicyFactory) {
+      return toBuilder().setTimestampPolicyFactory(timestampPolicyFactory).build();
+    }
 
-    for (String key : updates.keySet()) {
+    @Override
+    public PCollection<KafkaRecord<K, V>> expand(PCollection<KafkaSourceDescriptor> input) {
       checkArgument(
-          !ignoredProperties.containsKey(key),
-          "No need to configure '%s'. %s",
-          key,
-          ignoredProperties.get(key));
+          ExperimentalOptions.hasExperiment(input.getPipeline().getOptions(), "beam_fn_api"),
+          "The ReadSourceDescriptors can only used when beam_fn_api is enabled.");
+
+      checkArgument(getKeyDeserializerProvider() != null, "withKeyDeserializer() is required");
+      checkArgument(getValueDeserializerProvider() != null, "withValueDeserializer() is required");
+
+      ConsumerSpEL consumerSpEL = new ConsumerSpEL();
+      if (!consumerSpEL.hasOffsetsForTimes()) {
+        LOG.warn(
+            "Kafka client version {} is too old. Versions before 0.10.1.0 are deprecated and "
+                + "may not be supported in next release of Apache Beam. "
+                + "Please upgrade your Kafka client version.",
+            AppInfoParser.getVersion());
+      }
+
+      if (isCommitOffsetEnabled()) {
+        if (configuredKafkaCommit()) {
+          LOG.info(
+              "Either read_committed or auto_commit is set together with commitOffsetEnabled but you "
+                  + "only need one of them. The commitOffsetEnabled is going to be ignored");
+        }
+      }
+
+      if (getConsumerConfig().get(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG) == null) {
+        LOG.warn(
+            "The bootstrapServers is not set. Then it must be populated through KafkaSourceDescriptor during runtime. Otherwise, the pipeline will fail.");

Review comment:
       ```suggestion
               "The bootstrapServers is not set. It must be populated through the KafkaSourceDescriptor during runtime; otherwise, the pipeline will fail.");
   ```
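
The precedence this warning relies on (per the code comment on `withBootstrapServers`, a value carried by the element overrides the one in the consumer config) could be sketched as follows. The class and method names are hypothetical, and representing the element's servers as a `List<String>` is an assumption made for the example; only the `bootstrap.servers` key is Kafka's own.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical helper; not part of KafkaIO. */
final class BootstrapServersSketch {

  // Kafka's standard consumer config key (ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG).
  static final String BOOTSTRAP_SERVERS = "bootstrap.servers";

  /** Merges the transform-level config with servers supplied by the element, if any. */
  static Map<String, Object> effectiveConfig(
      Map<String, Object> transformConfig, List<String> elementBootstrapServers) {
    Map<String, Object> config = new HashMap<>(transformConfig);
    if (elementBootstrapServers != null && !elementBootstrapServers.isEmpty()) {
      // The element wins over whatever the transform was configured with.
      config.put(BOOTSTRAP_SERVERS, String.join(",", elementBootstrapServers));
    }
    if (!config.containsKey(BOOTSTRAP_SERVERS)) {
      // Neither source provided servers: this is the failure the warning describes.
      throw new IllegalStateException("bootstrap.servers is set on neither config nor element");
    }
    return config;
  }

  public static void main(String[] args) {
    Map<String, Object> base =
        Collections.<String, Object>singletonMap(BOOTSTRAP_SERVERS, "a:9092");
    System.out.println(effectiveConfig(base, Arrays.asList("b:9092", "c:9092")));
    System.out.println(effectiveConfig(base, null));
  }
}
```

Failing at runtime rather than at construction time matches the reason the check is only a warning: the element may still supply the servers.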

##########
File path: sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java
##########
@@ -0,0 +1,403 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.kafka;
+
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.TimeUnit;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.io.kafka.KafkaIO.ReadSourceDescriptors;
+import org.apache.beam.sdk.io.kafka.KafkaIOUtils.MovingAvg;
+import org.apache.beam.sdk.io.kafka.KafkaUnboundedReader.TimestampPolicyContext;
+import org.apache.beam.sdk.io.range.OffsetRange;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.DoFn.UnboundedPerElement;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.splittabledofn.GrowableOffsetRangeTracker;
+import org.apache.beam.sdk.transforms.splittabledofn.ManualWatermarkEstimator;
+import org.apache.beam.sdk.transforms.splittabledofn.OffsetRangeTracker;
+import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
+import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker.HasProgress;
+import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimator;
+import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimators.MonotonicallyIncreasing;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Supplier;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Suppliers;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.CacheBuilder;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.CacheLoader;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LoadingCache;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.io.Closeables;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A SplittableDoFn which reads from {@link KafkaSourceDescriptor} and outputs {@link KafkaRecord}.
+ * By default, a {@link MonotonicallyIncreasing} watermark estimator is used to track watermark.
+ *
+ * <p>{@link ReadFromKafkaDoFn} implements the logic of reading from Kafka. The element is a {@link
+ * KafkaSourceDescriptor}, and the restriction is an {@link OffsetRange} which represents record
+ * offset. A {@link GrowableOffsetRangeTracker} is used to track an {@link OffsetRange} ended with
+ * {@code Long.MAX_VALUE}. For a finite range, a {@link OffsetRangeTracker} is created.
+ *
+ * <h4>Initial Restriction</h4>
+ *
+ * <p>The initial range for a {@link KafkaSourceDescriptor } is defined by {@code [startOffset,
+ * Long.MAX_VALUE)} where {@code startOffset} is defined as:
+ *
+ * <ul>
+ *   <li>the {@code startReadOffset} if {@link KafkaSourceDescriptor#getStartReadOffset} is set.
+ *   <li>the first offset with a greater or equivalent timestamp if {@link
+ *       KafkaSourceDescriptor#getStartReadTime()} is set.
+ *   <li>the {@code last committed offset + 1} for the {@link Consumer#position(TopicPartition)
+ *       topic partition}.
+ * </ul>
+ *
+ * <h4>Splitting</h4>
+ *
+ * <p>TODO(BEAM-10319): Add support for initial splitting.
+ *
+ * <h4>Checkpoint and Resume Processing</h4>
+ *
+ * <p>There are 2 types of checkpoint here: self-checkpoint which invokes by the DoFn and
+ * system-checkpoint which is issued by the runner via {@link
+ * org.apache.beam.model.fnexecution.v1.BeamFnApi.ProcessBundleSplitRequest}. Every time the
+ * consumer gets empty response from {@link Consumer#poll(long)}, {@link ReadFromKafkaDoFn} will
+ * checkpoint at current {@link KafkaSourceDescriptor } and move to process the next element. These
+ * deferred elements will be resumed by the runner as soon as possible.
+ *
+ * <h4>Progress and Size</h4>
+ *
+ * <p>The progress is provided by the {@link GrowableOffsetRangeTracker} per {@link
+ * KafkaSourceDescriptor}. For an infinite {@link OffsetRange}, a Kafka {@link Consumer} is used in
+ * the {@link GrowableOffsetRangeTracker} as the {@link
+ * GrowableOffsetRangeTracker.RangeEndEstimator} to poll the latest offset. Please refer to {@link
+ * ReadFromKafkaDoFn#restrictionTracker(KafkaSourceDescriptor, OffsetRange)} for details.
+ *
+ * <p>The size is computed by {@link ReadFromKafkaDoFn#getSize(KafkaSourceDescriptor, OffsetRange)}.
+ * A {@link KafkaIOUtils.MovingAvg} is used to track the average size of Kafka records.
+ *
+ * <h4>Track Watermark</h4>
+ *
+ * <p>The {@link WatermarkEstimator} is created by {@link
+ * ReadSourceDescriptors#getCreateWatermarkEstimatorFn()}. The estimated watermark is computed by
+ * this {@link WatermarkEstimator} based on output timestamps computed by {@link
+ * ReadSourceDescriptors#getExtractOutputTimestampFn()}. The default
+ * configuration is using {@link ReadSourceDescriptors#withProcessingTime()} as {@code

Review comment:
       ```suggestion
    * configuration is using {@link ReadSourceDescriptors#withProcessingTime()} as the {@code
   ```
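
For context, the start-offset precedence described under "Initial Restriction" in the Javadoc above could be sketched roughly as follows. This is a dependency-free illustration, not the code in this PR: `StartOffsetSketch`, `resolveStartOffset`, and the two functional parameters are hypothetical stand-ins for the timestamp lookup and the consumer position call, and the start read time is assumed to be a plain epoch-millisecond value.

```java
import java.util.function.LongSupplier;
import java.util.function.LongUnaryOperator;

public class StartOffsetSketch {

  /**
   * Picks the start offset using the precedence from the Javadoc:
   * explicit offset first, then timestamp lookup, then the consumer's position.
   */
  static long resolveStartOffset(
      Long startReadOffset,            // null when not set
      Long startReadTimeMillis,        // null when not set (assumed epoch millis)
      LongUnaryOperator offsetForTime, // hypothetical timestamp -> offset lookup
      LongSupplier consumerPosition) { // hypothetical current consumer position
    if (startReadOffset != null) {
      return startReadOffset;
    }
    if (startReadTimeMillis != null) {
      return offsetForTime.applyAsLong(startReadTimeMillis);
    }
    return consumerPosition.getAsLong();
  }

  public static void main(String[] args) {
    // Explicit offset wins even when a start time is also present.
    System.out.println(resolveStartOffset(7L, 1_000L, t -> 99L, () -> 5L));
    // Only a start time is set: use the timestamp lookup.
    System.out.println(resolveStartOffset(null, 1_000L, t -> 99L, () -> 5L));
    // Neither is set: fall back to the consumer's position.
    System.out.println(resolveStartOffset(null, null, t -> 99L, () -> 5L));
  }
}
```

In every case the resulting restriction would be `[startOffset, Long.MAX_VALUE)`, as the Javadoc states.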

##########
File path: sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java
##########
@@ -0,0 +1,403 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.kafka;
+
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.TimeUnit;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.io.kafka.KafkaIO.ReadSourceDescriptors;
+import org.apache.beam.sdk.io.kafka.KafkaIOUtils.MovingAvg;
+import org.apache.beam.sdk.io.kafka.KafkaUnboundedReader.TimestampPolicyContext;
+import org.apache.beam.sdk.io.range.OffsetRange;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.DoFn.UnboundedPerElement;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.splittabledofn.GrowableOffsetRangeTracker;
+import org.apache.beam.sdk.transforms.splittabledofn.ManualWatermarkEstimator;
+import org.apache.beam.sdk.transforms.splittabledofn.OffsetRangeTracker;
+import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
+import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker.HasProgress;
+import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimator;
+import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimators.MonotonicallyIncreasing;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Supplier;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Suppliers;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.CacheBuilder;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.CacheLoader;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LoadingCache;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.io.Closeables;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A SplittableDoFn which reads from {@link KafkaSourceDescriptor} and outputs {@link KafkaRecord}.
+ * By default, a {@link MonotonicallyIncreasing} watermark estimator is used to track watermark.
+ *
+ * <p>{@link ReadFromKafkaDoFn} implements the logic of reading from Kafka. The element is a {@link
+ * KafkaSourceDescriptor}, and the restriction is an {@link OffsetRange} which represents record
+ * offset. A {@link GrowableOffsetRangeTracker} is used to track an {@link OffsetRange} ending with
+ * {@code Long.MAX_VALUE}. For a finite range, an {@link OffsetRangeTracker} is created.
+ *
+ * <h4>Initial Restriction</h4>
+ *
+ * <p>The initial range for a {@link KafkaSourceDescriptor} is defined by {@code [startOffset,
+ * Long.MAX_VALUE)} where {@code startOffset} is defined as:
+ *
+ * <ul>
+ *   <li>the {@code startReadOffset} if {@link KafkaSourceDescriptor#getStartReadOffset} is set.
+ *   <li>the first offset with a timestamp greater than or equal to the start read time if {@link
+ *       KafkaSourceDescriptor#getStartReadTime()} is set.
+ *   <li>otherwise, the {@code last committed offset + 1} for the {@link
+ *       Consumer#position(TopicPartition) topic partition}.
+ * </ul>
+ *
+ * <h4>Splitting</h4>
+ *
+ * <p>TODO(BEAM-10319): Add support for initial splitting.
+ *
+ * <h4>Checkpoint and Resume Processing</h4>
+ *
+ * <p>There are two types of checkpoint here: a self-checkpoint, which is invoked by the DoFn, and
+ * a system-checkpoint, which is issued by the runner via {@link
+ * org.apache.beam.model.fnexecution.v1.BeamFnApi.ProcessBundleSplitRequest}. Every time the
+ * consumer gets an empty response from {@link Consumer#poll(long)}, {@link ReadFromKafkaDoFn} will
+ * checkpoint the current {@link KafkaSourceDescriptor} and move on to process the next element.
+ * These deferred elements will be resumed by the runner as soon as possible.
+ *
+ * <h4>Progress and Size</h4>
+ *
+ * <p>The progress is provided by the {@link GrowableOffsetRangeTracker} per {@link
+ * KafkaSourceDescriptor}. For an infinite {@link OffsetRange}, a Kafka {@link Consumer} is used in
+ * the {@link GrowableOffsetRangeTracker} as the {@link
+ * GrowableOffsetRangeTracker.RangeEndEstimator} to poll the latest offset. Please refer to {@link
+ * ReadFromKafkaDoFn#restrictionTracker(KafkaSourceDescriptor, OffsetRange)} for details.
+ *
+ * <p>The size is computed by {@link ReadFromKafkaDoFn#getSize(KafkaSourceDescriptor, OffsetRange)}.
+ * A {@link KafkaIOUtils.MovingAvg} is used to track the average size of Kafka records.
+ *
+ * <h4>Track Watermark</h4>
+ *
+ * <p>The {@link WatermarkEstimator} is created by {@link
+ * ReadSourceDescriptors#getCreateWatermarkEstimatorFn()}. The estimated watermark is computed by
+ * this {@link WatermarkEstimator} based on output timestamps computed by {@link
+ * ReadSourceDescriptors#getExtractOutputTimestampFn()}. The default
+ * configuration is using {@link ReadSourceDescriptors#withProcessingTime()} as {@code
+ * extractTimestampFn} and {@link
+ * ReadSourceDescriptors#withMonotonicallyIncreasingWatermarkEstimator()} as {@link
+ * WatermarkEstimator}.
+ */
+@UnboundedPerElement
+class ReadFromKafkaDoFn<K, V> extends DoFn<KafkaSourceDescriptor, KafkaRecord<K, V>> {
+
+  ReadFromKafkaDoFn(ReadSourceDescriptors transform) {
+    this.consumerConfig = transform.getConsumerConfig();
+    this.offsetConsumerConfig = transform.getOffsetConsumerConfig();
+    this.keyDeserializerProvider = transform.getKeyDeserializerProvider();
+    this.valueDeserializerProvider = transform.getValueDeserializerProvider();
+    this.consumerFactoryFn = transform.getConsumerFactoryFn();
+    this.extractOutputTimestampFn = transform.getExtractOutputTimestampFn();
+    this.createWatermarkEstimatorFn = transform.getCreateWatermarkEstimatorFn();
+    this.timestampPolicyFactory = transform.getTimestampPolicyFactory();
+  }
+
+  private static final Logger LOG = LoggerFactory.getLogger(ReadFromKafkaDoFn.class);
+
+  private final Map<String, Object> offsetConsumerConfig;
+
+  private final SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>>
+      consumerFactoryFn;
+  private final SerializableFunction<KafkaRecord<K, V>, Instant> extractOutputTimestampFn;
+  private final SerializableFunction<Instant, WatermarkEstimator<Instant>>
+      createWatermarkEstimatorFn;
+  private final TimestampPolicyFactory<K, V> timestampPolicyFactory;
+
+  // Valid between bundle start and bundle finish.
+  private transient ConsumerSpEL consumerSpEL = null;
+  private transient Deserializer<K> keyDeserializerInstance = null;
+  private transient Deserializer<V> valueDeserializerInstance = null;
+
+  private transient LoadingCache<TopicPartition, AverageRecordSize> avgRecordSize;
+
+  private static final Duration KAFKA_POLL_TIMEOUT = Duration.millis(1000);
+
+  @VisibleForTesting final DeserializerProvider keyDeserializerProvider;
+  @VisibleForTesting final DeserializerProvider valueDeserializerProvider;
+  @VisibleForTesting final Map<String, Object> consumerConfig;
+
+  /**
+   * A {@link GrowableOffsetRangeTracker.RangeEndEstimator} which uses a Kafka {@link Consumer} to
+   * fetch backlog.
+   */
+  private static class KafkaLatestOffsetEstimator
+      implements GrowableOffsetRangeTracker.RangeEndEstimator {
+
+    private final Consumer<byte[], byte[]> offsetConsumer;
+    private final TopicPartition topicPartition;
+    private final ConsumerSpEL consumerSpEL;
+    private final Supplier<Long> memoizedBacklog;
+
+    KafkaLatestOffsetEstimator(
+        Consumer<byte[], byte[]> offsetConsumer, TopicPartition topicPartition) {
+      this.offsetConsumer = offsetConsumer;
+      this.topicPartition = topicPartition;
+      this.consumerSpEL = new ConsumerSpEL();
+      this.consumerSpEL.evaluateAssign(this.offsetConsumer, ImmutableList.of(this.topicPartition));
+      memoizedBacklog =
+          Suppliers.memoizeWithExpiration(
+              () -> {
+                consumerSpEL.evaluateSeek2End(offsetConsumer, topicPartition);
+                return offsetConsumer.position(topicPartition);
+              },
+              5,
+              TimeUnit.SECONDS);
+    }
+
+    @Override
+    protected void finalize() {
+      try {
+        Closeables.close(offsetConsumer, true);
+      } catch (Exception anyException) {
+        LOG.warn("Failed to close offset consumer for {}", topicPartition);
+      }
+    }
+
+    @Override
+    public long estimate() {
+      return memoizedBacklog.get();
+    }
+  }
+
+  @GetInitialRestriction
+  public OffsetRange initialRestriction(@Element KafkaSourceDescriptor kafkaSourceDescriptor) {
+    Map<String, Object> updatedConsumerConfig =
+        overrideBootstrapServersConfig(consumerConfig, kafkaSourceDescriptor);
+    try (Consumer<byte[], byte[]> offsetConsumer =
+        consumerFactoryFn.apply(
+            KafkaIOUtils.getOffsetConsumerConfig(
+                "initialOffset", offsetConsumerConfig, updatedConsumerConfig))) {
+      consumerSpEL.evaluateAssign(
+          offsetConsumer, ImmutableList.of(kafkaSourceDescriptor.getTopicPartition()));
+      long startOffset;
+      if (kafkaSourceDescriptor.getStartReadOffset() != null) {
+        startOffset = kafkaSourceDescriptor.getStartReadOffset();
+      } else if (kafkaSourceDescriptor.getStartReadTime() != null) {
+        startOffset =
+            consumerSpEL.offsetForTime(
+                offsetConsumer,
+                kafkaSourceDescriptor.getTopicPartition(),
+                kafkaSourceDescriptor.getStartReadTime());
+      } else {
+        startOffset = offsetConsumer.position(kafkaSourceDescriptor.getTopicPartition());
+      }
+      return new OffsetRange(startOffset, Long.MAX_VALUE);
+    }
+  }
+
+  @GetInitialWatermarkEstimatorState
+  public Instant getInitialWatermarkEstimatorState(@Timestamp Instant currentElementTimestamp) {
+    return currentElementTimestamp;
+  }
+
+  @NewWatermarkEstimator
+  public WatermarkEstimator<Instant> newWatermarkEstimator(
+      @WatermarkEstimatorState Instant watermarkEstimatorState) {
+    return createWatermarkEstimatorFn.apply(watermarkEstimatorState);
+  }
+
+  @GetSize
+  public double getSize(
+      @Element KafkaSourceDescriptor kafkaSourceDescriptor, @Restriction OffsetRange offsetRange)
+      throws Exception {
+    double numRecords =
+        restrictionTracker(kafkaSourceDescriptor, offsetRange).getProgress().getWorkRemaining();
+    // Before processing elements, we don't have a good estimated size of records and offset gap.
+    if (!avgRecordSize.asMap().containsKey(kafkaSourceDescriptor.getTopicPartition())) {
+      return numRecords;
+    }
+    return avgRecordSize.get(kafkaSourceDescriptor.getTopicPartition()).getTotalSize(numRecords);
+  }
+
+  @NewTracker
+  public GrowableOffsetRangeTracker restrictionTracker(
+      @Element KafkaSourceDescriptor kafkaSourceDescriptor, @Restriction OffsetRange restriction) {
+    Map<String, Object> updatedConsumerConfig =
+        overrideBootstrapServersConfig(consumerConfig, kafkaSourceDescriptor);
+    KafkaLatestOffsetEstimator offsetPoller =
+        new KafkaLatestOffsetEstimator(
+            consumerFactoryFn.apply(
+                KafkaIOUtils.getOffsetConsumerConfig(
+                    "tracker-" + kafkaSourceDescriptor.getTopicPartition(),
+                    offsetConsumerConfig,
+                    updatedConsumerConfig)),
+            kafkaSourceDescriptor.getTopicPartition());
+    return new GrowableOffsetRangeTracker(restriction.getFrom(), offsetPoller);
+  }
+
+  @ProcessElement
+  public ProcessContinuation processElement(
+      @Element KafkaSourceDescriptor kafkaSourceDescriptor,
+      RestrictionTracker<OffsetRange, Long> tracker,
+      WatermarkEstimator watermarkEstimator,
+      OutputReceiver<KafkaRecord<K, V>> receiver) {
+    // If there is no future work, resume with max timeout and move to the next element.
+    Map<String, Object> updatedConsumerConfig =
+        overrideBootstrapServersConfig(consumerConfig, kafkaSourceDescriptor);
+    // If there is a timestampPolicyFactory, create the TimestampPolicy for current
+    // TopicPartition.
+    TimestampPolicy timestampPolicy = null;
+    if (timestampPolicyFactory != null) {
+      timestampPolicy =
+          timestampPolicyFactory.createTimestampPolicy(
+              kafkaSourceDescriptor.getTopicPartition(),
+              Optional.ofNullable(watermarkEstimator.currentWatermark()));
+    }
+    try (Consumer<byte[], byte[]> consumer = consumerFactoryFn.apply(updatedConsumerConfig)) {
+      consumerSpEL.evaluateAssign(
+          consumer, ImmutableList.of(kafkaSourceDescriptor.getTopicPartition()));
+      long startOffset = tracker.currentRestriction().getFrom();
+      long expectedOffset = startOffset;
+      consumer.seek(kafkaSourceDescriptor.getTopicPartition(), startOffset);
+      ConsumerRecords<byte[], byte[]> rawRecords = ConsumerRecords.empty();
+
+      while (true) {
+        rawRecords = consumer.poll(KAFKA_POLL_TIMEOUT.getMillis());
+        // When there are no records available for the current TopicPartition, self-checkpoint
+        // and move to process the next element.
+        if (rawRecords.isEmpty()) {
+          return ProcessContinuation.resume();
+        }
+        for (ConsumerRecord<byte[], byte[]> rawRecord : rawRecords) {
+          if (!tracker.tryClaim(rawRecord.offset())) {
+            return ProcessContinuation.stop();
+          }
+          KafkaRecord<K, V> kafkaRecord =
+              new KafkaRecord<>(
+                  rawRecord.topic(),
+                  rawRecord.partition(),
+                  rawRecord.offset(),
+                  consumerSpEL.getRecordTimestamp(rawRecord),
+                  consumerSpEL.getRecordTimestampType(rawRecord),
+                  ConsumerSpEL.hasHeaders() ? rawRecord.headers() : null,
+                  keyDeserializerInstance.deserialize(rawRecord.topic(), rawRecord.key()),
+                  valueDeserializerInstance.deserialize(rawRecord.topic(), rawRecord.value()));
+          int recordSize =
+              (rawRecord.key() == null ? 0 : rawRecord.key().length)
+                  + (rawRecord.value() == null ? 0 : rawRecord.value().length);
+          avgRecordSize
+              .getUnchecked(kafkaSourceDescriptor.getTopicPartition())
+              .update(recordSize, rawRecord.offset() - expectedOffset);
+          expectedOffset = rawRecord.offset() + 1;
+          Instant outputTimestamp;
+          // The outputTimestamp and watermark will be computed by timestampPolicy, where the
+          // WatermarkEstimator should be a Manual one.

Review comment:
       ```suggestion
             // WatermarkEstimator should be a manual one.
   ```
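
As a side note on the loop quoted in the hunk above, its control flow (self-checkpoint on an empty poll, stop when a claim fails) could be modelled without Kafka or Beam roughly like this. Everything here is a hypothetical stand-in: `PollLoopSketch`, `RangeTracker`, and `Continuation` are illustrative names, each inner list plays the role of one `poll` result, and the tracker only checks an exclusive upper bound.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class PollLoopSketch {

  /** Mirrors the two outcomes used in the hunk: resume later, or stop for good. */
  enum Continuation { RESUME, STOP }

  /** Hypothetical stand-in for a restriction tracker with an exclusive end offset. */
  static final class RangeTracker {
    private final long endExclusive;

    RangeTracker(long endExclusive) {
      this.endExclusive = endExclusive;
    }

    /** Returns true if the offset is still inside the restriction. */
    boolean tryClaim(long offset) {
      return offset < endExclusive;
    }
  }

  /** Each inner list stands for the offsets returned by one poll call. */
  static Continuation process(List<List<Long>> polls, RangeTracker tracker, List<Long> output) {
    for (List<Long> batch : polls) {
      if (batch.isEmpty()) {
        // Nothing available right now: self-checkpoint and let the runner resume later.
        return Continuation.RESUME;
      }
      for (long offset : batch) {
        if (!tracker.tryClaim(offset)) {
          // The restriction no longer covers this offset: stop processing it.
          return Continuation.STOP;
        }
        output.add(offset);
      }
    }
    // Simulated input exhausted; treated here like an empty poll.
    return Continuation.RESUME;
  }

  public static void main(String[] args) {
    List<Long> output = new ArrayList<>();
    Continuation result =
        process(
            Arrays.asList(Arrays.asList(0L, 1L), Collections.<Long>emptyList()),
            new RangeTracker(Long.MAX_VALUE),
            output);
    System.out.println(result + " " + output);
  }
}
```

With an unbounded end offset the sketch only ever returns `RESUME`; `STOP` is reached only once the restriction has been shrunk, for example by a runner-initiated split.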

##########
File path: sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java
##########
@@ -0,0 +1,403 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.kafka;
+
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.TimeUnit;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.io.kafka.KafkaIO.ReadSourceDescriptors;
+import org.apache.beam.sdk.io.kafka.KafkaIOUtils.MovingAvg;
+import org.apache.beam.sdk.io.kafka.KafkaUnboundedReader.TimestampPolicyContext;
+import org.apache.beam.sdk.io.range.OffsetRange;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.DoFn.UnboundedPerElement;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.splittabledofn.GrowableOffsetRangeTracker;
+import org.apache.beam.sdk.transforms.splittabledofn.ManualWatermarkEstimator;
+import org.apache.beam.sdk.transforms.splittabledofn.OffsetRangeTracker;
+import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
+import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker.HasProgress;
+import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimator;
+import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimators.MonotonicallyIncreasing;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Supplier;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Suppliers;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.CacheBuilder;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.CacheLoader;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LoadingCache;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.io.Closeables;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A SplittableDoFn which reads from {@link KafkaSourceDescriptor} and outputs {@link KafkaRecord}.
+ * By default, a {@link MonotonicallyIncreasing} watermark estimator is used to track watermark.
+ *
+ * <p>{@link ReadFromKafkaDoFn} implements the logic of reading from Kafka. The element is a {@link
+ * KafkaSourceDescriptor}, and the restriction is an {@link OffsetRange} which represents record
+ * offset. A {@link GrowableOffsetRangeTracker} is used to track an {@link OffsetRange} ending with
+ * {@code Long.MAX_VALUE}. For a finite range, an {@link OffsetRangeTracker} is created.
+ *
+ * <h4>Initial Restriction</h4>
+ *
+ * <p>The initial range for a {@link KafkaSourceDescriptor} is defined by {@code [startOffset,
+ * Long.MAX_VALUE)} where {@code startOffset} is defined as:
+ *
+ * <ul>
+ *   <li>the {@code startReadOffset} if {@link KafkaSourceDescriptor#getStartReadOffset} is set.
+ *   <li>the first offset with a timestamp greater than or equal to the start read time if {@link
+ *       KafkaSourceDescriptor#getStartReadTime()} is set.
+ *   <li>otherwise, the {@code last committed offset + 1} for the {@link
+ *       Consumer#position(TopicPartition) topic partition}.
+ * </ul>
+ *
+ * <h4>Splitting</h4>
+ *
+ * <p>TODO(BEAM-10319): Add support for initial splitting.
+ *
+ * <h4>Checkpoint and Resume Processing</h4>
+ *
+ * <p>There are two types of checkpoint here: a self-checkpoint, which is invoked by the DoFn, and
+ * a system-checkpoint, which is issued by the runner via {@link
+ * org.apache.beam.model.fnexecution.v1.BeamFnApi.ProcessBundleSplitRequest}. Every time the
+ * consumer gets an empty response from {@link Consumer#poll(long)}, {@link ReadFromKafkaDoFn} will
+ * checkpoint the current {@link KafkaSourceDescriptor} and move on to process the next element.
+ * These deferred elements will be resumed by the runner as soon as possible.
+ *
+ * <h4>Progress and Size</h4>
+ *
+ * <p>The progress is provided by the {@link GrowableOffsetRangeTracker} per {@link
+ * KafkaSourceDescriptor}. For an infinite {@link OffsetRange}, a Kafka {@link Consumer} is used in
+ * the {@link GrowableOffsetRangeTracker} as the {@link
+ * GrowableOffsetRangeTracker.RangeEndEstimator} to poll the latest offset. Please refer to {@link
+ * ReadFromKafkaDoFn#restrictionTracker(KafkaSourceDescriptor, OffsetRange)} for details.
+ *
+ * <p>The size is computed by {@link ReadFromKafkaDoFn#getSize(KafkaSourceDescriptor, OffsetRange)}.
+ * A {@link KafkaIOUtils.MovingAvg} is used to track the average size of Kafka records.
+ *
+ * <h4>Track Watermark</h4>
+ *
+ * <p>The {@link WatermarkEstimator} is created by {@link
+ * ReadSourceDescriptors#getCreateWatermarkEstimatorFn()}. The estimated watermark is computed by
+ * this {@link WatermarkEstimator} based on output timestamps computed by {@link
+ * ReadSourceDescriptors#getExtractOutputTimestampFn()}. The default
+ * configuration is using {@link ReadSourceDescriptors#withProcessingTime()} as {@code
+ * extractTimestampFn} and {@link
+ * ReadSourceDescriptors#withMonotonicallyIncreasingWatermarkEstimator()} as {@link
+ * WatermarkEstimator}.
+ */
+@UnboundedPerElement
+class ReadFromKafkaDoFn<K, V> extends DoFn<KafkaSourceDescriptor, KafkaRecord<K, V>> {
+
+  ReadFromKafkaDoFn(ReadSourceDescriptors transform) {
+    this.consumerConfig = transform.getConsumerConfig();
+    this.offsetConsumerConfig = transform.getOffsetConsumerConfig();
+    this.keyDeserializerProvider = transform.getKeyDeserializerProvider();
+    this.valueDeserializerProvider = transform.getValueDeserializerProvider();
+    this.consumerFactoryFn = transform.getConsumerFactoryFn();
+    this.extractOutputTimestampFn = transform.getExtractOutputTimestampFn();
+    this.createWatermarkEstimatorFn = transform.getCreateWatermarkEstimatorFn();
+    this.timestampPolicyFactory = transform.getTimestampPolicyFactory();
+  }
+
+  private static final Logger LOG = LoggerFactory.getLogger(ReadFromKafkaDoFn.class);
+
+  private final Map<String, Object> offsetConsumerConfig;
+
+  private final SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>>
+      consumerFactoryFn;
+  private final SerializableFunction<KafkaRecord<K, V>, Instant> extractOutputTimestampFn;
+  private final SerializableFunction<Instant, WatermarkEstimator<Instant>>
+      createWatermarkEstimatorFn;
+  private final TimestampPolicyFactory<K, V> timestampPolicyFactory;
+
+  // Valid between bundle start and bundle finish.
+  private transient ConsumerSpEL consumerSpEL = null;
+  private transient Deserializer<K> keyDeserializerInstance = null;
+  private transient Deserializer<V> valueDeserializerInstance = null;
+
+  private transient LoadingCache<TopicPartition, AverageRecordSize> avgRecordSize;
+
+  private static final Duration KAFKA_POLL_TIMEOUT = Duration.millis(1000);
+
+  @VisibleForTesting final DeserializerProvider keyDeserializerProvider;
+  @VisibleForTesting final DeserializerProvider valueDeserializerProvider;
+  @VisibleForTesting final Map<String, Object> consumerConfig;
+
+  /**
+   * A {@link GrowableOffsetRangeTracker.RangeEndEstimator} which uses a Kafka {@link Consumer} to
+   * fetch backlog.
+   */
+  private static class KafkaLatestOffsetEstimator
+      implements GrowableOffsetRangeTracker.RangeEndEstimator {
+
+    private final Consumer<byte[], byte[]> offsetConsumer;
+    private final TopicPartition topicPartition;
+    private final ConsumerSpEL consumerSpEL;
+    private final Supplier<Long> memoizedBacklog;
+
+    KafkaLatestOffsetEstimator(
+        Consumer<byte[], byte[]> offsetConsumer, TopicPartition topicPartition) {
+      this.offsetConsumer = offsetConsumer;
+      this.topicPartition = topicPartition;
+      this.consumerSpEL = new ConsumerSpEL();
+      this.consumerSpEL.evaluateAssign(this.offsetConsumer, ImmutableList.of(this.topicPartition));
+      memoizedBacklog =
+          Suppliers.memoizeWithExpiration(
+              () -> {
+                consumerSpEL.evaluateSeek2End(offsetConsumer, topicPartition);
+                return offsetConsumer.position(topicPartition);
+              },
+              5,
+              TimeUnit.SECONDS);
+    }
+
+    @Override
+    protected void finalize() {
+      try {
+        Closeables.close(offsetConsumer, true);
+      } catch (Exception anyException) {
+        LOG.warn("Failed to close offset consumer for {}", topicPartition);
+      }
+    }
+
+    @Override
+    public long estimate() {
+      return memoizedBacklog.get();
+    }
+  }
+
+  @GetInitialRestriction
+  public OffsetRange initialRestriction(@Element KafkaSourceDescriptor kafkaSourceDescriptor) {
+    Map<String, Object> updatedConsumerConfig =
+        overrideBootstrapServersConfig(consumerConfig, kafkaSourceDescriptor);
+    try (Consumer<byte[], byte[]> offsetConsumer =
+        consumerFactoryFn.apply(
+            KafkaIOUtils.getOffsetConsumerConfig(
+                "initialOffset", offsetConsumerConfig, updatedConsumerConfig))) {
+      consumerSpEL.evaluateAssign(
+          offsetConsumer, ImmutableList.of(kafkaSourceDescriptor.getTopicPartition()));
+      long startOffset;
+      if (kafkaSourceDescriptor.getStartReadOffset() != null) {
+        startOffset = kafkaSourceDescriptor.getStartReadOffset();
+      } else if (kafkaSourceDescriptor.getStartReadTime() != null) {
+        startOffset =
+            consumerSpEL.offsetForTime(
+                offsetConsumer,
+                kafkaSourceDescriptor.getTopicPartition(),
+                kafkaSourceDescriptor.getStartReadTime());
+      } else {
+        startOffset = offsetConsumer.position(kafkaSourceDescriptor.getTopicPartition());
+      }
+      return new OffsetRange(startOffset, Long.MAX_VALUE);
+    }
+  }
+
+  @GetInitialWatermarkEstimatorState
+  public Instant getInitialWatermarkEstimatorState(@Timestamp Instant currentElementTimestamp) {
+    return currentElementTimestamp;
+  }
+
+  @NewWatermarkEstimator
+  public WatermarkEstimator<Instant> newWatermarkEstimator(
+      @WatermarkEstimatorState Instant watermarkEstimatorState) {
+    return createWatermarkEstimatorFn.apply(watermarkEstimatorState);
+  }
+
+  @GetSize
+  public double getSize(
+      @Element KafkaSourceDescriptor kafkaSourceDescriptor, @Restriction OffsetRange offsetRange)
+      throws Exception {
+    double numRecords =
+        restrictionTracker(kafkaSourceDescriptor, offsetRange).getProgress().getWorkRemaining();
+    // Before processing elements, we don't have a good estimated size of records and offset gap.
+    if (!avgRecordSize.asMap().containsKey(kafkaSourceDescriptor.getTopicPartition())) {
+      return numRecords;
+    }
+    return avgRecordSize.get(kafkaSourceDescriptor.getTopicPartition()).getTotalSize(numRecords);
+  }
+
+  @NewTracker
+  public GrowableOffsetRangeTracker restrictionTracker(
+      @Element KafkaSourceDescriptor kafkaSourceDescriptor, @Restriction OffsetRange restriction) {
+    Map<String, Object> updatedConsumerConfig =
+        overrideBootstrapServersConfig(consumerConfig, kafkaSourceDescriptor);
+    KafkaLatestOffsetEstimator offsetPoller =
+        new KafkaLatestOffsetEstimator(
+            consumerFactoryFn.apply(
+                KafkaIOUtils.getOffsetConsumerConfig(
+                    "tracker-" + kafkaSourceDescriptor.getTopicPartition(),
+                    offsetConsumerConfig,
+                    updatedConsumerConfig)),
+            kafkaSourceDescriptor.getTopicPartition());
+    return new GrowableOffsetRangeTracker(restriction.getFrom(), offsetPoller);
+  }
+
+  @ProcessElement
+  public ProcessContinuation processElement(
+      @Element KafkaSourceDescriptor kafkaSourceDescriptor,
+      RestrictionTracker<OffsetRange, Long> tracker,
+      WatermarkEstimator watermarkEstimator,
+      OutputReceiver<KafkaRecord<K, V>> receiver) {
+    // If there is no future work, resume with max timeout and move to the next element.
+    Map<String, Object> updatedConsumerConfig =
+        overrideBootstrapServersConfig(consumerConfig, kafkaSourceDescriptor);
+    // If there is a timestampPolicyFactory, create the TimestampPolicy for current
+    // TopicPartition.
+    TimestampPolicy timestampPolicy = null;
+    if (timestampPolicyFactory != null) {
+      timestampPolicy =
+          timestampPolicyFactory.createTimestampPolicy(
+              kafkaSourceDescriptor.getTopicPartition(),
+              Optional.ofNullable(watermarkEstimator.currentWatermark()));
+    }
+    try (Consumer<byte[], byte[]> consumer = consumerFactoryFn.apply(updatedConsumerConfig)) {
+      consumerSpEL.evaluateAssign(
+          consumer, ImmutableList.of(kafkaSourceDescriptor.getTopicPartition()));
+      long startOffset = tracker.currentRestriction().getFrom();
+      long expectedOffset = startOffset;
+      consumer.seek(kafkaSourceDescriptor.getTopicPartition(), startOffset);
+      ConsumerRecords<byte[], byte[]> rawRecords = ConsumerRecords.empty();
+
+      while (true) {
+        rawRecords = consumer.poll(KAFKA_POLL_TIMEOUT.getMillis());
+        // When there are no records available for the current TopicPartition, self-checkpoint
+        // and move to process the next element.
+        if (rawRecords.isEmpty()) {
+          return ProcessContinuation.resume();
+        }
+        for (ConsumerRecord<byte[], byte[]> rawRecord : rawRecords) {
+          if (!tracker.tryClaim(rawRecord.offset())) {
+            return ProcessContinuation.stop();
+          }
+          KafkaRecord<K, V> kafkaRecord =
+              new KafkaRecord<>(
+                  rawRecord.topic(),
+                  rawRecord.partition(),
+                  rawRecord.offset(),
+                  consumerSpEL.getRecordTimestamp(rawRecord),
+                  consumerSpEL.getRecordTimestampType(rawRecord),
+                  ConsumerSpEL.hasHeaders() ? rawRecord.headers() : null,
+                  keyDeserializerInstance.deserialize(rawRecord.topic(), rawRecord.key()),
+                  valueDeserializerInstance.deserialize(rawRecord.topic(), rawRecord.value()));
+          int recordSize =
+              (rawRecord.key() == null ? 0 : rawRecord.key().length)
+                  + (rawRecord.value() == null ? 0 : rawRecord.value().length);
+          avgRecordSize
+              .getUnchecked(kafkaSourceDescriptor.getTopicPartition())
+              .update(recordSize, rawRecord.offset() - expectedOffset);
+          expectedOffset = rawRecord.offset() + 1;
+          Instant outputTimestamp;
+          // The outputTimestamp and watermark will be computed by timestampPolicy, where the
+          // WatermarkEstimator should be a Manual one.
+          if (timestampPolicy != null) {
+            checkState(watermarkEstimator instanceof ManualWatermarkEstimator);
+            TimestampPolicyContext context =
+                new TimestampPolicyContext(
+                    (long) ((HasProgress) tracker).getProgress().getWorkRemaining(), Instant.now());
+            outputTimestamp = timestampPolicy.getTimestampForRecord(context, kafkaRecord);
+            ((ManualWatermarkEstimator) watermarkEstimator)
+                .setWatermark(timestampPolicy.getWatermark(context));
+          } else {
+            outputTimestamp = extractOutputTimestampFn.apply(kafkaRecord);
+          }
+          receiver.outputWithTimestamp(kafkaRecord, outputTimestamp);
+        }
+      }
+    }
+  }
+
+  @GetRestrictionCoder
+  public Coder<OffsetRange> restrictionCoder() {
+    return new OffsetRange.Coder();
+  }
+
+  @Setup
+  public void setup() throws Exception {
+    // Start to track record size and offset gap per bundle.
+    avgRecordSize =
+        CacheBuilder.newBuilder()
+            .maximumSize(1000L)
+            .build(
+                new CacheLoader<TopicPartition, AverageRecordSize>() {
+                  @Override
+                  public AverageRecordSize load(TopicPartition topicPartition) throws Exception {
+                    return new AverageRecordSize();
+                  }
+                });
+    consumerSpEL = new ConsumerSpEL();
+    keyDeserializerInstance = keyDeserializerProvider.getDeserializer(consumerConfig, true);
+    valueDeserializerInstance = valueDeserializerProvider.getDeserializer(consumerConfig, false);
+  }
+
+  @Teardown
+  public void teardown() throws Exception {
+    try {
+      Closeables.close(keyDeserializerInstance, true);
+      Closeables.close(valueDeserializerInstance, true);
+    } catch (Exception anyException) {
+      LOG.warn("Fail to close resource during finishing bundle: {}", anyException.getMessage());

Review comment:
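       slf4j has special logic to format exceptions: pass the exception as the last argument, without a `{}` placeholder, and the stack trace is logged as well.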
   ```suggestion
         LOG.warn("Fail to close resource during finishing bundle.", anyException);
   ```
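
To illustrate what the original call drops, here is a minimal sketch that uses only the JDK (no slf4j dependency). The class and method names (`ExceptionLoggingSketch`, `messageOnly`, `fullTrace`) are hypothetical and not part of the PR. It compares the text available from `getMessage()` with what a logger can render when it receives the `Throwable` itself.

```java
import java.io.IOException;
import java.io.PrintWriter;
import java.io.StringWriter;

// Hypothetical helper class, for illustration only; not part of Beam or slf4j.
public class ExceptionLoggingSketch {

  /** The text a "{}" placeholder would receive from getMessage(): the message only. */
  static String messageOnly(Throwable t) {
    return String.valueOf(t.getMessage());
  }

  /** The text available when the Throwable itself is handed over: type, frames, causes. */
  static String fullTrace(Throwable t) {
    StringWriter buffer = new StringWriter();
    t.printStackTrace(new PrintWriter(buffer, true));
    return buffer.toString();
  }

  public static void main(String[] args) {
    Exception failure =
        new IllegalStateException("close failed", new IOException("broken pipe"));
    System.out.println("message only: " + messageOnly(failure));
    System.out.println("full trace:");
    System.out.println(fullTrace(failure));
  }
}
```

The message-only form loses the exception class and the `Caused by:` chain, which is typically the information needed when a deserializer fails to close.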

##########
File path: sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java
##########
@@ -0,0 +1,403 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.kafka;
+
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.TimeUnit;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.io.kafka.KafkaIO.ReadSourceDescriptors;
+import org.apache.beam.sdk.io.kafka.KafkaIOUtils.MovingAvg;
+import org.apache.beam.sdk.io.kafka.KafkaUnboundedReader.TimestampPolicyContext;
+import org.apache.beam.sdk.io.range.OffsetRange;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.DoFn.UnboundedPerElement;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.splittabledofn.GrowableOffsetRangeTracker;
+import org.apache.beam.sdk.transforms.splittabledofn.ManualWatermarkEstimator;
+import org.apache.beam.sdk.transforms.splittabledofn.OffsetRangeTracker;
+import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
+import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker.HasProgress;
+import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimator;
+import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimators.MonotonicallyIncreasing;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Supplier;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Suppliers;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.CacheBuilder;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.CacheLoader;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LoadingCache;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.io.Closeables;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A SplittableDoFn which reads from {@link KafkaSourceDescriptor} and outputs {@link KafkaRecord}.
+ * By default, a {@link MonotonicallyIncreasing} watermark estimator is used to track watermark.
+ *
+ * <p>{@link ReadFromKafkaDoFn} implements the logic of reading from Kafka. The element is a {@link
+ * KafkaSourceDescriptor}, and the restriction is an {@link OffsetRange} which represents record
+ * offset. A {@link GrowableOffsetRangeTracker} is used to track an {@link OffsetRange} ended with
+ * {@code Long.MAX_VALUE}. For a finite range, a {@link OffsetRangeTracker} is created.
+ *
+ * <h4>Initial Restriction</h4>
+ *
+ * <p>The initial range for a {@link KafkaSourceDescriptor } is defined by {@code [startOffset,
+ * Long.MAX_VALUE)} where {@code startOffset} is defined as:
+ *
+ * <ul>
+ *   <li>the {@code startReadOffset} if {@link KafkaSourceDescriptor#getStartReadOffset} is set.
+ *   <li>the first offset with a greater or equivalent timestamp if {@link
+ *       KafkaSourceDescriptor#getStartReadTime()} is set.
+ *   <li>the {@code last committed offset + 1} for the {@link Consumer#position(TopicPartition)
+ *       topic partition}.
+ * </ul>
+ *
+ * <h4>Splitting</h4>
+ *
+ * <p>TODO(BEAM-10319): Add support for initial splitting.
+ *
+ * <h4>Checkpoint and Resume Processing</h4>
+ *
+ * <p>There are 2 types of checkpoint here: a self-checkpoint, which is invoked by the DoFn, and a
+ * system-checkpoint, which is issued by the runner via {@link
+ * org.apache.beam.model.fnexecution.v1.BeamFnApi.ProcessBundleSplitRequest}. Every time the
+ * consumer gets an empty response from {@link Consumer#poll(long)}, {@link ReadFromKafkaDoFn} will
+ * checkpoint at the current {@link KafkaSourceDescriptor} and move on to process the next element.
+ * These deferred elements will be resumed by the runner as soon as possible.
+ *
+ * <h4>Progress and Size</h4>
+ *
+ * <p>The progress is provided by {@link GrowableOffsetRangeTracker} or per {@link
+ * KafkaSourceDescriptor }. For an infinite {@link OffsetRange}, a Kafka {@link Consumer} is used in
+ * the {@link GrowableOffsetRangeTracker} as the {@link
+ * GrowableOffsetRangeTracker.RangeEndEstimator} to poll the latest offset. Please refer to {@link
+ * ReadFromKafkaDoFn#restrictionTracker(KafkaSourceDescriptor, OffsetRange)} for details.
+ *
+ * <p>The size is computed by {@link ReadFromKafkaDoFn#getSize(KafkaSourceDescriptor, OffsetRange)}.
+ * A {@link KafkaIOUtils.MovingAvg} is used to track the average size of Kafka records.
+ *
+ * <h4>Track Watermark</h4>
+ *
+ * <p>The {@link WatermarkEstimator} is created by {@link
+ * ReadSourceDescriptors#getCreateWatermarkEstimatorFn()}. The estimated watermark is computed by
+ * this {@link WatermarkEstimator} based on output timestamps computed by {@link
+ * ReadSourceDescriptors#getExtractOutputTimestampFn()}. The default
+ * configuration is using {@link ReadSourceDescriptors#withProcessingTime()} as {@code
+ * extractTimestampFn} and {@link
+ * ReadSourceDescriptors#withMonotonicallyIncreasingWatermarkEstimator()} as {@link

Review comment:
       ```suggestion
    * ReadSourceDescriptors#withMonotonicallyIncreasingWatermarkEstimator()} as the {@link
   ```



