You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by ni...@apache.org on 2022/06/09 07:15:48 UTC
[pulsar] 02/04: [pulsar-io] KCA: properly handle KeyValue that getNativeObject() returns: corrected type + support for KeyValue<GenericRecord, GenericRecord> (#15025)
This is an automated email from the ASF dual-hosted git repository.
nicoloboschi pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 515a9bfd46ec0f2ffa3bb3d2d9cd646575574565
Author: Andrey Yegorov <86...@users.noreply.github.com>
AuthorDate: Thu Apr 14 08:55:23 2022 -0700
[pulsar-io] KCA: properly handle KeyValue that getNativeObject() returns: corrected type + support for KeyValue<GenericRecord, GenericRecord> (#15025)
(cherry picked from commit d76b5d40da2c9055102b3ecf3e5f6b358ac52732)
---
.../pulsar/io/kafka/connect/KafkaConnectSink.java | 18 ++-
.../io/kafka/connect/schema/KafkaConnectData.java | 20 +++
.../io/kafka/connect/KafkaConnectSinkTest.java | 136 +++++++++++++++++++--
.../kafka/connect/SchemaedFileStreamSinkTask.java | 47 ++++---
4 files changed, 193 insertions(+), 28 deletions(-)
diff --git a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSink.java b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSink.java
index 502154065d9..31f7cbf6399 100644
--- a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSink.java
+++ b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSink.java
@@ -51,9 +51,9 @@ import org.apache.kafka.connect.sink.SinkTask;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.api.schema.GenericObject;
import org.apache.pulsar.client.api.schema.KeyValueSchema;
+import org.apache.pulsar.common.schema.KeyValue;
import org.apache.pulsar.common.schema.SchemaType;
import org.apache.pulsar.functions.api.Record;
-import org.apache.pulsar.io.core.KeyValue;
import org.apache.pulsar.io.core.Sink;
import org.apache.pulsar.io.core.SinkContext;
import org.apache.pulsar.io.kafka.connect.schema.KafkaConnectData;
@@ -255,11 +255,21 @@ public class KafkaConnectSink implements Sink<GenericObject> {
&& sourceRecord.getSchema().getSchemaInfo() != null
&& sourceRecord.getSchema().getSchemaInfo().getType() == SchemaType.KEY_VALUE) {
KeyValueSchema kvSchema = (KeyValueSchema) sourceRecord.getSchema();
- KeyValue kv = (KeyValue) sourceRecord.getValue().getNativeObject();
keySchema = PulsarSchemaToKafkaSchema.getKafkaConnectSchema(kvSchema.getKeySchema());
valueSchema = PulsarSchemaToKafkaSchema.getKafkaConnectSchema(kvSchema.getValueSchema());
- key = kv.getKey();
- value = kv.getValue();
+
+ Object nativeObject = sourceRecord.getValue().getNativeObject();
+
+ if (nativeObject instanceof KeyValue) {
+ KeyValue kv = (KeyValue) nativeObject;
+ key = KafkaConnectData.getKafkaConnectData(kv.getKey(), keySchema);
+ value = KafkaConnectData.getKafkaConnectData(kv.getValue(), valueSchema);
+ } else if (nativeObject != null) {
+ throw new IllegalStateException("Cannot extract KeyValue data from " + nativeObject.getClass());
+ } else {
+ key = null;
+ value = null;
+ }
} else {
if (sourceRecord.getMessage().get().hasBase64EncodedKey()) {
key = sourceRecord.getMessage().get().getKeyBytes();
diff --git a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/schema/KafkaConnectData.java b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/schema/KafkaConnectData.java
index e39ce086a53..8374dd24bf7 100644
--- a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/schema/KafkaConnectData.java
+++ b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/schema/KafkaConnectData.java
@@ -33,6 +33,7 @@ import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.errors.DataException;
+import org.apache.pulsar.client.api.schema.GenericRecord;
@Slf4j
public class KafkaConnectData {
@@ -47,6 +48,10 @@ public class KafkaConnectData {
} else if (nativeObject instanceof GenericData.Record) {
GenericData.Record avroRecord = (GenericData.Record) nativeObject;
return avroAsConnectData(avroRecord, kafkaSchema);
+ } else if (nativeObject instanceof GenericRecord) {
+ // Pulsar's GenericRecord
+ GenericRecord pulsarGenericRecord = (GenericRecord) nativeObject;
+ return pulsarGenericRecordAsConnectData(pulsarGenericRecord, kafkaSchema);
}
return castToKafkaSchema(nativeObject, kafkaSchema);
@@ -134,6 +139,21 @@ public class KafkaConnectData {
return struct;
}
+ static Object pulsarGenericRecordAsConnectData(GenericRecord genericRecord, Schema kafkaSchema) {
+ if (kafkaSchema == null) {
+ if (genericRecord == null) {
+ return null;
+ }
+ throw new DataException("Don't know how to convert " + genericRecord + " to Connect data (schema is null).");
+ }
+
+ Struct struct = new Struct(kafkaSchema);
+ for (Field field : kafkaSchema.fields()) {
+ struct.put(field, getKafkaConnectData(genericRecord.getField(field.name()), field.schema()));
+ }
+ return struct;
+ }
+
// with some help of
// https://github.com/apache/kafka/blob/trunk/connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverter.java
static Object jsonAsConnectData(JsonNode jsonNode, Schema kafkaSchema) {
diff --git a/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java b/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java
index 14c7dcd7ef8..23d9f1b5ce2 100644
--- a/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java
+++ b/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java
@@ -33,6 +33,7 @@ import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.api.schema.Field;
import org.apache.pulsar.client.api.schema.GenericObject;
import org.apache.pulsar.client.api.schema.GenericRecord;
import org.apache.pulsar.client.api.schema.SchemaDefinition;
@@ -40,10 +41,11 @@ import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.client.impl.MessageImpl;
import org.apache.pulsar.client.impl.schema.AvroSchema;
import org.apache.pulsar.client.impl.schema.JSONSchema;
+import org.apache.pulsar.client.impl.schema.generic.GenericAvroRecord;
import org.apache.pulsar.client.util.MessageIdUtils;
+import org.apache.pulsar.common.schema.KeyValue;
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.functions.source.PulsarRecord;
-import org.apache.pulsar.io.core.KeyValue;
import org.apache.pulsar.io.core.SinkContext;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
@@ -58,12 +60,14 @@ import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
+import java.util.AbstractMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doAnswer;
@@ -331,10 +335,10 @@ public class KafkaConnectSinkTest extends ProducerConsumerBase {
ObjectMapper om = new ObjectMapper();
Map<String, Object> result = om.readValue(lines.get(0), new TypeReference<Map<String, Object>>(){});
- assertEquals(result.get("key"), expectedKey);
- assertEquals(result.get("value"), expected);
- assertEquals(result.get("keySchema"), expectedKeySchema);
- assertEquals(result.get("valueSchema"), expectedSchema);
+ assertEquals(expectedKey, result.get("key"));
+ assertEquals(expected, result.get("value"));
+ assertEquals(expectedKeySchema, result.get("keySchema"));
+ assertEquals(expectedSchema, result.get("valueSchema"));
if (schema.getSchemaInfo().getType().isPrimitive()) {
// to test cast of primitive values
@@ -362,8 +366,18 @@ public class KafkaConnectSinkTest extends ProducerConsumerBase {
private GenericRecord getGenericRecord(Object value, Schema schema) {
final GenericRecord rec;
- if(value instanceof GenericRecord) {
+ if (value instanceof GenericRecord) {
rec = (GenericRecord) value;
+ } else if (value instanceof org.apache.avro.generic.GenericRecord) {
+ org.apache.avro.generic.GenericRecord avroRecord =
+ (org.apache.avro.generic.GenericRecord) value;
+ org.apache.avro.Schema avroSchema = (org.apache.avro.Schema) schema.getNativeSchema().get();
+ List<Field> fields = avroSchema.getFields()
+ .stream()
+ .map(f -> new Field(f.name(), f.pos()))
+ .collect(Collectors.toList());
+
+ return new GenericAvroRecord(new byte[]{ 1 }, avroSchema, fields, avroRecord);
} else {
rec = MockGenericObjectWrapper.builder()
.nativeObject(value)
@@ -592,7 +606,7 @@ public class KafkaConnectSinkTest extends ProducerConsumerBase {
}
@Test
- public void KeyValueSchemaTest() throws Exception {
+ public void schemaKeyValueSchemaTest() throws Exception {
KeyValue<Integer, String> kv = new KeyValue<>(11, "value");
SinkRecord sinkRecord = recordSchemaTest(kv, Schema.KeyValue(Schema.INT32, Schema.STRING), 11, "INT32", "value", "STRING");
String val = (String) sinkRecord.value();
@@ -601,6 +615,114 @@ public class KafkaConnectSinkTest extends ProducerConsumerBase {
Assert.assertEquals(key, 11);
}
+ @Test
+ public void schemaKeyValueAvroSchemaTest() throws Exception {
+ AvroSchema<PulsarSchemaToKafkaSchemaTest.StructWithAnnotations> pulsarAvroSchema
+ = AvroSchema.of(PulsarSchemaToKafkaSchemaTest.StructWithAnnotations.class);
+
+ final GenericData.Record key = new GenericData.Record(pulsarAvroSchema.getAvroSchema());
+ key.put("field1", 11);
+ key.put("field2", "key");
+ key.put("field3", 101L);
+
+ final GenericData.Record value = new GenericData.Record(pulsarAvroSchema.getAvroSchema());
+ value.put("field1", 10);
+ value.put("field2", "value");
+ value.put("field3", 100L);
+
+ Map<String, Object> expectedKey = new LinkedHashMap<>();
+ expectedKey.put("field1", 11);
+ expectedKey.put("field2", "key");
+ // integer is coming back from ObjectMapper
+ expectedKey.put("field3", 101);
+
+ Map<String, Object> expectedValue = new LinkedHashMap<>();
+ expectedValue.put("field1", 10);
+ expectedValue.put("field2", "value");
+ // integer is coming back from ObjectMapper
+ expectedValue.put("field3", 100);
+
+ KeyValue<GenericRecord, GenericRecord> kv = new KeyValue<>(getGenericRecord(key, pulsarAvroSchema),
+ getGenericRecord(value, pulsarAvroSchema));
+
+ SinkRecord sinkRecord = recordSchemaTest(kv, Schema.KeyValue(pulsarAvroSchema, pulsarAvroSchema),
+ expectedKey, "STRUCT", expectedValue, "STRUCT");
+
+ Struct outValue = (Struct) sinkRecord.value();
+ Assert.assertEquals((int)outValue.get("field1"), 10);
+ Assert.assertEquals((String)outValue.get("field2"), "value");
+ Assert.assertEquals((long)outValue.get("field3"), 100L);
+
+ Struct outKey = (Struct) sinkRecord.key();
+ Assert.assertEquals((int)outKey.get("field1"), 11);
+ Assert.assertEquals((String)outKey.get("field2"), "key");
+ Assert.assertEquals((long)outKey.get("field3"), 101L);
+ }
+
+ @Test
+ public void nullKeyValueSchemaTest() throws Exception {
+ props.put("kafkaConnectorSinkClass", SchemaedFileStreamSinkConnector.class.getCanonicalName());
+
+ KafkaConnectSink sink = new KafkaConnectSink();
+ sink.open(props, context);
+
+ Message msg = mock(MessageImpl.class);
+ // value is null
+ when(msg.getValue()).thenReturn(null);
+ when(msg.getKey()).thenReturn("key");
+ when(msg.hasKey()).thenReturn(true);
+ when(msg.getMessageId()).thenReturn(new MessageIdImpl(1, 0, 0));
+
+ final AtomicInteger status = new AtomicInteger(0);
+ Record<GenericObject> record = PulsarRecord.<String>builder()
+ .topicName("fake-topic")
+ .message(msg)
+ .schema(Schema.KeyValue(Schema.INT32, Schema.STRING))
+ .ackFunction(status::incrementAndGet)
+ .failFunction(status::decrementAndGet)
+ .build();
+
+ sink.write(record);
+ sink.flush();
+
+ // expect fail
+ assertEquals(status.get(), -1);
+
+ sink.close();
+ }
+
+ @Test
+ public void wrongKeyValueSchemaTest() throws Exception {
+ props.put("kafkaConnectorSinkClass", SchemaedFileStreamSinkConnector.class.getCanonicalName());
+
+ KafkaConnectSink sink = new KafkaConnectSink();
+ sink.open(props, context);
+
+ Message msg = mock(MessageImpl.class);
+ // value is of a wrong/unsupported type
+ when(msg.getValue()).thenReturn(new AbstractMap.SimpleEntry<>(11, "value"));
+ when(msg.getKey()).thenReturn("key");
+ when(msg.hasKey()).thenReturn(true);
+ when(msg.getMessageId()).thenReturn(new MessageIdImpl(1, 0, 0));
+
+ final AtomicInteger status = new AtomicInteger(0);
+ Record<GenericObject> record = PulsarRecord.<String>builder()
+ .topicName("fake-topic")
+ .message(msg)
+ .schema(Schema.KeyValue(Schema.INT32, Schema.STRING))
+ .ackFunction(status::incrementAndGet)
+ .failFunction(status::decrementAndGet)
+ .build();
+
+ sink.write(record);
+ sink.flush();
+
+ // expect fail
+ assertEquals(status.get(), -1);
+
+ sink.close();
+ }
+
@Test
public void offsetTest() throws Exception {
final AtomicLong entryId = new AtomicLong(0L);
diff --git a/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/SchemaedFileStreamSinkTask.java b/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/SchemaedFileStreamSinkTask.java
index 9821a58eb22..07b9117d2e4 100644
--- a/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/SchemaedFileStreamSinkTask.java
+++ b/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/SchemaedFileStreamSinkTask.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.io.kafka.connect;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Lists;
+import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
@@ -37,6 +38,7 @@ import java.util.Map;
* A FileStreamSinkTask for testing that writes data other than just a value, i.e.:
* key, value, key and value schemas.
*/
+@Slf4j
public class SchemaedFileStreamSinkTask extends FileStreamSinkTask {
@Override
@@ -49,32 +51,28 @@ public class SchemaedFileStreamSinkTask extends FileStreamSinkTask {
? new String((byte[]) record.value(), StandardCharsets.US_ASCII)
: record.value();
+ Object key = record.keySchema() == Schema.BYTES_SCHEMA
+ ? new String((byte[]) record.key(), StandardCharsets.US_ASCII)
+ : record.key();
+
Map<String, Object> recOut = Maps.newHashMap();
recOut.put("keySchema", record.keySchema().type().toString());
recOut.put("valueSchema", record.valueSchema().type().toString());
- recOut.put("key", record.key());
- if (val instanceof Struct) {
- Map<String, Object> map = Maps.newHashMap();
- Struct struct = (Struct)val;
-
- // no recursion needed for tests
- for (Field f: struct.schema().fields()) {
- map.put(f.name(), struct.get(f));
- }
-
- recOut.put("value", map);
- } else {
- recOut.put("value", val);
- }
+ recOut.put("key", toWritableValue(key));
+ recOut.put("value", toWritableValue(val));
ObjectMapper om = new ObjectMapper();
try {
+ String valueAsString = om.writeValueAsString(recOut);
+
+ log.info("FileSink writing {}", valueAsString);
+
SinkRecord toSink = new SinkRecord(record.topic(),
record.kafkaPartition(),
- record.keySchema(),
- record.key(),
Schema.STRING_SCHEMA,
- om.writeValueAsString(recOut),
+ "", // blank key, real one is serialized with recOut
+ Schema.STRING_SCHEMA,
+ valueAsString,
record.kafkaOffset(),
record.timestamp(),
record.timestampType());
@@ -87,4 +85,19 @@ public class SchemaedFileStreamSinkTask extends FileStreamSinkTask {
super.put(out);
}
+ private Object toWritableValue(Object val) {
+ if (val instanceof Struct) {
+ Map<String, Object> map = Maps.newHashMap();
+ Struct struct = (Struct) val;
+
+ // no recursion needed for tests
+ for (Field f: struct.schema().fields()) {
+ map.put(f.name(), struct.get(f));
+ }
+ return map;
+ } else {
+ return val;
+ }
+ }
+
}