You are viewing a plain text version of this content. The canonical link for it is here (the hyperlink target was not preserved in this plain-text copy).
Posted to commits@pulsar.apache.org by ni...@apache.org on 2022/10/07 19:36:49 UTC
[pulsar] branch branch-2.11 updated: [fix][connector] Kinesis sink: fix NPE with KeyValue schema and no value (#17959)
This is an automated email from the ASF dual-hosted git repository.
nicoloboschi pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.11 by this push:
new 9d403e51cb3 [fix][connector] Kinesis sink: fix NPE with KeyValue schema and no value (#17959)
9d403e51cb3 is described below
commit 9d403e51cb39f29ce5819d501986922a1b1d62be
Author: Nicolò Boschi <bo...@gmail.com>
AuthorDate: Fri Oct 7 21:35:27 2022 +0200
[fix][connector] Kinesis sink: fix NPE with KeyValue schema and no value (#17959)
(cherry picked from commit 65fad77a98bb8526656adac1d5e07cc4381777bf)
---
.../java/org/apache/pulsar/io/kinesis/Utils.java | 12 ++--
.../org/apache/pulsar/io/kinesis/UtilsTest.java | 73 ++++++++++++++++++++++
2 files changed, 81 insertions(+), 4 deletions(-)
diff --git a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/Utils.java b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/Utils.java
index a46e52806be..a3ebfb94be9 100644
--- a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/Utils.java
+++ b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/Utils.java
@@ -253,10 +253,14 @@ public class Utils {
org.apache.pulsar.common.schema.KeyValue<GenericObject, GenericObject> keyValue =
(org.apache.pulsar.common.schema.KeyValue<GenericObject, GenericObject>) val;
Map<String, Object> jsonKeyValue = new HashMap<>();
- jsonKeyValue.put("key", toJsonSerializable(keyValueSchema.getKeySchema(),
- keyValue.getKey().getNativeObject()));
- jsonKeyValue.put("value", toJsonSerializable(keyValueSchema.getValueSchema(),
- keyValue.getValue().getNativeObject()));
+ if (keyValue.getKey() != null) {
+ jsonKeyValue.put("key", toJsonSerializable(keyValueSchema.getKeySchema(),
+ keyValue.getKey().getNativeObject()));
+ }
+ if (keyValue.getValue() != null) {
+ jsonKeyValue.put("value", toJsonSerializable(keyValueSchema.getValueSchema(),
+ keyValue.getValue().getNativeObject()));
+ }
return jsonKeyValue;
case AVRO:
return JsonConverter.toJson((org.apache.avro.generic.GenericRecord) val);
diff --git a/pulsar-io/kinesis/src/test/java/org/apache/pulsar/io/kinesis/UtilsTest.java b/pulsar-io/kinesis/src/test/java/org/apache/pulsar/io/kinesis/UtilsTest.java
index 3b128fcf65b..5458008f5f6 100644
--- a/pulsar-io/kinesis/src/test/java/org/apache/pulsar/io/kinesis/UtilsTest.java
+++ b/pulsar-io/kinesis/src/test/java/org/apache/pulsar/io/kinesis/UtilsTest.java
@@ -29,6 +29,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.gson.Gson;
import java.nio.ByteBuffer;
+import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
@@ -445,6 +446,78 @@ public class UtilsTest {
+ ".a\":\"1\",\"payload.key.b\":1,\"properties.prop-key\":\"prop-value\",\"eventTime\":1648502845803}");
}
+ @Test(dataProvider = "schemaType")
+ public void testKeyValueSerializeNoValue(SchemaType schemaType) throws Exception {
+ RecordSchemaBuilder keySchemaBuilder = org.apache.pulsar.client.api.schema.SchemaBuilder.record("key");
+ keySchemaBuilder.field("a").type(SchemaType.STRING).optional().defaultValue(null);
+ GenericSchema<GenericRecord> keySchema = Schema.generic(keySchemaBuilder.build(schemaType));
+
+ RecordSchemaBuilder valueSchemaBuilder = org.apache.pulsar.client.api.schema.SchemaBuilder.record("value");
+ valueSchemaBuilder.field("c").type(SchemaType.STRING).optional().defaultValue(null);
+ GenericSchema<GenericRecord> valueSchema = Schema.generic(valueSchemaBuilder.build(schemaType));
+
+ Schema<org.apache.pulsar.common.schema.KeyValue<GenericRecord, GenericRecord>> keyValueSchema =
+ Schema.KeyValue(keySchema, valueSchema, KeyValueEncodingType.INLINE);
+ org.apache.pulsar.common.schema.KeyValue<GenericRecord, GenericRecord>
+ keyValue = new org.apache.pulsar.common.schema.KeyValue<>(null, null);
+ GenericObject genericObject = new GenericObject() {
+ @Override
+ public SchemaType getSchemaType() {
+ return SchemaType.KEY_VALUE;
+ }
+
+ @Override
+ public Object getNativeObject() {
+ return keyValue;
+ }
+ };
+
+ Record<GenericObject> genericObjectRecord = new Record<>() {
+ @Override
+ public Optional<String> getTopicName() {
+ return Optional.of("data-ks1.table1");
+ }
+
+ @Override
+ public org.apache.pulsar.client.api.Schema getSchema() {
+ return keyValueSchema;
+ }
+
+ @Override
+ public Optional<String> getKey() {
+ return Optional.of("message-key");
+ }
+
+ @Override
+ public GenericObject getValue() {
+ return genericObject;
+ }
+
+ @Override
+ public Map<String, String> getProperties() {
+ return Collections.emptyMap();
+ }
+
+ @Override
+ public Optional<Long> getEventTime() {
+ return Optional.of(1648502845803L);
+ }
+ };
+
+ ObjectMapper objectMapper = new ObjectMapper().setSerializationInclusion(JsonInclude.Include.NON_NULL);
+ String json = Utils.serializeRecordToJsonExpandingValue(objectMapper, genericObjectRecord, false);
+
+ assertEquals(json, "{\"topicName\":\"data-ks1.table1\",\"key\":\"message-key\","
+ + "\"payload\":{},"
+ + "\"eventTime\":1648502845803}");
+
+ json = Utils.serializeRecordToJsonExpandingValue(objectMapper, genericObjectRecord, true);
+
+ assertEquals(json, "{\"topicName\":\"data-ks1.table1\",\"key\":\"message-key\","
+ + "\"payload\":{},"
+ + "\"eventTime\":1648502845803}");
+ }
+
@Test
public void testPrimitiveSerializeRecordToJsonExpandingValue() throws Exception {
GenericObject genericObject = new GenericObject() {