You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ch...@apache.org on 2022/05/18 16:15:43 UTC
[beam] branch master updated: [BEAM-10529] update KafkaIO Xlang integration test to publish and receive null keys (#17319)
This is an automated email from the ASF dual-hosted git repository.
chamikara pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 4ed320c3fb3 [BEAM-10529] update KafkaIO Xlang integration test to publish and receive null keys (#17319)
4ed320c3fb3 is described below
commit 4ed320c3fb39b900ce1fffad64add7729f40cf11
Author: johnjcasey <95...@users.noreply.github.com>
AuthorDate: Wed May 18 12:15:37 2022 -0400
[BEAM-10529] update KafkaIO Xlang integration test to publish and receive null keys (#17319)
* [BEAM-10529] update test to publish and receive null keys
* [BEAM-10529] add test with a populated key to kafka xlang_kafkaio_it_test.py
---
.../io/external/xlang_kafkaio_it_test.py | 29 ++++++++++++++++------
1 file changed, 22 insertions(+), 7 deletions(-)
diff --git a/sdks/python/apache_beam/io/external/xlang_kafkaio_it_test.py b/sdks/python/apache_beam/io/external/xlang_kafkaio_it_test.py
index 7f75e2bf3de..39a83d6b694 100644
--- a/sdks/python/apache_beam/io/external/xlang_kafkaio_it_test.py
+++ b/sdks/python/apache_beam/io/external/xlang_kafkaio_it_test.py
@@ -64,9 +64,11 @@ class CollectingFn(beam.DoFn):
class CrossLanguageKafkaIO(object):
- def __init__(self, bootstrap_servers, topic, expansion_service=None):
+ def __init__(
+ self, bootstrap_servers, topic, null_key, expansion_service=None):
self.bootstrap_servers = bootstrap_servers
self.topic = topic
+ self.null_key = null_key
self.expansion_service = expansion_service
self.sum_counter = Metrics.counter('source', 'elements_sum')
@@ -74,9 +76,9 @@ class CrossLanguageKafkaIO(object):
_ = (
pipeline
| 'Generate' >> beam.Create(range(NUM_RECORDS)) # pylint: disable=bad-option-value
- | 'MakeKV' >> beam.Map(lambda x:
- (b'', str(x).encode())).with_output_types(
- typing.Tuple[bytes, bytes])
+ | 'MakeKV' >> beam.Map(
+ lambda x: (None if self.null_key else b'key', str(x).encode())).
+ with_output_types(typing.Tuple[typing.Optional[bytes], bytes])
| 'WriteToKafka' >> WriteToKafka(
producer_config={'bootstrap.servers': self.bootstrap_servers},
topic=self.topic,
@@ -112,13 +114,26 @@ class CrossLanguageKafkaIO(object):
os.environ.get('LOCAL_KAFKA_JAR'),
"LOCAL_KAFKA_JAR environment var is not provided.")
class CrossLanguageKafkaIOTest(unittest.TestCase):
- def test_kafkaio(self):
- kafka_topic = 'xlang_kafkaio_test_{}'.format(uuid.uuid4())
+ def test_kafkaio_populated_key(self):
+ kafka_topic = 'xlang_kafkaio_test_populated_key_{}'.format(uuid.uuid4())
local_kafka_jar = os.environ.get('LOCAL_KAFKA_JAR')
with self.local_kafka_service(local_kafka_jar) as kafka_port:
bootstrap_servers = '{}:{}'.format(
self.get_platform_localhost(), kafka_port)
- pipeline_creator = CrossLanguageKafkaIO(bootstrap_servers, kafka_topic)
+ pipeline_creator = CrossLanguageKafkaIO(
+ bootstrap_servers, kafka_topic, False)
+
+ self.run_kafka_write(pipeline_creator)
+ self.run_kafka_read(pipeline_creator)
+
+ def test_kafkaio_null_key(self):
+ kafka_topic = 'xlang_kafkaio_test_null_key_{}'.format(uuid.uuid4())
+ local_kafka_jar = os.environ.get('LOCAL_KAFKA_JAR')
+ with self.local_kafka_service(local_kafka_jar) as kafka_port:
+ bootstrap_servers = '{}:{}'.format(
+ self.get_platform_localhost(), kafka_port)
+ pipeline_creator = CrossLanguageKafkaIO(
+ bootstrap_servers, kafka_topic, True)
self.run_kafka_write(pipeline_creator)
self.run_kafka_read(pipeline_creator)