Posted to commits@pulsar.apache.org by ni...@apache.org on 2022/06/09 07:15:48 UTC

[pulsar] 02/04: [pulsar-io] KCA: properly handle KeyValue that getNativeObject() returns: corrected type + support for KeyValue<GenericRecord, GenericRecord> (#15025)

This is an automated email from the ASF dual-hosted git repository.

nicoloboschi pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 515a9bfd46ec0f2ffa3bb3d2d9cd646575574565
Author: Andrey Yegorov <86...@users.noreply.github.com>
AuthorDate: Thu Apr 14 08:55:23 2022 -0700

    [pulsar-io] KCA: properly handle KeyValue that getNativeObject() returns: corrected type + support for KeyValue<GenericRecord, GenericRecord> (#15025)
    
    (cherry picked from commit d76b5d40da2c9055102b3ecf3e5f6b358ac52732)
---
 .../pulsar/io/kafka/connect/KafkaConnectSink.java  |  18 ++-
 .../io/kafka/connect/schema/KafkaConnectData.java  |  20 +++
 .../io/kafka/connect/KafkaConnectSinkTest.java     | 136 +++++++++++++++++++--
 .../kafka/connect/SchemaedFileStreamSinkTask.java  |  47 ++++---
 4 files changed, 193 insertions(+), 28 deletions(-)
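
Context for the change below: on a message whose schema type is KEY_VALUE, getNativeObject() returns an org.apache.pulsar.common.schema.KeyValue, not the org.apache.pulsar.io.core.KeyValue the sink previously cast it to, and each side of the pair may itself be a Pulsar GenericRecord that needs conversion to Kafka Connect data. A minimal producer-side sketch of the kind of record that exercises the patched KEY_VALUE branch (illustrative only; the service URL and topic name are assumptions, not part of this commit):

    import org.apache.pulsar.client.api.Producer;
    import org.apache.pulsar.client.api.PulsarClient;
    import org.apache.pulsar.client.api.Schema;
    import org.apache.pulsar.common.schema.KeyValue;

    public class KeyValueProducerSketch {
        public static void main(String[] args) throws Exception {
            PulsarClient client = PulsarClient.builder()
                    .serviceUrl("pulsar://localhost:6650") // assumed local broker
                    .build();

            // KeyValue-schema producer; getNativeObject() on the consumed
            // message yields org.apache.pulsar.common.schema.KeyValue.
            Producer<KeyValue<Integer, String>> producer = client
                    .newProducer(Schema.KeyValue(Schema.INT32, Schema.STRING))
                    .topic("kv-topic") // hypothetical topic
                    .create();

            producer.send(new KeyValue<>(11, "value"));
            client.close();
        }
    }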

diff --git a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSink.java b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSink.java
index 502154065d9..31f7cbf6399 100644
--- a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSink.java
+++ b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSink.java
@@ -51,9 +51,9 @@ import org.apache.kafka.connect.sink.SinkTask;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.client.api.schema.GenericObject;
 import org.apache.pulsar.client.api.schema.KeyValueSchema;
+import org.apache.pulsar.common.schema.KeyValue;
 import org.apache.pulsar.common.schema.SchemaType;
 import org.apache.pulsar.functions.api.Record;
-import org.apache.pulsar.io.core.KeyValue;
 import org.apache.pulsar.io.core.Sink;
 import org.apache.pulsar.io.core.SinkContext;
 import org.apache.pulsar.io.kafka.connect.schema.KafkaConnectData;
@@ -255,11 +255,21 @@ public class KafkaConnectSink implements Sink<GenericObject> {
                 && sourceRecord.getSchema().getSchemaInfo() != null
                 && sourceRecord.getSchema().getSchemaInfo().getType() == SchemaType.KEY_VALUE) {
             KeyValueSchema kvSchema = (KeyValueSchema) sourceRecord.getSchema();
-            KeyValue kv = (KeyValue) sourceRecord.getValue().getNativeObject();
             keySchema = PulsarSchemaToKafkaSchema.getKafkaConnectSchema(kvSchema.getKeySchema());
             valueSchema = PulsarSchemaToKafkaSchema.getKafkaConnectSchema(kvSchema.getValueSchema());
-            key = kv.getKey();
-            value = kv.getValue();
+
+            Object nativeObject = sourceRecord.getValue().getNativeObject();
+
+            if (nativeObject instanceof KeyValue) {
+                KeyValue kv = (KeyValue) nativeObject;
+                key = KafkaConnectData.getKafkaConnectData(kv.getKey(), keySchema);
+                value = KafkaConnectData.getKafkaConnectData(kv.getValue(), valueSchema);
+            } else if (nativeObject != null) {
+                throw new IllegalStateException("Cannot extract KeyValue data from " + nativeObject.getClass());
+            } else {
+                key = null;
+                value = null;
+            }
         } else {
             if (sourceRecord.getMessage().get().hasBase64EncodedKey()) {
                 key = sourceRecord.getMessage().get().getKeyBytes();
diff --git a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/schema/KafkaConnectData.java b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/schema/KafkaConnectData.java
index e39ce086a53..8374dd24bf7 100644
--- a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/schema/KafkaConnectData.java
+++ b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/schema/KafkaConnectData.java
@@ -33,6 +33,7 @@ import org.apache.kafka.connect.data.Field;
 import org.apache.kafka.connect.data.Schema;
 import org.apache.kafka.connect.data.Struct;
 import org.apache.kafka.connect.errors.DataException;
+import org.apache.pulsar.client.api.schema.GenericRecord;
 
 @Slf4j
 public class KafkaConnectData {
@@ -47,6 +48,10 @@ public class KafkaConnectData {
         } else if (nativeObject instanceof GenericData.Record) {
             GenericData.Record avroRecord = (GenericData.Record) nativeObject;
             return avroAsConnectData(avroRecord, kafkaSchema);
+        } else if (nativeObject instanceof GenericRecord) {
+            // Pulsar's GenericRecord
+            GenericRecord pulsarGenericRecord = (GenericRecord) nativeObject;
+            return pulsarGenericRecordAsConnectData(pulsarGenericRecord, kafkaSchema);
         }
 
         return castToKafkaSchema(nativeObject, kafkaSchema);
@@ -134,6 +139,21 @@ public class KafkaConnectData {
         return struct;
     }
 
+    static Object pulsarGenericRecordAsConnectData(GenericRecord genericRecord, Schema kafkaSchema) {
+        if (kafkaSchema == null) {
+            if (genericRecord == null) {
+                return null;
+            }
+            throw new DataException("Don't know how to convert " + genericRecord + " to Connect data (schema is null).");
+        }
+
+        Struct struct = new Struct(kafkaSchema);
+        for (Field field : kafkaSchema.fields()) {
+            struct.put(field, getKafkaConnectData(genericRecord.getField(field.name()), field.schema()));
+        }
+        return struct;
+    }
+
     // with some help of
     // https://github.com/apache/kafka/blob/trunk/connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverter.java
     static Object jsonAsConnectData(JsonNode jsonNode, Schema kafkaSchema) {
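
The new pulsarGenericRecordAsConnectData mirrors the existing avroAsConnectData path: for each field of the target Kafka schema it recursively converts the matching Pulsar field. A hedged usage sketch (the helper name is hypothetical; the struct layout is borrowed from the StructWithAnnotations tests below, and SchemaBuilder is the standard Kafka Connect schema API):

    import org.apache.kafka.connect.data.Schema;
    import org.apache.kafka.connect.data.SchemaBuilder;
    import org.apache.kafka.connect.data.Struct;
    import org.apache.pulsar.client.api.schema.GenericRecord;
    import org.apache.pulsar.io.kafka.connect.schema.KafkaConnectData;

    public class GenericRecordConversionSketch {
        // Hypothetical helper: converts a Pulsar GenericRecord with a
        // {field1:int, field2:string, field3:long} layout into a Connect
        // Struct via the new branch in getKafkaConnectData.
        static Struct toConnectStruct(GenericRecord pulsarRecord) {
            Schema kafkaSchema = SchemaBuilder.struct()
                    .field("field1", Schema.INT32_SCHEMA)
                    .field("field2", Schema.STRING_SCHEMA)
                    .field("field3", Schema.INT64_SCHEMA)
                    .build();
            return (Struct) KafkaConnectData.getKafkaConnectData(pulsarRecord, kafkaSchema);
        }
    }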
diff --git a/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java b/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java
index 14c7dcd7ef8..23d9f1b5ce2 100644
--- a/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java
+++ b/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java
@@ -33,6 +33,7 @@ import org.apache.pulsar.client.api.ProducerConsumerBase;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.api.schema.Field;
 import org.apache.pulsar.client.api.schema.GenericObject;
 import org.apache.pulsar.client.api.schema.GenericRecord;
 import org.apache.pulsar.client.api.schema.SchemaDefinition;
@@ -40,10 +41,11 @@ import org.apache.pulsar.client.impl.MessageIdImpl;
 import org.apache.pulsar.client.impl.MessageImpl;
 import org.apache.pulsar.client.impl.schema.AvroSchema;
 import org.apache.pulsar.client.impl.schema.JSONSchema;
+import org.apache.pulsar.client.impl.schema.generic.GenericAvroRecord;
 import org.apache.pulsar.client.util.MessageIdUtils;
+import org.apache.pulsar.common.schema.KeyValue;
 import org.apache.pulsar.functions.api.Record;
 import org.apache.pulsar.functions.source.PulsarRecord;
-import org.apache.pulsar.io.core.KeyValue;
 import org.apache.pulsar.io.core.SinkContext;
 import org.mockito.Mockito;
 import org.mockito.invocation.InvocationOnMock;
@@ -58,12 +60,14 @@ import java.nio.charset.StandardCharsets;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;
+import java.util.AbstractMap;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
 
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.doAnswer;
@@ -331,10 +335,10 @@ public class KafkaConnectSinkTest extends ProducerConsumerBase  {
         ObjectMapper om = new ObjectMapper();
         Map<String, Object> result = om.readValue(lines.get(0), new TypeReference<Map<String, Object>>(){});
 
-        assertEquals(result.get("key"), expectedKey);
-        assertEquals(result.get("value"), expected);
-        assertEquals(result.get("keySchema"), expectedKeySchema);
-        assertEquals(result.get("valueSchema"), expectedSchema);
+        assertEquals(expectedKey, result.get("key"));
+        assertEquals(expected, result.get("value"));
+        assertEquals(expectedKeySchema, result.get("keySchema"));
+        assertEquals(expectedSchema, result.get("valueSchema"));
 
         if (schema.getSchemaInfo().getType().isPrimitive()) {
             // to test cast of primitive values
@@ -362,8 +366,18 @@ public class KafkaConnectSinkTest extends ProducerConsumerBase  {
 
     private GenericRecord getGenericRecord(Object value, Schema schema) {
         final GenericRecord rec;
-        if(value instanceof GenericRecord) {
+        if (value instanceof GenericRecord) {
             rec = (GenericRecord) value;
+        } else if (value instanceof org.apache.avro.generic.GenericRecord) {
+            org.apache.avro.generic.GenericRecord avroRecord =
+                    (org.apache.avro.generic.GenericRecord) value;
+            org.apache.avro.Schema avroSchema = (org.apache.avro.Schema) schema.getNativeSchema().get();
+            List<Field> fields = avroSchema.getFields()
+                    .stream()
+                    .map(f -> new Field(f.name(), f.pos()))
+                    .collect(Collectors.toList());
+
+            return new GenericAvroRecord(new byte[]{ 1 }, avroSchema, fields, avroRecord);
         } else {
             rec = MockGenericObjectWrapper.builder()
                     .nativeObject(value)
@@ -592,7 +606,7 @@ public class KafkaConnectSinkTest extends ProducerConsumerBase  {
     }
 
     @Test
-    public void KeyValueSchemaTest() throws Exception {
+    public void schemaKeyValueSchemaTest() throws Exception {
         KeyValue<Integer, String> kv = new KeyValue<>(11, "value");
         SinkRecord sinkRecord = recordSchemaTest(kv, Schema.KeyValue(Schema.INT32, Schema.STRING), 11, "INT32", "value", "STRING");
         String val = (String) sinkRecord.value();
@@ -601,6 +615,114 @@ public class KafkaConnectSinkTest extends ProducerConsumerBase  {
         Assert.assertEquals(key, 11);
     }
 
+    @Test
+    public void schemaKeyValueAvroSchemaTest() throws Exception {
+        AvroSchema<PulsarSchemaToKafkaSchemaTest.StructWithAnnotations> pulsarAvroSchema
+                = AvroSchema.of(PulsarSchemaToKafkaSchemaTest.StructWithAnnotations.class);
+
+        final GenericData.Record key = new GenericData.Record(pulsarAvroSchema.getAvroSchema());
+        key.put("field1", 11);
+        key.put("field2", "key");
+        key.put("field3", 101L);
+
+        final GenericData.Record value = new GenericData.Record(pulsarAvroSchema.getAvroSchema());
+        value.put("field1", 10);
+        value.put("field2", "value");
+        value.put("field3", 100L);
+
+        Map<String, Object> expectedKey = new LinkedHashMap<>();
+        expectedKey.put("field1", 11);
+        expectedKey.put("field2", "key");
+        // integer is coming back from ObjectMapper
+        expectedKey.put("field3", 101);
+
+        Map<String, Object> expectedValue = new LinkedHashMap<>();
+        expectedValue.put("field1", 10);
+        expectedValue.put("field2", "value");
+        // integer is coming back from ObjectMapper
+        expectedValue.put("field3", 100);
+
+        KeyValue<GenericRecord, GenericRecord> kv = new KeyValue<>(getGenericRecord(key, pulsarAvroSchema),
+                            getGenericRecord(value, pulsarAvroSchema));
+
+        SinkRecord sinkRecord = recordSchemaTest(kv, Schema.KeyValue(pulsarAvroSchema, pulsarAvroSchema),
+                expectedKey, "STRUCT", expectedValue, "STRUCT");
+
+        Struct outValue = (Struct) sinkRecord.value();
+        Assert.assertEquals((int)outValue.get("field1"), 10);
+        Assert.assertEquals((String)outValue.get("field2"), "value");
+        Assert.assertEquals((long)outValue.get("field3"), 100L);
+
+        Struct outKey = (Struct) sinkRecord.key();
+        Assert.assertEquals((int)outKey.get("field1"), 11);
+        Assert.assertEquals((String)outKey.get("field2"), "key");
+        Assert.assertEquals((long)outKey.get("field3"), 101L);
+    }
+
+    @Test
+    public void nullKeyValueSchemaTest() throws Exception {
+        props.put("kafkaConnectorSinkClass", SchemaedFileStreamSinkConnector.class.getCanonicalName());
+
+        KafkaConnectSink sink = new KafkaConnectSink();
+        sink.open(props, context);
+
+        Message msg = mock(MessageImpl.class);
+        // value is null
+        when(msg.getValue()).thenReturn(null);
+        when(msg.getKey()).thenReturn("key");
+        when(msg.hasKey()).thenReturn(true);
+        when(msg.getMessageId()).thenReturn(new MessageIdImpl(1, 0, 0));
+
+        final AtomicInteger status = new AtomicInteger(0);
+        Record<GenericObject> record = PulsarRecord.<String>builder()
+                .topicName("fake-topic")
+                .message(msg)
+                .schema(Schema.KeyValue(Schema.INT32, Schema.STRING))
+                .ackFunction(status::incrementAndGet)
+                .failFunction(status::decrementAndGet)
+                .build();
+
+        sink.write(record);
+        sink.flush();
+
+        // expect fail
+        assertEquals(status.get(), -1);
+
+        sink.close();
+    }
+
+    @Test
+    public void wrongKeyValueSchemaTest() throws Exception {
+        props.put("kafkaConnectorSinkClass", SchemaedFileStreamSinkConnector.class.getCanonicalName());
+
+        KafkaConnectSink sink = new KafkaConnectSink();
+        sink.open(props, context);
+
+        Message msg = mock(MessageImpl.class);
+        // value is of a wrong/unsupported type
+        when(msg.getValue()).thenReturn(new AbstractMap.SimpleEntry<>(11, "value"));
+        when(msg.getKey()).thenReturn("key");
+        when(msg.hasKey()).thenReturn(true);
+        when(msg.getMessageId()).thenReturn(new MessageIdImpl(1, 0, 0));
+
+        final AtomicInteger status = new AtomicInteger(0);
+        Record<GenericObject> record = PulsarRecord.<String>builder()
+                .topicName("fake-topic")
+                .message(msg)
+                .schema(Schema.KeyValue(Schema.INT32, Schema.STRING))
+                .ackFunction(status::incrementAndGet)
+                .failFunction(status::decrementAndGet)
+                .build();
+
+        sink.write(record);
+        sink.flush();
+
+        // expect fail
+        assertEquals(status.get(), -1);
+
+        sink.close();
+    }
+
     @Test
     public void offsetTest() throws Exception {
         final AtomicLong entryId = new AtomicLong(0L);
diff --git a/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/SchemaedFileStreamSinkTask.java b/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/SchemaedFileStreamSinkTask.java
index 9821a58eb22..07b9117d2e4 100644
--- a/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/SchemaedFileStreamSinkTask.java
+++ b/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/SchemaedFileStreamSinkTask.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.io.kafka.connect;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.collect.Lists;
+import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.connect.data.Field;
 import org.apache.kafka.connect.data.Schema;
 import org.apache.kafka.connect.data.Struct;
@@ -37,6 +38,7 @@ import java.util.Map;
  * A FileStreamSinkTask for testing that writes data other than just a value, i.e.:
  * key, value, key and value schemas.
  */
+@Slf4j
 public class SchemaedFileStreamSinkTask extends FileStreamSinkTask {
 
     @Override
@@ -49,32 +51,28 @@ public class SchemaedFileStreamSinkTask extends FileStreamSinkTask {
                     ? new String((byte[]) record.value(), StandardCharsets.US_ASCII)
                     : record.value();
 
+            Object key = record.keySchema() == Schema.BYTES_SCHEMA
+                    ? new String((byte[]) record.key(), StandardCharsets.US_ASCII)
+                    : record.key();
+
             Map<String, Object> recOut = Maps.newHashMap();
             recOut.put("keySchema", record.keySchema().type().toString());
             recOut.put("valueSchema", record.valueSchema().type().toString());
-            recOut.put("key", record.key());
-            if (val instanceof Struct) {
-                Map<String, Object> map = Maps.newHashMap();
-                Struct struct = (Struct)val;
-
-                // no recursion needed for tests
-                for (Field f: struct.schema().fields()) {
-                    map.put(f.name(), struct.get(f));
-                }
-
-                recOut.put("value", map);
-            } else {
-                recOut.put("value", val);
-            }
+            recOut.put("key", toWritableValue(key));
+            recOut.put("value", toWritableValue(val));
 
             ObjectMapper om = new ObjectMapper();
             try {
+                String valueAsString = om.writeValueAsString(recOut);
+
+                log.info("FileSink writing {}", valueAsString);
+
                 SinkRecord toSink = new SinkRecord(record.topic(),
                         record.kafkaPartition(),
-                        record.keySchema(),
-                        record.key(),
                         Schema.STRING_SCHEMA,
-                        om.writeValueAsString(recOut),
+                        "", // blank key, real one is serialized with recOut
+                        Schema.STRING_SCHEMA,
+                        valueAsString,
                         record.kafkaOffset(),
                         record.timestamp(),
                         record.timestampType());
@@ -87,4 +85,19 @@ public class SchemaedFileStreamSinkTask extends FileStreamSinkTask {
         super.put(out);
     }
 
+    private Object toWritableValue(Object val) {
+        if (val instanceof Struct) {
+            Map<String, Object> map = Maps.newHashMap();
+            Struct struct = (Struct) val;
+
+            // no recursion needed for tests
+            for (Field f: struct.schema().fields()) {
+                map.put(f.name(), struct.get(f));
+            }
+            return map;
+        } else {
+            return val;
+        }
+    }
+
 }
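
With this change the test task writes each record to the sink file as a single JSON line carrying the key, the value, and both schema types, e.g. (illustrative values and field order, matching the expectations in KafkaConnectSinkTest above):

    {"keySchema":"STRUCT","valueSchema":"STRUCT","key":{"field1":11,"field2":"key","field3":101},"value":{"field1":10,"field2":"value","field3":100}}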