You are viewing a plain-text version of this content; the hyperlink to the canonical version was not preserved in this copy.
Posted to commits@beam.apache.org by bo...@apache.org on 2021/01/22 19:14:40 UTC
[beam] branch master updated: Expose commit_offset_in_finalize and timestamp_policy to ReadFromKafka.
This is an automated email from the ASF dual-hosted git repository.
boyuanz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 04ae01c Expose commit_offset_in_finalize and timestamp_policy to ReadFromKafka.
new 8b759d1 Merge pull request #13779 from [BEAM-11677] Expose commit_offset_in_finalize and timestamp_policy to ReadFromKafka
04ae01c is described below
commit 04ae01ca97e0258d831d80bc5216ec160605a981
Author: Boyuan Zhang <bo...@google.com>
AuthorDate: Tue Jan 19 15:49:34 2021 -0800
Expose commit_offset_in_finalize and timestamp_policy to ReadFromKafka.
---
.../java/org/apache/beam/sdk/io/kafka/KafkaIO.java | 29 ++++++++++++++++--
.../beam/sdk/io/kafka/KafkaIOExternalTest.java | 6 +++-
sdks/python/apache_beam/io/kafka.py | 34 +++++++++++++++-------
.../runners/portability/flink_runner_test.py | 5 +++-
4 files changed, 60 insertions(+), 14 deletions(-)
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
index 60608e0..d77d4fa 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
@@ -586,8 +586,23 @@ public class KafkaIO {
setMaxReadTime(Duration.standardSeconds(config.maxReadTime));
}
setMaxNumRecords(config.maxNumRecords == null ? Long.MAX_VALUE : config.maxNumRecords);
- setCommitOffsetsInFinalizeEnabled(false);
- setTimestampPolicyFactory(TimestampPolicyFactory.withProcessingTime());
+
+ // Set committing offset configuration.
+ setCommitOffsetsInFinalizeEnabled(config.commitOffsetInFinalize);
+
+ // Set timestamp policy with built-in types.
+ String timestampPolicy = config.timestampPolicy;
+ if (timestampPolicy.equals("ProcessingTime")) {
+ setTimestampPolicyFactory(TimestampPolicyFactory.withProcessingTime());
+ } else if (timestampPolicy.equals("CreateTime")) {
+ setTimestampPolicyFactory(TimestampPolicyFactory.withCreateTime(Duration.ZERO));
+ } else if (timestampPolicy.equals("LogAppendTime")) {
+ setTimestampPolicyFactory(TimestampPolicyFactory.withLogAppendTime());
+ } else {
+ throw new IllegalArgumentException(
+ "timestampPolicy should be one of (ProcessingTime, CreateTime, LogAppendTime)");
+ }
+
if (config.startReadTime != null) {
setStartReadTime(Instant.ofEpochMilli(config.startReadTime));
}
@@ -645,6 +660,8 @@ public class KafkaIO {
private Long startReadTime;
private Long maxNumRecords;
private Long maxReadTime;
+ private Boolean commitOffsetInFinalize;
+ private String timestampPolicy;
public void setConsumerConfig(Map<String, String> consumerConfig) {
this.consumerConfig = consumerConfig;
@@ -673,6 +690,14 @@ public class KafkaIO {
public void setMaxReadTime(Long maxReadTime) {
this.maxReadTime = maxReadTime;
}
+
+ public void setCommitOffsetInFinalize(Boolean commitOffsetInFinalize) {
+ this.commitOffsetInFinalize = commitOffsetInFinalize;
+ }
+
+ public void setTimestampPolicy(String timestampPolicy) {
+ this.timestampPolicy = timestampPolicy;
+ }
}
}
diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOExternalTest.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOExternalTest.java
index 82c67eb..39b2981 100644
--- a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOExternalTest.java
+++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOExternalTest.java
@@ -89,12 +89,16 @@ public class KafkaIOExternalTest {
"consumer_config", FieldType.map(FieldType.STRING, FieldType.STRING)),
Field.of("key_deserializer", FieldType.STRING),
Field.of("value_deserializer", FieldType.STRING),
- Field.of("start_read_time", FieldType.INT64)))
+ Field.of("start_read_time", FieldType.INT64),
+ Field.of("commit_offset_in_finalize", FieldType.BOOLEAN),
+ Field.of("timestamp_policy", FieldType.STRING)))
.withFieldValue("topics", topics)
.withFieldValue("consumer_config", consumerConfig)
.withFieldValue("key_deserializer", keyDeserializer)
.withFieldValue("value_deserializer", valueDeserializer)
.withFieldValue("start_read_time", startReadTime)
+ .withFieldValue("commit_offset_in_finalize", false)
+ .withFieldValue("timestamp_policy", "ProcessingTime")
.build());
RunnerApi.Components defaultInstance = RunnerApi.Components.getDefaultInstance();
diff --git a/sdks/python/apache_beam/io/kafka.py b/sdks/python/apache_beam/io/kafka.py
index b36c791..9a97737 100644
--- a/sdks/python/apache_beam/io/kafka.py
+++ b/sdks/python/apache_beam/io/kafka.py
@@ -92,15 +92,12 @@ from apache_beam.transforms.external import NamedTupleBasedPayloadBuilder
ReadFromKafkaSchema = typing.NamedTuple(
'ReadFromKafkaSchema',
- [
- ('consumer_config', typing.Mapping[unicode, unicode]),
- ('topics', typing.List[unicode]),
- ('key_deserializer', unicode),
- ('value_deserializer', unicode),
- ('start_read_time', typing.Optional[int]),
- ('max_num_records', typing.Optional[int]),
- ('max_read_time', typing.Optional[int]),
- ])
+ [('consumer_config', typing.Mapping[unicode, unicode]),
+ ('topics', typing.List[unicode]), ('key_deserializer', unicode),
+ ('value_deserializer', unicode), ('start_read_time', typing.Optional[int]),
+ ('max_num_records', typing.Optional[int]),
+ ('max_read_time', typing.Optional[int]),
+ ('commit_offset_in_finalize', bool), ('timestamp_policy', str)])
def default_io_expansion_service():
@@ -120,6 +117,10 @@ class ReadFromKafka(ExternalTransform):
byte_array_deserializer = (
'org.apache.kafka.common.serialization.ByteArrayDeserializer')
+ processing_time_policy = 'ProcessingTime'
+ create_time_policy = 'CreateTime'
+ log_append_time = 'LogAppendTime'
+
URN = 'beam:external:java:kafka:read:v1'
def __init__(
@@ -131,6 +132,8 @@ class ReadFromKafka(ExternalTransform):
start_read_time=None,
max_num_records=None,
max_read_time=None,
+ commit_offset_in_finalize=False,
+ timestamp_policy=processing_time_policy,
expansion_service=None,
):
"""
@@ -152,8 +155,18 @@ class ReadFromKafka(ExternalTransform):
for tests and demo applications.
:param max_read_time: Maximum amount of time in seconds the transform
executes. Mainly used for tests and demo applications.
+ :param commit_offset_in_finalize: Whether to commit offsets when finalizing.
+ :param timestamp_policy: The built-in timestamp policy which is used for
+ extracting timestamp from KafkaRecord.
:param expansion_service: The address (host:port) of the ExpansionService.
"""
+ if timestamp_policy not in [ReadFromKafka.processing_time_policy,
+ ReadFromKafka.create_time_policy,
+ ReadFromKafka.log_append_time]:
+ raise ValueError(
+ 'timestamp_policy should be one of '
+ '[ProcessingTime, CreateTime, LogAppendTime]')
+
super(ReadFromKafka, self).__init__(
self.URN,
NamedTupleBasedPayloadBuilder(
@@ -165,7 +178,8 @@ class ReadFromKafka(ExternalTransform):
max_num_records=max_num_records,
max_read_time=max_read_time,
start_read_time=start_read_time,
- )),
+ commit_offset_in_finalize=commit_offset_in_finalize,
+ timestamp_policy=timestamp_policy)),
expansion_service or default_io_expansion_service())
diff --git a/sdks/python/apache_beam/runners/portability/flink_runner_test.py b/sdks/python/apache_beam/runners/portability/flink_runner_test.py
index a5f6ac3..2b4c8a6 100644
--- a/sdks/python/apache_beam/runners/portability/flink_runner_test.py
+++ b/sdks/python/apache_beam/runners/portability/flink_runner_test.py
@@ -238,7 +238,8 @@ class FlinkRunnerTest(portable_runner_test.PortableRunnerTest):
p
| ReadFromKafka(
consumer_config={
- 'bootstrap.servers': 'notvalid1:7777, notvalid2:3531'
+ 'bootstrap.servers': 'notvalid1:7777, notvalid2:3531',
+ 'group.id': 'any_group'
},
topics=['topic1', 'topic2'],
key_deserializer='org.apache.kafka.'
@@ -247,6 +248,8 @@ class FlinkRunnerTest(portable_runner_test.PortableRunnerTest):
value_deserializer='org.apache.kafka.'
'common.serialization.'
'LongDeserializer',
+ commit_offset_in_finalize=True,
+ timestamp_policy=ReadFromKafka.create_time_policy,
expansion_service=self.get_expansion_service()))
self.assertTrue(
'No resolvable bootstrap urls given in bootstrap.servers' in str(