Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2020/08/13 22:17:06 UTC

[GitHub] [beam] boyuanzz opened a new pull request #12572: [WIP][BEAM-10123] Add commit transform.

boyuanzz opened a new pull request #12572:
URL: https://github.com/apache/beam/pull/12572


   **Please** add a meaningful description for your change here
   
   ------------------------
   
   Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:
   
    - [ ] [**Choose reviewer(s)**](https://beam.apache.org/contribute/#make-your-change) and mention them in a comment (`R: @username`).
    - [ ] Format the pull request title like `[BEAM-XXX] Fixes bug in ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA issue, if applicable. This will automatically link the pull request to the issue.
    - [ ] Update `CHANGES.md` with noteworthy changes.
    - [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).
   
   See the [Contributor Guide](https://beam.apache.org/contribute) for more tips on [how to make review process smoother](https://beam.apache.org/contribute/#make-reviewers-job-easier).
   
   Post-Commit Tests Status (on master branch)
   ------------------------------------------------------------------------------------------------
   
   (Build-status badge table omitted from this plain-text version: post-commit badges for the Go, Java, Python, and XLang SDKs across the Dataflow, Flink, Samza, Spark, and Twister2 runners, each linking to its Jenkins job on ci-beam.apache.org.)
   
   Pre-Commit Tests Status (on master branch)
   ------------------------------------------------------------------------------------------------
   
   (Build-status badge table omitted from this plain-text version: non-portable and portable pre-commit badges for Java, Python, Go, and Website, each linking to its Jenkins cron job on ci-beam.apache.org.)
   
   See [.test-infra/jenkins/README](https://github.com/apache/beam/blob/master/.test-infra/jenkins/README.md) for trigger phrase, status and link of all Jenkins jobs.
   
   
   GitHub Actions Tests Status (on master branch)
   ------------------------------------------------------------------------------------------------
   (Badge omitted: status of the "Build python source distribution and wheels" GitHub Actions workflow.)
   
   See [CI.md](https://github.com/apache/beam/blob/master/CI.md) for more information about GitHub Actions CI.
   





[GitHub] [beam] boyuanzz commented on pull request #12572: [BEAM-10123] Add Kafka Commit transform.

Posted by GitBox <gi...@apache.org>.
boyuanzz commented on pull request #12572:
URL: https://github.com/apache/beam/pull/12572#issuecomment-700980738


   > Thanks! I took a quick look at this - are you going to add some tests for this addition?
   
   Sorry for the late reply, and thanks for your help! Yes, I'd also like to have some tests around this, but it may not be possible to involve an actual Kafka consumer. The approach I can think of is to use a mock, but I'm not sure whether that's feasible.
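   
   A minimal sketch of the mock idea, assuming the consumer can be injected through KafkaIO's consumer factory function (the topic name and offset below are placeholders, not part of this PR):
   
   ```java
   import java.util.Collections;
   import org.apache.kafka.clients.consumer.MockConsumer;
   import org.apache.kafka.clients.consumer.OffsetAndMetadata;
   import org.apache.kafka.clients.consumer.OffsetResetStrategy;
   import org.apache.kafka.common.TopicPartition;
   
   public class CommitOffsetMockSketch {
     public static void main(String[] args) {
       // A broker-free consumer from kafka-clients; the code under test would
       // receive it via the consumer factory function instead of a real client.
       MockConsumer<byte[], byte[]> mock = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
       TopicPartition tp = new TopicPartition("test-topic", 0); // placeholder topic
       mock.assign(Collections.singleton(tp));
   
       // Simulate what the commit DoFn does: commit (max observed offset + 1).
       mock.commitSync(Collections.singletonMap(tp, new OffsetAndMetadata(43L)));
   
       // Verify the commit landed, with no real Kafka cluster involved.
       if (mock.committed(tp).offset() != 43L) {
         throw new AssertionError("unexpected committed offset");
       }
     }
   }
   ```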





[GitHub] [beam] boyuanzz commented on pull request #12572: [BEAM-10123] Add Kafka Commit transform.

Posted by GitBox <gi...@apache.org>.
boyuanzz commented on pull request #12572:
URL: https://github.com/apache/beam/pull/12572#issuecomment-718258631


   > > Yes, I'd also like to have some tests around this, but it may not be possible to involve an actual Kafka consumer. The approach I can think of is to use a mock, but I'm not sure whether that's feasible.
   > 
   > Can we add "SDF Read"-related tests to `KafkaIOIT` in this case?
   
   Sorry for the late reply; I was handling something urgent. `KafkaIOIT` will use the `SDF Read` automatically if the runner is using the `beam_fn_api` and `use_sdf_kafka_read` experiments. In the near future, we will make the `SDF Read` the default for Kafka.
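   
   For illustration, a hedged sketch of opting a pipeline into those experiments programmatically (the flag spellings are taken from the sentence above and may differ across Beam versions):
   
   ```java
   import org.apache.beam.sdk.options.PipelineOptions;
   import org.apache.beam.sdk.options.PipelineOptionsFactory;
   
   public class SdfReadOptionsSketch {
     public static void main(String[] args) {
       // Equivalent to passing --experiments=beam_fn_api,use_sdf_kafka_read
       // on the command line.
       PipelineOptions options =
           PipelineOptionsFactory.fromArgs("--experiments=beam_fn_api,use_sdf_kafka_read")
               .create();
       System.out.println(options);
     }
   }
   ```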








[GitHub] [beam] aromanenko-dev edited a comment on pull request #12572: [BEAM-10123] Add Kafka Commit transform.

Posted by GitBox <gi...@apache.org>.
aromanenko-dev edited a comment on pull request #12572:
URL: https://github.com/apache/beam/pull/12572#issuecomment-721205595


   > How about we check in this PR first, and if it looks good to you, I'll open another PR to set up the KafkaIO performance test properly?
   
   Sounds good to me, thanks





[GitHub] [beam] sameerbhadouria edited a comment on pull request #12572: [BEAM-10123] Add Kafka Commit transform.

Posted by GitBox <gi...@apache.org>.
sameerbhadouria edited a comment on pull request #12572:
URL: https://github.com/apache/beam/pull/12572#issuecomment-761157308


   @boyuanzz I am using Beam's cross-language support; however, the [ReadFromKafka](https://github.com/apache/beam/blob/0f36a72f80ecaac05d3e42f3874afae762c039d6/sdks/python/apache_beam/io/kafka.py#L110) transform does not return the Kafka record metadata.
   It also needs support for `commitOffsetInFinalize` and `TimestampPolicyFactory`.
   
   I want to disable `auto.commit` in the Kafka consumer config and manually commit the offsets after the rest of the stages in my pipeline succeed. I don't think I can do this right now.
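   
   For reference, the Java-side behavior being asked about looks roughly like this (a sketch only; the broker address, topic, and deserializers are placeholders):
   
   ```java
   import java.util.Collections;
   import org.apache.beam.sdk.io.kafka.KafkaIO;
   import org.apache.kafka.clients.consumer.ConsumerConfig;
   import org.apache.kafka.common.serialization.LongDeserializer;
   import org.apache.kafka.common.serialization.StringDeserializer;
   
   public class ManualCommitReadSketch {
     // Disable Kafka's auto-commit and let Beam commit offsets on finalization,
     // i.e. after downstream stages have durably processed the records.
     static KafkaIO.Read<Long, String> read() {
       return KafkaIO.<Long, String>read()
           .withBootstrapServers("broker:9092") // placeholder
           .withTopic("my-topic") // placeholder
           .withKeyDeserializer(LongDeserializer.class)
           .withValueDeserializer(StringDeserializer.class)
           .withConsumerConfigUpdates(
               Collections.singletonMap(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false))
           .commitOffsetsInFinalize();
     }
   }
   ```
   
   Exposing these knobs through the Python `ReadFromKafka` wrapper is what is being requested here.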





[GitHub] [beam] aromanenko-dev commented on pull request #12572: [BEAM-10123] Add Kafka Commit transform.

Posted by GitBox <gi...@apache.org>.
aromanenko-dev commented on pull request #12572:
URL: https://github.com/apache/beam/pull/12572#issuecomment-701311000


   > Yes, I'd also like to have some tests around this, but it may not be possible to involve an actual Kafka consumer. The approach I can think of is to use a mock, but I'm not sure whether that's feasible.
   
   Can we add "SDF Read"-related tests to `KafkaIOIT` in this case?





[GitHub] [beam] aromanenko-dev commented on pull request #12572: [BEAM-10123] Add Kafka Commit transform.

Posted by GitBox <gi...@apache.org>.
aromanenko-dev commented on pull request #12572:
URL: https://github.com/apache/beam/pull/12572#issuecomment-717262304


   @boyuanzz ping on this











[GitHub] [beam] aromanenko-dev commented on a change in pull request #12572: [BEAM-10123] Add Kafka Commit transform.

Posted by GitBox <gi...@apache.org>.
aromanenko-dev commented on a change in pull request #12572:
URL: https://github.com/apache/beam/pull/12572#discussion_r496887078



##########
File path: sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaCommitOffset.java
##########
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.kafka;
+
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.VarLongCoder;
+import org.apache.beam.sdk.coders.VoidCoder;
+import org.apache.beam.sdk.schemas.NoSuchSchemaException;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.Max;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.windowing.FixedWindows;
+import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.TypeDescriptor;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.joda.time.Duration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** A {@link PTransform} that commits offsets of {@link KafkaRecord}. */
+public class KafkaCommitOffset<K, V>
+    extends PTransform<
+        PCollection<KV<KafkaSourceDescriptor, KafkaRecord<K, V>>>, PCollection<Void>> {
+  private final KafkaIO.ReadSourceDescriptors<K, V> readSourceDescriptors;
+
+  KafkaCommitOffset(KafkaIO.ReadSourceDescriptors<K, V> readSourceDescriptors) {
+    this.readSourceDescriptors = readSourceDescriptors;
+  }
+
+  static class CommitOffsetDoFn extends DoFn<KV<KafkaSourceDescriptor, Long>, Void> {
+    private static final Logger LOG = LoggerFactory.getLogger(CommitOffsetDoFn.class);
+    private final Map<String, Object> offsetConsumerConfig;
+    private final Map<String, Object> consumerConfig;
+    private final SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>>
+        consumerFactoryFn;
+
+    private transient ConsumerSpEL consumerSpEL = null;
+
+    CommitOffsetDoFn(KafkaIO.ReadSourceDescriptors readSourceDescriptors) {
+      offsetConsumerConfig = readSourceDescriptors.getOffsetConsumerConfig();
+      consumerConfig = readSourceDescriptors.getConsumerConfig();
+      consumerFactoryFn = readSourceDescriptors.getConsumerFactoryFn();
+    }
+
+    @ProcessElement
+    public void processElement(@Element KV<KafkaSourceDescriptor, Long> element) {
+      Map<String, Object> updatedConsumerConfig =
+          overrideBootstrapServersConfig(consumerConfig, element.getKey());

Review comment:
       How many unique keys per bundle are expected? Only one, because of the `Max.longsPerKey()` on the previous step?

##########
File path: sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaCommitOffset.java
##########
@@ -0,0 +1,130 @@
  [... license header, imports, and preceding class body omitted; identical to the hunk above ...]
+    private Map<String, Object> overrideBootstrapServersConfig(
+        Map<String, Object> currentConfig, KafkaSourceDescriptor description) {
+      checkState(
+          currentConfig.containsKey(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG)
+              || description.getBootStrapServers() != null);

Review comment:
       Is it a possible state that there are no bootstrap servers defined?

##########
File path: sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaCommitOffset.java
##########
@@ -0,0 +1,130 @@
  [... license header, imports, and the CommitOffsetDoFn class omitted; identical to the hunks above ...]
+  @Override
+  public PCollection<Void> expand(PCollection<KV<KafkaSourceDescriptor, KafkaRecord<K, V>>> input) {
+    try {
+      return input
+          .apply(
+              MapElements.into(new TypeDescriptor<KV<KafkaSourceDescriptor, Long>>() {})
+                  .via(element -> KV.of(element.getKey(), element.getValue().getOffset())))
+          .setCoder(
+              KvCoder.of(
+                  input
+                      .getPipeline()
+                      .getSchemaRegistry()
+                      .getSchemaCoder(KafkaSourceDescriptor.class),
+                  VarLongCoder.of()))
+          .apply(Window.into(FixedWindows.of(Duration.standardMinutes(5))))

Review comment:
       Could you elaborate a bit on why the `Window` is hardcoded to 5 minutes?







[GitHub] [beam] boyuanzz commented on a change in pull request #12572: [BEAM-10123] Add Kafka Commit transform.

Posted by GitBox <gi...@apache.org>.
boyuanzz commented on a change in pull request #12572:
URL: https://github.com/apache/beam/pull/12572#discussion_r497052782



##########
File path: sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaCommitOffset.java
##########
@@ -0,0 +1,130 @@
  [... license header, imports, and preceding class body omitted; quoted in full in the earlier review message ...]
+    private Map<String, Object> overrideBootstrapServersConfig(
+        Map<String, Object> currentConfig, KafkaSourceDescriptor description) {
+      checkState(
+          currentConfig.containsKey(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG)
+              || description.getBootStrapServers() != null);

Review comment:
       Yes, that's possible, since we no longer force the user to provide bootstrap servers when constructing the transform. The bootstrap servers can also come from the source descriptor.







[GitHub] [beam] aromanenko-dev commented on a change in pull request #12572: [BEAM-10123] Add Kafka Commit transform.

Posted by GitBox <gi...@apache.org>.
aromanenko-dev commented on a change in pull request #12572:
URL: https://github.com/apache/beam/pull/12572#discussion_r497414578



##########
File path: sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaCommitOffset.java
##########
@@ -0,0 +1,130 @@
  [... license header, imports, and the CommitOffsetDoFn class omitted; quoted in full in the earlier review message ...]
+  @Override
+  public PCollection<Void> expand(PCollection<KV<KafkaSourceDescriptor, KafkaRecord<K, V>>> input) {
+    try {
+      return input
+          .apply(
+              MapElements.into(new TypeDescriptor<KV<KafkaSourceDescriptor, Long>>() {})
+                  .via(element -> KV.of(element.getKey(), element.getValue().getOffset())))
+          .setCoder(
+              KvCoder.of(
+                  input
+                      .getPipeline()
+                      .getSchemaRegistry()
+                      .getSchemaCoder(KafkaSourceDescriptor.class),
+                  VarLongCoder.of()))
+          .apply(Window.into(FixedWindows.of(Duration.standardMinutes(5))))

Review comment:
       Thanks. Not for now; let's keep it as it is. I'm just wondering whether a user would need to be able to configure this.







[GitHub] [beam] boyuanzz commented on a change in pull request #12572: [BEAM-10123] Add Kafka Commit transform.

Posted by GitBox <gi...@apache.org>.
boyuanzz commented on a change in pull request #12572:
URL: https://github.com/apache/beam/pull/12572#discussion_r497125319



##########
File path: sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaCommitOffset.java
##########
@@ -0,0 +1,130 @@
  [... license header, imports, and the CommitOffsetDoFn class omitted; quoted in full in the earlier review message ...]
+  @Override
+  public PCollection<Void> expand(PCollection<KV<KafkaSourceDescriptor, KafkaRecord<K, V>>> input) {
+    try {
+      return input
+          .apply(
+              MapElements.into(new TypeDescriptor<KV<KafkaSourceDescriptor, Long>>() {})
+                  .via(element -> KV.of(element.getKey(), element.getValue().getOffset())))
+          .setCoder(
+              KvCoder.of(
+                  input
+                      .getPipeline()
+                      .getSchemaRegistry()
+                      .getSchemaCoder(KafkaSourceDescriptor.class),
+                  VarLongCoder.of()))
+          .apply(Window.into(FixedWindows.of(Duration.standardMinutes(5))))

Review comment:
       That's my expectation of a reasonable commit interval. The reason for committing offsets is to have a good starting point when we restart the pipeline, so it doesn't require real-time committing. Do you have any suggestions for this interval?
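   
   A minimal sketch of the cadence being discussed, simplified to a `String` key in place of `KafkaSourceDescriptor` (the constant could just as well be a user-supplied `Duration` if the interval ever becomes configurable):
   
   ```java
   import org.apache.beam.sdk.transforms.Max;
   import org.apache.beam.sdk.transforms.windowing.FixedWindows;
   import org.apache.beam.sdk.transforms.windowing.Window;
   import org.apache.beam.sdk.values.KV;
   import org.apache.beam.sdk.values.PCollection;
   import org.joda.time.Duration;
   
   public class CommitCadenceSketch {
     // Fixed 5-minute windows plus Max.longsPerKey() reduce the offset stream
     // to one (key, max offset) pair per partition per window, so the commit
     // DoFn fires at most once every 5 minutes per partition.
     static PCollection<KV<String, Long>> latestOffsets(
         PCollection<KV<String, Long>> offsetsByPartition) {
       return offsetsByPartition
           .apply(Window.into(FixedWindows.of(Duration.standardMinutes(5))))
           .apply(Max.longsPerKey());
     }
   }
   ```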







[GitHub] [beam] aromanenko-dev commented on pull request #12572: [BEAM-10123] Add Kafka Commit transform.

Posted by GitBox <gi...@apache.org>.
aromanenko-dev commented on pull request #12572:
URL: https://github.com/apache/beam/pull/12572#issuecomment-699055049


   Thanks! I took a quick look at this - are you going to add some tests for this addition?








[GitHub] [beam] aromanenko-dev merged pull request #12572: [BEAM-10123] Add Kafka Commit transform.

Posted by GitBox <gi...@apache.org>.
aromanenko-dev merged pull request #12572:
URL: https://github.com/apache/beam/pull/12572


   





[GitHub] [beam] aromanenko-dev commented on a change in pull request #12572: [BEAM-10123] Add Kafka Commit transform.

Posted by GitBox <gi...@apache.org>.
aromanenko-dev commented on a change in pull request #12572:
URL: https://github.com/apache/beam/pull/12572#discussion_r496100066



##########
File path: sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
##########
@@ -1620,14 +1635,43 @@ public void processElement(
       CoderRegistry coderRegistry = input.getPipeline().getCoderRegistry();
       Coder<K> keyCoder = getKeyCoder(coderRegistry);
       Coder<V> valueCoder = getValueCoder(coderRegistry);
-      Coder<KafkaRecord<K, V>> outputCoder = KafkaRecordCoder.of(keyCoder, valueCoder);
-      PCollection<KafkaRecord<K, V>> output =
-          input.apply(ParDo.of(new ReadFromKafkaDoFn<K, V>(this))).setCoder(outputCoder);
-      // TODO(BEAM-10123): Add CommitOffsetTransform to expansion.
-      if (isCommitOffsetEnabled() && !configuredKafkaCommit()) {
-        throw new IllegalStateException("Offset committed is not supported yet");
+      Coder<KafkaRecord<K, V>> recordCoder = KafkaRecordCoder.of(keyCoder, valueCoder);
+
+      try {
+        PCollection<KV<KafkaSourceDescriptor, KafkaRecord<K, V>>> outputWithDescriptor =
+            input
+                .apply(ParDo.of(new ReadFromKafkaDoFn<K, V>(this)))
+                .setCoder(
+                    KvCoder.of(
+                        input
+                            .getPipeline()
+                            .getSchemaRegistry()
+                            .getSchemaCoder(KafkaSourceDescriptor.class),
+                        recordCoder));
+        if (isCommitOffsetEnabled() && !configuredKafkaCommit()) {
+          outputWithDescriptor =
+              outputWithDescriptor
+                  .apply(Reshuffle.viaRandomKey())

Review comment:
       Why do we need `Reshuffle`? Won't it affect performance?







[GitHub] [beam] sameerbhadouria commented on pull request #12572: [BEAM-10123] Add Kafka Commit transform.

Posted by GitBox <gi...@apache.org>.
sameerbhadouria commented on pull request #12572:
URL: https://github.com/apache/beam/pull/12572#issuecomment-761146614


   @boyuanzz  Any plans to support this in the Kafka Python SDK?





[GitHub] [beam] boyuanzz commented on pull request #12572: [BEAM-10123] Add Kafka Commit transform.

Posted by GitBox <gi...@apache.org>.
boyuanzz commented on pull request #12572:
URL: https://github.com/apache/beam/pull/12572#issuecomment-761148877


   > @boyuanzz Any plans to support this in the Kafka Python SDK?
   
   Hi, you can use KafkaIO in Python with cross-language pipeline support. Here is an example pipeline: https://github.com/apache/beam/blob/master/sdks/python/apache_beam/examples/kafkataxi/kafka_taxi.py.
   
   More about cross-language pipelines: https://beam.apache.org/documentation/programming-guide/#mulit-language-pipelines
   
   cc: @chamikaramj  





[GitHub] [beam] boyuanzz commented on pull request #12572: [BEAM-10123] Add Kafka Commit transform.

Posted by GitBox <gi...@apache.org>.
boyuanzz commented on pull request #12572:
URL: https://github.com/apache/beam/pull/12572#issuecomment-718267176


   I manually tested this transform with Dataflow runner v2 and I can confirm that it works as expected. But it's not easy to have an E2E test, since the transform doesn't output values and it interacts with an external system.





[GitHub] [beam] boyuanzz commented on a change in pull request #12572: [BEAM-10123] Add Kafka Commit transform.

Posted by GitBox <gi...@apache.org>.
boyuanzz commented on a change in pull request #12572:
URL: https://github.com/apache/beam/pull/12572#discussion_r497131119



##########
File path: sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
##########
@@ -1620,14 +1635,43 @@ public void processElement(
       CoderRegistry coderRegistry = input.getPipeline().getCoderRegistry();
       Coder<K> keyCoder = getKeyCoder(coderRegistry);
       Coder<V> valueCoder = getValueCoder(coderRegistry);
-      Coder<KafkaRecord<K, V>> outputCoder = KafkaRecordCoder.of(keyCoder, valueCoder);
-      PCollection<KafkaRecord<K, V>> output =
-          input.apply(ParDo.of(new ReadFromKafkaDoFn<K, V>(this))).setCoder(outputCoder);
-      // TODO(BEAM-10123): Add CommitOffsetTransform to expansion.
-      if (isCommitOffsetEnabled() && !configuredKafkaCommit()) {
-        throw new IllegalStateException("Offset committed is not supported yet");
+      Coder<KafkaRecord<K, V>> recordCoder = KafkaRecordCoder.of(keyCoder, valueCoder);
+
+      try {
+        PCollection<KV<KafkaSourceDescriptor, KafkaRecord<K, V>>> outputWithDescriptor =
+            input
+                .apply(ParDo.of(new ReadFromKafkaDoFn<K, V>(this)))
+                .setCoder(
+                    KvCoder.of(
+                        input
+                            .getPipeline()
+                            .getSchemaRegistry()
+                            .getSchemaCoder(KafkaSourceDescriptor.class),
+                        recordCoder));
+        if (isCommitOffsetEnabled() && !configuredKafkaCommit()) {
+          outputWithDescriptor =
+              outputWithDescriptor
+                  .apply(Reshuffle.viaRandomKey())

Review comment:
       The `Reshuffle` is used as a persistence layer, which helps us guarantee that we will not re-read records prior to the committed offset.
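
       As an illustration of that shape, here is a minimal sketch; the string elements stand in for Kafka records, and the exact durability semantics of `Reshuffle` depend on the runner:

           import org.apache.beam.sdk.Pipeline;
           import org.apache.beam.sdk.transforms.Create;
           import org.apache.beam.sdk.transforms.MapElements;
           import org.apache.beam.sdk.transforms.Reshuffle;
           import org.apache.beam.sdk.values.TypeDescriptors;

           public class ReshuffleBarrierSketch {
             public static void main(String[] args) {
               Pipeline p = Pipeline.create();
               p.apply(Create.of("record-1", "record-2"))  // stand-in for the Kafka read
                   // On runners that persist shuffle output, a downstream failure
                   // replays elements from here rather than re-reading the source,
                   // so committing the offset in a parallel branch does not lose records.
                   .apply(Reshuffle.viaRandomKey())
                   .apply(MapElements.into(TypeDescriptors.strings())
                       .via((String r) -> "processed-" + r));  // downstream stand-in
               p.run().waitUntilFinish();
             }
           }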







[GitHub] [beam] boyuanzz commented on pull request #12572: [BEAM-10123] Add Kafka Commit transform.

Posted by GitBox <gi...@apache.org>.
boyuanzz commented on pull request #12572:
URL: https://github.com/apache/beam/pull/12572#issuecomment-761161542


   > @boyuanzz I am using the cross-language support from Beam; however, the [ReadFromKafka](https://github.com/apache/beam/blob/0f36a72f80ecaac05d3e42f3874afae762c039d6/sdks/python/apache_beam/io/kafka.py#L110) transform does not return the Kafka record metadata.
   > It also needs support for `commitOffsetInFinalize` and `TimestampPolicyFactory`.
   
   I see. From my understanding, we want to avoid duplicating work as long as we can leverage the Java implementation through x-lang. This would be a good question for dev@beam.apache.org. Would you like to start a discussion around what you need? You may also want to share your pipeline code to help folks understand the use case.





[GitHub] [beam] boyuanzz commented on pull request #12572: [BEAM-10123] Add Kafka Commit transform.

Posted by GitBox <gi...@apache.org>.
boyuanzz commented on pull request #12572:
URL: https://github.com/apache/beam/pull/12572#issuecomment-701043637


   > Thanks, it looks fine in general to me. I left several questions, ptal.
   > 
   > My main concern is the following:
   > 
   > * Can we have data loss in case of failures during record processing while an offset for this partition has already been committed in the parallel pipeline branch?
   
   That's the purpose of `Reshuffle`. When there is a failure in record processing, the record will not be re-read from the Kafka read; instead, it will be re-read from `Reshuffle`.
   
   > Also, the tests are very much needed for this feature.
   
   I'm thinking about having tests with mock Kafka. Do you have any suggestions/ideas around testing?
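
   One way such a test could start is with kafka-clients' `MockConsumer` in place of a real broker; this is only a sketch of the idea, not the PR's actual test code:

       import java.util.Collections;
       import java.util.HashMap;
       import java.util.Map;
       import org.apache.kafka.clients.consumer.MockConsumer;
       import org.apache.kafka.clients.consumer.OffsetAndMetadata;
       import org.apache.kafka.clients.consumer.OffsetResetStrategy;
       import org.apache.kafka.common.TopicPartition;

       public class MockCommitSketch {
         public static void main(String[] args) {
           TopicPartition tp = new TopicPartition("test-topic", 0);
           MockConsumer<byte[], byte[]> consumer =
               new MockConsumer<>(OffsetResetStrategy.EARLIEST);
           consumer.assign(Collections.singletonList(tp));

           // The commit DoFn would do roughly this: commit lastOffset + 1.
           Map<TopicPartition, OffsetAndMetadata> toCommit = new HashMap<>();
           toCommit.put(tp, new OffsetAndMetadata(43L));
           consumer.commitSync(toCommit);

           // MockConsumer remembers the commit, so a test can assert on it.
           System.out.println("committed: " + consumer.committed(tp).offset());  // 43
         }
       }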
   





[GitHub] [beam] boyuanzz commented on a change in pull request #12572: [BEAM-10123] Add Kafka Commit transform.

Posted by GitBox <gi...@apache.org>.
boyuanzz commented on a change in pull request #12572:
URL: https://github.com/apache/beam/pull/12572#discussion_r497084530



##########
File path: sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaCommitOffset.java
##########
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.kafka;
+
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.VarLongCoder;
+import org.apache.beam.sdk.coders.VoidCoder;
+import org.apache.beam.sdk.schemas.NoSuchSchemaException;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.Max;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.windowing.FixedWindows;
+import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.TypeDescriptor;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.joda.time.Duration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** A {@link PTransform} that commits offsets of {@link KafkaRecord}. */
+public class KafkaCommitOffset<K, V>
+    extends PTransform<
+        PCollection<KV<KafkaSourceDescriptor, KafkaRecord<K, V>>>, PCollection<Void>> {
+  private final KafkaIO.ReadSourceDescriptors<K, V> readSourceDescriptors;
+
+  KafkaCommitOffset(KafkaIO.ReadSourceDescriptors<K, V> readSourceDescriptors) {
+    this.readSourceDescriptors = readSourceDescriptors;
+  }
+
+  static class CommitOffsetDoFn extends DoFn<KV<KafkaSourceDescriptor, Long>, Void> {
+    private static final Logger LOG = LoggerFactory.getLogger(CommitOffsetDoFn.class);
+    private final Map<String, Object> offsetConsumerConfig;
+    private final Map<String, Object> consumerConfig;
+    private final SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>>
+        consumerFactoryFn;
+
+    private transient ConsumerSpEL consumerSpEL = null;
+
+    CommitOffsetDoFn(KafkaIO.ReadSourceDescriptors readSourceDescriptors) {
+      offsetConsumerConfig = readSourceDescriptors.getOffsetConsumerConfig();
+      consumerConfig = readSourceDescriptors.getConsumerConfig();
+      consumerFactoryFn = readSourceDescriptors.getConsumerFactoryFn();
+    }
+
+    @ProcessElement
+    public void processElement(@Element KV<KafkaSourceDescriptor, Long> element) {
+      Map<String, Object> updatedConsumerConfig =
+          overrideBootstrapServersConfig(consumerConfig, element.getKey());

Review comment:
       How many unique keys there are per bundle after `Max.longsPerKey()` depends on the runner implementation. For Dataflow specifically, a bundle may contain many keys. The key I'm using here is a `KafkaSourceDescriptor`, which represents a unique Kafka connection (a topic + a partition + bootstrap servers).
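
       To make the keying concrete, here is a small sketch where strings stand in for `KafkaSourceDescriptor`s; `Max.longsPerKey()` emits one commit candidate per key per window, but a single bundle may still carry many such keys:

           import java.util.Arrays;
           import org.apache.beam.sdk.Pipeline;
           import org.apache.beam.sdk.transforms.Create;
           import org.apache.beam.sdk.transforms.Max;
           import org.apache.beam.sdk.values.KV;

           public class PerDescriptorMaxSketch {
             public static void main(String[] args) {
               Pipeline p = Pipeline.create();
               p.apply(Create.of(Arrays.asList(
                       // Two "descriptors" (topic + partition) interleaved.
                       KV.of("topicA-0", 7L), KV.of("topicA-1", 3L),
                       KV.of("topicA-0", 9L), KV.of("topicA-1", 5L))))
                   // Emits KV("topicA-0", 9) and KV("topicA-1", 5); how those
                   // keys are packed into bundles is up to the runner.
                   .apply(Max.longsPerKey());
               p.run().waitUntilFinish();
             }
           }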







[GitHub] [beam] sameerbhadouria commented on pull request #12572: [BEAM-10123] Add Kafka Commit transform.

Posted by GitBox <gi...@apache.org>.
sameerbhadouria commented on pull request #12572:
URL: https://github.com/apache/beam/pull/12572#issuecomment-761157308


   @boyuanzz I am using the cross-language support from Beam; however, the [ReadFromKafka](https://github.com/apache/beam/blob/0f36a72f80ecaac05d3e42f3874afae762c039d6/sdks/python/apache_beam/io/kafka.py#L110) transform does not return the Kafka record metadata.
   It also needs support for `commitOffsetInFinalize` and `TimestampPolicyFactory`.

