You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ch...@apache.org on 2022/04/28 16:17:35 UTC
[beam] branch master updated: [BEAM-14297] Enable nullable key and value arrays for xlang kafka io with metadata (#17348)
This is an automated email from the ASF dual-hosted git repository.
chamikara pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 2ba6eaed073 [BEAM-14297] Enable nullable key and value arrays for xlang kafka io with metadata (#17348)
2ba6eaed073 is described below
commit 2ba6eaed073a4f1a658ca77fb00060bc65a7f6a3
Author: johnjcasey <95...@users.noreply.github.com>
AuthorDate: Thu Apr 28 12:17:27 2022 -0400
[BEAM-14297] Enable nullable key and value arrays for xlang kafka io with metadata (#17348)
* [BEAM-14297] Enable nullable arrays for kafka io with metadata
* [BEAM-14297] run spotless
---
.../java/org/apache/beam/sdk/io/kafka/KafkaIO.java | 4 ++--
.../beam/sdk/io/kafka/KafkaIOExternalTest.java | 24 ++++++++++++++++++++++
2 files changed, 26 insertions(+), 2 deletions(-)
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
index 314a6c49f23..a88df313ff9 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
@@ -1679,8 +1679,8 @@ public class KafkaIO {
int partition,
long offset,
long timestamp,
- byte[] key,
- byte[] value,
+ byte @Nullable [] key,
+ byte @Nullable [] value,
@Nullable List<KafkaHeader> headers,
int timestampTypeId,
String timestampTypeName) {
diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOExternalTest.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOExternalTest.java
index d1a292c2334..98d5c15644d 100644
--- a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOExternalTest.java
+++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOExternalTest.java
@@ -20,6 +20,7 @@ package org.apache.beam.sdk.io.kafka;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
@@ -189,6 +190,29 @@ public class KafkaIOExternalTest {
assertEquals("dummyHeaderVal", new String(byteArrayKafkaRecord.headers.get(0).value, "UTF-8"));
}
+  @Test
+  public void testKafkaRecordToExternalKafkaRecordWithNullKeyAndValue() throws Exception {
+    RecordHeaders headers = new RecordHeaders();
+    headers.add("dummyHeaderKey", "dummyHeaderVal".getBytes(StandardCharsets.UTF_8));
+    KafkaRecord<byte[], byte[]> kafkaRecord =
+        new KafkaRecord<>(
+            "dummyTopic", 111, 222, 12345, KafkaTimestampType.LOG_APPEND_TIME, headers, null, null);
+    // Key and value are both null; the conversion is expected to carry the nulls through.
+    ByteArrayKafkaRecord byteArrayKafkaRecord = RowsWithMetadata.toExternalKafkaRecord(kafkaRecord);
+    // Metadata and headers must match the input record, while key and value stay null.
+    assertEquals("dummyTopic", byteArrayKafkaRecord.topic);
+    assertEquals(111, byteArrayKafkaRecord.partition);
+    assertEquals(222, byteArrayKafkaRecord.offset);
+    assertEquals(12345, byteArrayKafkaRecord.timestamp);
+    assertEquals(KafkaTimestampType.LOG_APPEND_TIME.id, byteArrayKafkaRecord.timestampTypeId);
+    assertEquals(KafkaTimestampType.LOG_APPEND_TIME.name, byteArrayKafkaRecord.timestampTypeName);
+    assertNull(byteArrayKafkaRecord.key);
+    assertNull(byteArrayKafkaRecord.value);
+    assertEquals(1, byteArrayKafkaRecord.headers.size());
+    assertEquals("dummyHeaderKey", byteArrayKafkaRecord.headers.get(0).key);
+    assertEquals("dummyHeaderVal", new String(byteArrayKafkaRecord.headers.get(0).value, "UTF-8"));
+  }
+
@Test
public void testConstructKafkaReadWithoutMetadata() throws Exception {
List<String> topics = ImmutableList.of("topic1", "topic2");