Posted to commits@beam.apache.org by ch...@apache.org on 2022/06/01 17:59:27 UTC

[beam] branch master updated: [BEAM-14297] add nullable annotations and an integration test (#17742)

This is an automated email from the ASF dual-hosted git repository.

chamikara pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new f765a91b1fd [BEAM-14297] add nullable annotations and an integration test (#17742)
f765a91b1fd is described below

commit f765a91b1fdf1fb7b9fe9aebdc2fde873a179835
Author: johnjcasey <95...@users.noreply.github.com>
AuthorDate: Wed Jun 1 13:59:22 2022 -0400

    [BEAM-14297] add nullable annotations and an integration test (#17742)
    
    * [BEAM-14297] add nullable annotations and an integration test
    
    * [BEAM-14297] verify null kv is returned from kafka
    
    * [BEAM-14297] spotless
---
 .../java/org/apache/beam/sdk/io/kafka/KafkaIO.java |  4 +-
 .../org/apache/beam/sdk/io/kafka/KafkaIOIT.java    | 60 ++++++++++++++++++++++
 2 files changed, 62 insertions(+), 2 deletions(-)

diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
index 34ea6ef51b9..509aa17103e 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
@@ -1673,8 +1673,8 @@ public class KafkaIO {
     int partition;
     long offset;
     long timestamp;
-    byte[] key;
-    byte[] value;
+    byte @Nullable [] key;
+    byte @Nullable [] value;
     List<KafkaHeader> headers;
     int timestampTypeId;
     String timestampTypeName;
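
A note on the annotation placement in the hunk above: with Java type-use annotations (as used by the Checker Framework's Nullness Checker), `byte @Nullable [] key` marks the array reference itself as possibly null, whereas `@Nullable byte[] key` would annotate the primitive component type `byte`, which can never be null and is rejected by the checker. A minimal standalone sketch of the distinction (the class and field names are illustrative, not part of this commit):

    import org.checkerframework.checker.nullness.qual.Nullable;

    class NullableArrayExample {
      // The annotation before [] applies to the array reference:
      // this field may hold null instead of a byte[].
      byte @Nullable [] payload;

      int payloadLength() {
        byte @Nullable [] local = payload;
        // The Nullness Checker requires a null check before dereferencing.
        return local == null ? 0 : local.length;
      }
    }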
diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOIT.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOIT.java
index 173a10f1234..68ce1e44192 100644
--- a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOIT.java
+++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOIT.java
@@ -19,15 +19,20 @@ package org.apache.beam.sdk.io.kafka;
 
 import static org.apache.beam.sdk.io.synthetic.SyntheticOptions.fromJsonString;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
 
 import com.google.cloud.Timestamp;
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
 import java.util.function.BiFunction;
 import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.coders.ByteArrayCoder;
+import org.apache.beam.sdk.coders.NullableCoder;
 import org.apache.beam.sdk.io.Read;
 import org.apache.beam.sdk.io.common.HashingFn;
 import org.apache.beam.sdk.io.common.IOITHelper;
@@ -48,13 +53,17 @@ import org.apache.beam.sdk.testutils.metrics.MetricsReader;
 import org.apache.beam.sdk.testutils.metrics.TimeMonitor;
 import org.apache.beam.sdk.testutils.publishing.InfluxDBSettings;
 import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.MapElements;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.SimpleFunction;
+import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableSet;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
 import org.apache.kafka.common.serialization.ByteArraySerializer;
 import org.checkerframework.checker.nullness.qual.Nullable;
 import org.joda.time.Duration;
@@ -203,6 +212,57 @@ public class KafkaIOIT {
     }
   }
 
+  // This test roundtrips a single KV<Null,Null> to verify that externalWithMetadata
+  // can handle null keys and values correctly.
+  @Test
+  public void testKafkaIOExternalRoundtripWithMetadataAndNullKeysAndValues() {
+
+    List<byte[]> nullList = new ArrayList<>();
+    nullList.add(null);
+    writePipeline
+        .apply(Create.of(nullList))
+        .apply(
+            ParDo.of(
+                new DoFn<byte[], KV<byte[], byte[]>>() {
+                  @ProcessElement
+                  public void processElement(
+                      @Element byte[] element, OutputReceiver<KV<byte[], byte[]>> receiver) {
+                    receiver.output(KV.of(element, element));
+                  }
+                }))
+        .apply(
+            KafkaIO.<byte[], byte[]>write()
+                .withBootstrapServers(options.getKafkaBootstrapServerAddresses())
+                .withTopic(options.getKafkaTopic())
+                .withKeySerializer(ByteArraySerializer.class)
+                .withValueSerializer(ByteArraySerializer.class));
+
+    PipelineResult writeResult = writePipeline.run();
+    writeResult.waitUntilFinish();
+
+    PCollection<Row> rows =
+        readPipeline.apply(
+            KafkaIO.<byte[], byte[]>read()
+                .withBootstrapServers(options.getKafkaBootstrapServerAddresses())
+                .withTopic(options.getKafkaTopic())
+                .withKeyDeserializerAndCoder(
+                    ByteArrayDeserializer.class, NullableCoder.of(ByteArrayCoder.of()))
+                .withValueDeserializerAndCoder(
+                    ByteArrayDeserializer.class, NullableCoder.of(ByteArrayCoder.of()))
+                .externalWithMetadata());
+
+    PAssert.thatSingleton(rows)
+        .satisfies(
+            actualRow -> {
+              assertNull(actualRow.getString("key"));
+              assertNull(actualRow.getString("value"));
+              return null;
+            });
+
+    PipelineResult readResult = readPipeline.run();
+    readResult.waitUntilFinish();
+  }
+
   private long readElementMetric(PipelineResult result, String namespace, String name) {
     MetricsReader metricsReader = new MetricsReader(result, namespace);
     return metricsReader.getCounterMetric(name);
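
The essential detail in the new test's read path is `NullableCoder.of(ByteArrayCoder.of())`: the plain ByteArrayCoder cannot encode a null byte[], so the nullable wrapper is what lets the null key and value survive the Kafka roundtrip, and `externalWithMetadata()` then surfaces them as null "key"/"value" fields on the resulting Row (hence the getString assertions). A minimal, self-contained sketch of what the wrapper does (illustrative only, not part of this commit):

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import org.apache.beam.sdk.coders.ByteArrayCoder;
    import org.apache.beam.sdk.coders.NullableCoder;

    public class NullableCoderExample {
      public static void main(String[] args) throws Exception {
        // NullableCoder prefixes the encoded value with a presence marker,
        // so it can round-trip null where ByteArrayCoder alone would throw.
        NullableCoder<byte[]> coder = NullableCoder.of(ByteArrayCoder.of());

        ByteArrayOutputStream out = new ByteArrayOutputStream();
        coder.encode(null, out);

        byte[] decoded = coder.decode(new ByteArrayInputStream(out.toByteArray()));
        System.out.println(decoded == null);  // prints: true
      }
    }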