You are viewing a plain-text version of this content. (The link to the canonical version was not preserved in this plain-text copy.)
Posted to commits@beam.apache.org by ch...@apache.org on 2022/05/18 16:15:43 UTC

[beam] branch master updated: [BEAM-10529] update KafkaIO Xlang integration test to publish and receive null keys (#17319)

This is an automated email from the ASF dual-hosted git repository.

chamikara pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 4ed320c3fb3 [BEAM-10529] update KafkaIO Xlang integration test to publish and receive null keys (#17319)
4ed320c3fb3 is described below

commit 4ed320c3fb39b900ce1fffad64add7729f40cf11
Author: johnjcasey <95...@users.noreply.github.com>
AuthorDate: Wed May 18 12:15:37 2022 -0400

    [BEAM-10529] update KafkaIO Xlang integration test to publish and receive null keys (#17319)
    
    * [BEAM-10529] update test to publish and receive null keys
    
    * [BEAM-10529] add test with a populated key to kafka xlang_kafkaio_it_test.py
---
 .../io/external/xlang_kafkaio_it_test.py           | 29 ++++++++++++++++------
 1 file changed, 22 insertions(+), 7 deletions(-)

diff --git a/sdks/python/apache_beam/io/external/xlang_kafkaio_it_test.py b/sdks/python/apache_beam/io/external/xlang_kafkaio_it_test.py
index 7f75e2bf3de..39a83d6b694 100644
--- a/sdks/python/apache_beam/io/external/xlang_kafkaio_it_test.py
+++ b/sdks/python/apache_beam/io/external/xlang_kafkaio_it_test.py
@@ -64,9 +64,11 @@ class CollectingFn(beam.DoFn):
 
 
 class CrossLanguageKafkaIO(object):
-  def __init__(self, bootstrap_servers, topic, expansion_service=None):
+  def __init__(
+      self, bootstrap_servers, topic, null_key, expansion_service=None):
     self.bootstrap_servers = bootstrap_servers
     self.topic = topic
+    self.null_key = null_key
     self.expansion_service = expansion_service
     self.sum_counter = Metrics.counter('source', 'elements_sum')
 
@@ -74,9 +76,9 @@ class CrossLanguageKafkaIO(object):
     _ = (
         pipeline
         | 'Generate' >> beam.Create(range(NUM_RECORDS))  # pylint: disable=bad-option-value
-        | 'MakeKV' >> beam.Map(lambda x:
-                               (b'', str(x).encode())).with_output_types(
-                                   typing.Tuple[bytes, bytes])
+        | 'MakeKV' >> beam.Map(
+            lambda x: (None if self.null_key else b'key', str(x).encode())).
+        with_output_types(typing.Tuple[typing.Optional[bytes], bytes])
         | 'WriteToKafka' >> WriteToKafka(
             producer_config={'bootstrap.servers': self.bootstrap_servers},
             topic=self.topic,
@@ -112,13 +114,26 @@ class CrossLanguageKafkaIO(object):
     os.environ.get('LOCAL_KAFKA_JAR'),
     "LOCAL_KAFKA_JAR environment var is not provided.")
 class CrossLanguageKafkaIOTest(unittest.TestCase):
-  def test_kafkaio(self):
-    kafka_topic = 'xlang_kafkaio_test_{}'.format(uuid.uuid4())
+  def test_kafkaio_populated_key(self):
+    kafka_topic = 'xlang_kafkaio_test_populated_key_{}'.format(uuid.uuid4())
     local_kafka_jar = os.environ.get('LOCAL_KAFKA_JAR')
     with self.local_kafka_service(local_kafka_jar) as kafka_port:
       bootstrap_servers = '{}:{}'.format(
           self.get_platform_localhost(), kafka_port)
-      pipeline_creator = CrossLanguageKafkaIO(bootstrap_servers, kafka_topic)
+      pipeline_creator = CrossLanguageKafkaIO(
+          bootstrap_servers, kafka_topic, False)
+
+      self.run_kafka_write(pipeline_creator)
+      self.run_kafka_read(pipeline_creator)
+
+  def test_kafkaio_null_key(self):
+    kafka_topic = 'xlang_kafkaio_test_null_key_{}'.format(uuid.uuid4())
+    local_kafka_jar = os.environ.get('LOCAL_KAFKA_JAR')
+    with self.local_kafka_service(local_kafka_jar) as kafka_port:
+      bootstrap_servers = '{}:{}'.format(
+          self.get_platform_localhost(), kafka_port)
+      pipeline_creator = CrossLanguageKafkaIO(
+          bootstrap_servers, kafka_topic, True)
 
       self.run_kafka_write(pipeline_creator)
       self.run_kafka_read(pipeline_creator)