You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by ni...@apache.org on 2022/10/07 19:36:49 UTC

[pulsar] branch branch-2.11 updated: [fix][connector] Kinesis sink: fix NPE with KeyValue schema and no value (#17959)

This is an automated email from the ASF dual-hosted git repository.

nicoloboschi pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.11 by this push:
     new 9d403e51cb3 [fix][connector] Kinesis sink: fix NPE with KeyValue schema and no value (#17959)
9d403e51cb3 is described below

commit 9d403e51cb39f29ce5819d501986922a1b1d62be
Author: Nicolò Boschi <bo...@gmail.com>
AuthorDate: Fri Oct 7 21:35:27 2022 +0200

    [fix][connector] Kinesis sink: fix NPE with KeyValue schema and no value (#17959)
    
    (cherry picked from commit 65fad77a98bb8526656adac1d5e07cc4381777bf)
---
 .../java/org/apache/pulsar/io/kinesis/Utils.java   | 12 ++--
 .../org/apache/pulsar/io/kinesis/UtilsTest.java    | 73 ++++++++++++++++++++++
 2 files changed, 81 insertions(+), 4 deletions(-)

diff --git a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/Utils.java b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/Utils.java
index a46e52806be..a3ebfb94be9 100644
--- a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/Utils.java
+++ b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/Utils.java
@@ -253,10 +253,14 @@ public class Utils {
                 org.apache.pulsar.common.schema.KeyValue<GenericObject, GenericObject> keyValue =
                         (org.apache.pulsar.common.schema.KeyValue<GenericObject, GenericObject>) val;
                 Map<String, Object> jsonKeyValue = new HashMap<>();
-                jsonKeyValue.put("key", toJsonSerializable(keyValueSchema.getKeySchema(),
-                        keyValue.getKey().getNativeObject()));
-                jsonKeyValue.put("value", toJsonSerializable(keyValueSchema.getValueSchema(),
-                        keyValue.getValue().getNativeObject()));
+                if (keyValue.getKey() != null) {
+                    jsonKeyValue.put("key", toJsonSerializable(keyValueSchema.getKeySchema(),
+                            keyValue.getKey().getNativeObject()));
+                }
+                if (keyValue.getValue() != null) {
+                    jsonKeyValue.put("value", toJsonSerializable(keyValueSchema.getValueSchema(),
+                            keyValue.getValue().getNativeObject()));
+                }
                 return jsonKeyValue;
             case AVRO:
                 return JsonConverter.toJson((org.apache.avro.generic.GenericRecord) val);
diff --git a/pulsar-io/kinesis/src/test/java/org/apache/pulsar/io/kinesis/UtilsTest.java b/pulsar-io/kinesis/src/test/java/org/apache/pulsar/io/kinesis/UtilsTest.java
index 3b128fcf65b..5458008f5f6 100644
--- a/pulsar-io/kinesis/src/test/java/org/apache/pulsar/io/kinesis/UtilsTest.java
+++ b/pulsar-io/kinesis/src/test/java/org/apache/pulsar/io/kinesis/UtilsTest.java
@@ -29,6 +29,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.gson.Gson;
 
 import java.nio.ByteBuffer;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Optional;
@@ -445,6 +446,78 @@ public class UtilsTest {
                 + ".a\":\"1\",\"payload.key.b\":1,\"properties.prop-key\":\"prop-value\",\"eventTime\":1648502845803}");
     }
 
+    @Test(dataProvider = "schemaType") // parameterized by the "schemaType" data provider (not shown in this hunk)
+    public void testKeyValueSerializeNoValue(SchemaType schemaType) throws Exception {
+        RecordSchemaBuilder keySchemaBuilder = org.apache.pulsar.client.api.schema.SchemaBuilder.record("key");
+        keySchemaBuilder.field("a").type(SchemaType.STRING).optional().defaultValue(null);
+        GenericSchema<GenericRecord> keySchema = Schema.generic(keySchemaBuilder.build(schemaType));
+
+        RecordSchemaBuilder valueSchemaBuilder = org.apache.pulsar.client.api.schema.SchemaBuilder.record("value");
+        valueSchemaBuilder.field("c").type(SchemaType.STRING).optional().defaultValue(null);
+        GenericSchema<GenericRecord> valueSchema = Schema.generic(valueSchemaBuilder.build(schemaType));
+
+        Schema<org.apache.pulsar.common.schema.KeyValue<GenericRecord, GenericRecord>> keyValueSchema =
+                Schema.KeyValue(keySchema, valueSchema, KeyValueEncodingType.INLINE);
+        org.apache.pulsar.common.schema.KeyValue<GenericRecord, GenericRecord>
+                keyValue = new org.apache.pulsar.common.schema.KeyValue<>(null, null); // null key AND null value: the case under test
+        GenericObject genericObject = new GenericObject() { // exposes keyValue as a KEY_VALUE-typed generic object
+            @Override
+            public SchemaType getSchemaType() {
+                return SchemaType.KEY_VALUE;
+            }
+
+            @Override
+            public Object getNativeObject() {
+                return keyValue;
+            }
+        };
+
+        Record<GenericObject> genericObjectRecord = new Record<>() { // stub record returning fixed values for every accessor
+            @Override
+            public Optional<String> getTopicName() {
+                return Optional.of("data-ks1.table1");
+            }
+
+            @Override
+            public org.apache.pulsar.client.api.Schema getSchema() {
+                return keyValueSchema;
+            }
+
+            @Override
+            public Optional<String> getKey() {
+                return Optional.of("message-key");
+            }
+
+            @Override
+            public GenericObject getValue() {
+                return genericObject;
+            }
+
+            @Override
+            public Map<String, String> getProperties() {
+                return Collections.emptyMap(); // record carries no properties; the expected JSON below has no "properties" entry
+            }
+
+            @Override
+            public Optional<Long> getEventTime() {
+                return Optional.of(1648502845803L);
+            }
+        };
+
+        ObjectMapper objectMapper = new ObjectMapper().setSerializationInclusion(JsonInclude.Include.NON_NULL);
+        String json = Utils.serializeRecordToJsonExpandingValue(objectMapper, genericObjectRecord, false);
+
+        assertEquals(json, "{\"topicName\":\"data-ks1.table1\",\"key\":\"message-key\","
+                + "\"payload\":{}," // payload is an empty object: neither a "key" nor a "value" entry is expected
+                + "\"eventTime\":1648502845803}");
+
+        json = Utils.serializeRecordToJsonExpandingValue(objectMapper, genericObjectRecord, true); // same record, third argument true (presumably the flatten switch - confirm in Utils)
+
+        assertEquals(json, "{\"topicName\":\"data-ks1.table1\",\"key\":\"message-key\","
+                + "\"payload\":{}," // expected output is identical to the call with false above
+                + "\"eventTime\":1648502845803}");
+    }
+
     @Test
     public void testPrimitiveSerializeRecordToJsonExpandingValue() throws Exception {
         GenericObject genericObject = new GenericObject() {