Posted to commits@beam.apache.org by bo...@apache.org on 2021/01/22 19:14:40 UTC

[beam] branch master updated: Expose commit_offset_in_finalize and timestamp_policy to ReadFromKafka.

This is an automated email from the ASF dual-hosted git repository.

boyuanz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 04ae01c  Expose commit_offset_in_finalize and timestamp_policy to ReadFromKafka.
     new 8b759d1  Merge pull request #13779 from [BEAM-11677] Expose commit_offset_in_finalize and timestamp_policy to ReadFromKafka
04ae01c is described below

commit 04ae01ca97e0258d831d80bc5216ec160605a981
Author: Boyuan Zhang <bo...@google.com>
AuthorDate: Tue Jan 19 15:49:34 2021 -0800

    Expose commit_offset_in_finalize and timestamp_policy to ReadFromKafka.
---
 .../java/org/apache/beam/sdk/io/kafka/KafkaIO.java | 29 ++++++++++++++++--
 .../beam/sdk/io/kafka/KafkaIOExternalTest.java     |  6 +++-
 sdks/python/apache_beam/io/kafka.py                | 34 +++++++++++++++-------
 .../runners/portability/flink_runner_test.py       |  5 +++-
 4 files changed, 60 insertions(+), 14 deletions(-)
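
For context, a minimal sketch of how the newly exposed options might be used
from the Python SDK after this change (the broker address, group id, and
topic name below are placeholders, not part of this commit):

    import apache_beam as beam
    from apache_beam.io.kafka import ReadFromKafka

    with beam.Pipeline() as pipeline:
      _ = (
          pipeline
          | ReadFromKafka(
              consumer_config={
                  # Placeholder broker; replace with real bootstrap servers.
                  'bootstrap.servers': 'localhost:9092',
                  # A consumer group id lets Kafka track committed offsets.
                  'group.id': 'my_group',
              },
              topics=['my_topic'],
              # New option: commit consumed offsets back to Kafka when
              # output finalization succeeds.
              commit_offset_in_finalize=True,
              # New option: stamp elements with the Kafka record's create
              # time instead of processing time (the previously hard-coded
              # default).
              timestamp_policy=ReadFromKafka.create_time_policy))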

diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
index 60608e0..d77d4fa 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
@@ -586,8 +586,23 @@ public class KafkaIO {
           setMaxReadTime(Duration.standardSeconds(config.maxReadTime));
         }
         setMaxNumRecords(config.maxNumRecords == null ? Long.MAX_VALUE : config.maxNumRecords);
-        setCommitOffsetsInFinalizeEnabled(false);
-        setTimestampPolicyFactory(TimestampPolicyFactory.withProcessingTime());
+
+        // Set committing offset configuration.
+        setCommitOffsetsInFinalizeEnabled(config.commitOffsetInFinalize);
+
+        // Set timestamp policy with built-in types.
+        String timestampPolicy = config.timestampPolicy;
+        if (timestampPolicy.equals("ProcessingTime")) {
+          setTimestampPolicyFactory(TimestampPolicyFactory.withProcessingTime());
+        } else if (timestampPolicy.equals("CreateTime")) {
+          setTimestampPolicyFactory(TimestampPolicyFactory.withCreateTime(Duration.ZERO));
+        } else if (timestampPolicy.equals("LogAppendTime")) {
+          setTimestampPolicyFactory(TimestampPolicyFactory.withLogAppendTime());
+        } else {
+          throw new IllegalArgumentException(
+              "timestampPolicy should be one of (ProcessingTime, CreateTime, LogAppendTime)");
+        }
+
         if (config.startReadTime != null) {
           setStartReadTime(Instant.ofEpochMilli(config.startReadTime));
         }
@@ -645,6 +660,8 @@ public class KafkaIO {
         private Long startReadTime;
         private Long maxNumRecords;
         private Long maxReadTime;
+        private Boolean commitOffsetInFinalize;
+        private String timestampPolicy;
 
         public void setConsumerConfig(Map<String, String> consumerConfig) {
           this.consumerConfig = consumerConfig;
@@ -673,6 +690,14 @@ public class KafkaIO {
         public void setMaxReadTime(Long maxReadTime) {
           this.maxReadTime = maxReadTime;
         }
+
+        public void setCommitOffsetInFinalize(Boolean commitOffsetInFinalize) {
+          this.commitOffsetInFinalize = commitOffsetInFinalize;
+        }
+
+        public void setTimestampPolicy(String timestampPolicy) {
+          this.timestampPolicy = timestampPolicy;
+        }
       }
     }
 
diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOExternalTest.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOExternalTest.java
index 82c67eb..39b2981 100644
--- a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOExternalTest.java
+++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOExternalTest.java
@@ -89,12 +89,16 @@ public class KafkaIOExternalTest {
                             "consumer_config", FieldType.map(FieldType.STRING, FieldType.STRING)),
                         Field.of("key_deserializer", FieldType.STRING),
                         Field.of("value_deserializer", FieldType.STRING),
-                        Field.of("start_read_time", FieldType.INT64)))
+                        Field.of("start_read_time", FieldType.INT64),
+                        Field.of("commit_offset_in_finalize", FieldType.BOOLEAN),
+                        Field.of("timestamp_policy", FieldType.STRING)))
                 .withFieldValue("topics", topics)
                 .withFieldValue("consumer_config", consumerConfig)
                 .withFieldValue("key_deserializer", keyDeserializer)
                 .withFieldValue("value_deserializer", valueDeserializer)
                 .withFieldValue("start_read_time", startReadTime)
+                .withFieldValue("commit_offset_in_finalize", false)
+                .withFieldValue("timestamp_policy", "ProcessingTime")
                 .build());
 
     RunnerApi.Components defaultInstance = RunnerApi.Components.getDefaultInstance();
diff --git a/sdks/python/apache_beam/io/kafka.py b/sdks/python/apache_beam/io/kafka.py
index b36c791..9a97737 100644
--- a/sdks/python/apache_beam/io/kafka.py
+++ b/sdks/python/apache_beam/io/kafka.py
@@ -92,15 +92,12 @@ from apache_beam.transforms.external import NamedTupleBasedPayloadBuilder
 
 ReadFromKafkaSchema = typing.NamedTuple(
     'ReadFromKafkaSchema',
-    [
-        ('consumer_config', typing.Mapping[unicode, unicode]),
-        ('topics', typing.List[unicode]),
-        ('key_deserializer', unicode),
-        ('value_deserializer', unicode),
-        ('start_read_time', typing.Optional[int]),
-        ('max_num_records', typing.Optional[int]),
-        ('max_read_time', typing.Optional[int]),
-    ])
+    [('consumer_config', typing.Mapping[unicode, unicode]),
+     ('topics', typing.List[unicode]), ('key_deserializer', unicode),
+     ('value_deserializer', unicode), ('start_read_time', typing.Optional[int]),
+     ('max_num_records', typing.Optional[int]),
+     ('max_read_time', typing.Optional[int]),
+     ('commit_offset_in_finalize', bool), ('timestamp_policy', str)])
 
 
 def default_io_expansion_service():
@@ -120,6 +117,10 @@ class ReadFromKafka(ExternalTransform):
   byte_array_deserializer = (
       'org.apache.kafka.common.serialization.ByteArrayDeserializer')
 
+  processing_time_policy = 'ProcessingTime'
+  create_time_policy = 'CreateTime'
+  log_append_time = 'LogAppendTime'
+
   URN = 'beam:external:java:kafka:read:v1'
 
   def __init__(
@@ -131,6 +132,8 @@ class ReadFromKafka(ExternalTransform):
       start_read_time=None,
       max_num_records=None,
       max_read_time=None,
+      commit_offset_in_finalize=False,
+      timestamp_policy=processing_time_policy,
       expansion_service=None,
   ):
     """
@@ -152,8 +155,18 @@ class ReadFromKafka(ExternalTransform):
         for tests and demo applications.
     :param max_read_time: Maximum amount of time in seconds the transform
         executes. Mainly used for tests and demo applications.
+    :param commit_offset_in_finalize: Whether to commit offsets when finalizing.
+    :param timestamp_policy: The built-in timestamp policy which is used for
+        extracting timestamp from KafkaRecord.
     :param expansion_service: The address (host:port) of the ExpansionService.
     """
+    if timestamp_policy not in [ReadFromKafka.processing_time_policy,
+                                ReadFromKafka.create_time_policy,
+                                ReadFromKafka.log_append_time]:
+      raise ValueError(
+          'timestamp_policy should be one of '
+          '[ProcessingTime, CreateTime, LogAppendTime]')
+
     super(ReadFromKafka, self).__init__(
         self.URN,
         NamedTupleBasedPayloadBuilder(
@@ -165,7 +178,8 @@ class ReadFromKafka(ExternalTransform):
                 max_num_records=max_num_records,
                 max_read_time=max_read_time,
                 start_read_time=start_read_time,
-            )),
+                commit_offset_in_finalize=commit_offset_in_finalize,
+                timestamp_policy=timestamp_policy)),
         expansion_service or default_io_expansion_service())
 
 
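As a quick illustration of the check added above: any value other than the
three built-in policy names is rejected with a ValueError at construction
time, before the transform is expanded (the broker and topic below are
placeholders):

    from apache_beam.io.kafka import ReadFromKafka

    try:
      ReadFromKafka(
          consumer_config={'bootstrap.servers': 'localhost:9092'},
          topics=['my_topic'],
          # Hypothetical invalid value, not one of the built-in policies.
          timestamp_policy='WallClockTime')
    except ValueError as e:
      print(e)
      # -> timestamp_policy should be one of
      #    [ProcessingTime, CreateTime, LogAppendTime]
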
diff --git a/sdks/python/apache_beam/runners/portability/flink_runner_test.py b/sdks/python/apache_beam/runners/portability/flink_runner_test.py
index a5f6ac3..2b4c8a6 100644
--- a/sdks/python/apache_beam/runners/portability/flink_runner_test.py
+++ b/sdks/python/apache_beam/runners/portability/flink_runner_test.py
@@ -238,7 +238,8 @@ class FlinkRunnerTest(portable_runner_test.PortableRunnerTest):
             p
             | ReadFromKafka(
                 consumer_config={
-                    'bootstrap.servers': 'notvalid1:7777, notvalid2:3531'
+                    'bootstrap.servers': 'notvalid1:7777, notvalid2:3531',
+                    'group.id': 'any_group'
                 },
                 topics=['topic1', 'topic2'],
                 key_deserializer='org.apache.kafka.'
@@ -247,6 +248,8 @@ class FlinkRunnerTest(portable_runner_test.PortableRunnerTest):
                 value_deserializer='org.apache.kafka.'
                 'common.serialization.'
                 'LongDeserializer',
+                commit_offset_in_finalize=True,
+                timestamp_policy=ReadFromKafka.create_time_policy,
                 expansion_service=self.get_expansion_service()))
     self.assertTrue(
         'No resolvable bootstrap urls given in bootstrap.servers' in str(