You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ch...@apache.org on 2022/06/01 17:59:27 UTC
[beam] branch master updated: [BEAM-14297] add nullable annotations and an integration test (#17742)
This is an automated email from the ASF dual-hosted git repository.
chamikara pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new f765a91b1fd [BEAM-14297] add nullable annotations and an integration test (#17742)
f765a91b1fd is described below
commit f765a91b1fdf1fb7b9fe9aebdc2fde873a179835
Author: johnjcasey <95...@users.noreply.github.com>
AuthorDate: Wed Jun 1 13:59:22 2022 -0400
[BEAM-14297] add nullable annotations and an integration test (#17742)
* [BEAM-14297] add nullable annotations and an integration test
* [BEAM-14297] verify null kv is returned from kafka
* [BEAM-14297] spotless
---
.../java/org/apache/beam/sdk/io/kafka/KafkaIO.java | 4 +-
.../org/apache/beam/sdk/io/kafka/KafkaIOIT.java | 60 ++++++++++++++++++++++
2 files changed, 62 insertions(+), 2 deletions(-)
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
index 34ea6ef51b9..509aa17103e 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
@@ -1673,8 +1673,8 @@ public class KafkaIO {
int partition;
long offset;
long timestamp;
- byte[] key;
- byte[] value;
+ byte @Nullable [] key;
+ byte @Nullable [] value;
List<KafkaHeader> headers;
int timestampTypeId;
String timestampTypeName;
diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOIT.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOIT.java
index 173a10f1234..68ce1e44192 100644
--- a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOIT.java
+++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOIT.java
@@ -19,15 +19,20 @@ package org.apache.beam.sdk.io.kafka;
import static org.apache.beam.sdk.io.synthetic.SyntheticOptions.fromJsonString;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
import com.google.cloud.Timestamp;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.Arrays;
+import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.function.BiFunction;
import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.coders.ByteArrayCoder;
+import org.apache.beam.sdk.coders.NullableCoder;
import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.io.common.HashingFn;
import org.apache.beam.sdk.io.common.IOITHelper;
@@ -48,13 +53,17 @@ import org.apache.beam.sdk.testutils.metrics.MetricsReader;
import org.apache.beam.sdk.testutils.metrics.TimeMonitor;
import org.apache.beam.sdk.testutils.publishing.InfluxDBSettings;
import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SimpleFunction;
+import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableSet;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.joda.time.Duration;
@@ -203,6 +212,57 @@ public class KafkaIOIT {
}
}
+ // Round-trips a single record whose key and value are both null (written as KV.of(null, null))
+ // and checks that externalWithMetadata() yields one Row whose "key" and "value" fields are null.
+ @Test
+ public void testKafkaIOExternalRoundtripWithMetadataAndNullKeysAndValues() {
+
+ List<byte[]> nullList = new ArrayList<>();
+ nullList.add(null);
+ writePipeline
+ .apply(Create.of(nullList))
+ .apply(
+ ParDo.of(
+ new DoFn<byte[], KV<byte[], byte[]>>() {
+ @ProcessElement
+ public void processElement(
+ @Element byte[] element, OutputReceiver<KV<byte[], byte[]>> receiver) {
+ receiver.output(KV.of(element, element)); // the only element is null, so key and value are both null
+ }
+ }))
+ .apply(
+ KafkaIO.<byte[], byte[]>write()
+ .withBootstrapServers(options.getKafkaBootstrapServerAddresses())
+ .withTopic(options.getKafkaTopic())
+ .withKeySerializer(ByteArraySerializer.class)
+ .withValueSerializer(ByteArraySerializer.class));
+
+ PipelineResult writeResult = writePipeline.run();
+ writeResult.waitUntilFinish();
+
+ PCollection<Row> rows =
+ readPipeline.apply(
+ KafkaIO.<byte[], byte[]>read()
+ .withBootstrapServers(options.getKafkaBootstrapServerAddresses())
+ .withTopic(options.getKafkaTopic()) // same topic the write above used; NOTE(review): if other tests also use this topic, thatSingleton may see more than one record — confirm
+ .withKeyDeserializerAndCoder(
+ ByteArrayDeserializer.class, NullableCoder.of(ByteArrayCoder.of()))
+ .withValueDeserializerAndCoder(
+ ByteArrayDeserializer.class, NullableCoder.of(ByteArrayCoder.of()))
+ .externalWithMetadata()); // NOTE(review): no withMaxNumRecords(...) is set, so this read is presumably unbounded and waitUntilFinish() below may not return — confirm
+
+ PAssert.thatSingleton(rows)
+ .satisfies(
+ actualRow -> {
+ assertNull(actualRow.getString("key")); // NOTE(review): key/value were deserialized as byte[]; getBytes(...) may match the Row field type better — confirm schema
+ assertNull(actualRow.getString("value"));
+ return null;
+ });
+
+ PipelineResult readResult = readPipeline.run();
+ readResult.waitUntilFinish();
+ }
+
private long readElementMetric(PipelineResult result, String namespace, String name) {
MetricsReader metricsReader = new MetricsReader(result, namespace);
return metricsReader.getCounterMetric(name);