You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ch...@apache.org on 2022/04/28 16:17:35 UTC

[beam] branch master updated: [BEAM-14297] Enable nullable key and value arrays for xlang kafka io with metadata (#17348)

This is an automated email from the ASF dual-hosted git repository.

chamikara pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 2ba6eaed073 [BEAM-14297] Enable nullable key and value arrays for xlang kafka io with metadata (#17348)
2ba6eaed073 is described below

commit 2ba6eaed073a4f1a658ca77fb00060bc65a7f6a3
Author: johnjcasey <95...@users.noreply.github.com>
AuthorDate: Thu Apr 28 12:17:27 2022 -0400

    [BEAM-14297] Enable nullable key and value arrays for xlang kafka io with metadata (#17348)
    
    * [BEAM-14297] Enable nullable arrays for kafka io with metadata
    
    * [BEAM-14297] run spotless
---
 .../java/org/apache/beam/sdk/io/kafka/KafkaIO.java |  4 ++--
 .../beam/sdk/io/kafka/KafkaIOExternalTest.java     | 24 ++++++++++++++++++++++
 2 files changed, 26 insertions(+), 2 deletions(-)

diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
index 314a6c49f23..a88df313ff9 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
@@ -1679,8 +1679,8 @@ public class KafkaIO {
         int partition,
         long offset,
         long timestamp,
-        byte[] key,
-        byte[] value,
+        byte @Nullable [] key,
+        byte @Nullable [] value,
         @Nullable List<KafkaHeader> headers,
         int timestampTypeId,
         String timestampTypeName) {
diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOExternalTest.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOExternalTest.java
index d1a292c2334..98d5c15644d 100644
--- a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOExternalTest.java
+++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOExternalTest.java
@@ -20,6 +20,7 @@ package org.apache.beam.sdk.io.kafka;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
 
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
@@ -189,6 +190,29 @@ public class KafkaIOExternalTest {
     assertEquals("dummyHeaderVal", new String(byteArrayKafkaRecord.headers.get(0).value, "UTF-8"));
   }
 
+  @Test
+  public void testKafkaRecordToExternalKafkaRecordWithNullKeyAndValue() throws Exception {
+    RecordHeaders headers = new RecordHeaders();
+    headers.add("dummyHeaderKey", "dummyHeaderVal".getBytes(StandardCharsets.UTF_8));
+    KafkaRecord<byte[], byte[]> kafkaRecord =
+        new KafkaRecord(
+            "dummyTopic", 111, 222, 12345, KafkaTimestampType.LOG_APPEND_TIME, headers, null, null);
+
+    ByteArrayKafkaRecord byteArrayKafkaRecord = RowsWithMetadata.toExternalKafkaRecord(kafkaRecord);
+
+    assertEquals("dummyTopic", byteArrayKafkaRecord.topic);
+    assertEquals(111, byteArrayKafkaRecord.partition);
+    assertEquals(222, byteArrayKafkaRecord.offset);
+    assertEquals(12345, byteArrayKafkaRecord.timestamp);
+    assertEquals(KafkaTimestampType.LOG_APPEND_TIME.id, byteArrayKafkaRecord.timestampTypeId);
+    assertEquals(KafkaTimestampType.LOG_APPEND_TIME.name, byteArrayKafkaRecord.timestampTypeName);
+    assertNull(byteArrayKafkaRecord.key);
+    assertNull(byteArrayKafkaRecord.value);
+    assertEquals(1, byteArrayKafkaRecord.headers.size());
+    assertEquals("dummyHeaderKey", byteArrayKafkaRecord.headers.get(0).key);
+    assertEquals("dummyHeaderVal", new String(byteArrayKafkaRecord.headers.get(0).value, "UTF-8"));
+  }
+
   @Test
   public void testConstructKafkaReadWithoutMetadata() throws Exception {
     List<String> topics = ImmutableList.of("topic1", "topic2");