You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by te...@apache.org on 2022/08/19 02:39:04 UTC

[pulsar] branch branch-2.11 updated: [fix][function] Use the schema set by the Function when it returns a Record (#17142)

This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.11 by this push:
     new 5affcd796ad [fix][function] Use the schema set by the Function when it returns a Record (#17142)
5affcd796ad is described below

commit 5affcd796ad26e19ff1c0e90b1e7016f5e757ad6
Author: Christophe Bornet <cb...@hotmail.com>
AuthorDate: Fri Aug 19 04:37:12 2022 +0200

    [fix][function] Use the schema set by the Function when it returns a Record (#17142)
---
 .../functions/instance/AbstractSinkRecord.java     |   2 +
 .../functions/instance/OutputRecordSinkRecord.java |   7 +-
 .../pulsar/functions/instance/SinkRecord.java      |   6 +
 .../apache/pulsar/functions/sink/PulsarSink.java   |   6 +-
 .../functions/RemoveAvroFieldRecordFunction.java   | 164 +++++++++++++++++++++
 .../functions/PulsarFunctionsTestBase.java         |   3 +
 .../functions/java/PulsarFunctionsJavaTest.java    |  14 +-
 7 files changed, 197 insertions(+), 5 deletions(-)

diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/AbstractSinkRecord.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/AbstractSinkRecord.java
index 2adcbd0065f..6758558d331 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/AbstractSinkRecord.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/AbstractSinkRecord.java
@@ -40,6 +40,8 @@ public abstract class AbstractSinkRecord<T> implements Record<T> {
 
     public abstract boolean shouldAlwaysSetMessageProperties();
 
+    public abstract boolean shouldSetSchema();
+
     public Record<?> getSourceRecord() {
         return sourceRecord;
     }
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/OutputRecordSinkRecord.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/OutputRecordSinkRecord.java
index 6220517414d..5b63c6e8896 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/OutputRecordSinkRecord.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/OutputRecordSinkRecord.java
@@ -28,7 +28,7 @@ import org.apache.pulsar.functions.api.Record;
 
 @EqualsAndHashCode(callSuper = true)
 @ToString
-class OutputRecordSinkRecord<T> extends AbstractSinkRecord<T> {
+public class OutputRecordSinkRecord<T> extends AbstractSinkRecord<T> {
 
     private final Record<T> sinkRecord;
 
@@ -91,4 +91,9 @@ class OutputRecordSinkRecord<T> extends AbstractSinkRecord<T> {
     public boolean shouldAlwaysSetMessageProperties() {
         return true;
     }
+
+    @Override
+    public boolean shouldSetSchema() {
+        return true;
+    }
 }
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/SinkRecord.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/SinkRecord.java
index 8f64ed2ce09..ff07118eade 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/SinkRecord.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/SinkRecord.java
@@ -25,6 +25,7 @@ import lombok.ToString;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.functions.api.Record;
+import org.apache.pulsar.functions.source.PulsarRecord;
 
 @EqualsAndHashCode(callSuper = true)
 @ToString
@@ -92,4 +93,9 @@ public class SinkRecord<T> extends AbstractSinkRecord<T> {
     public boolean shouldAlwaysSetMessageProperties() {
         return false;
     }
+
+    @Override
+    public boolean shouldSetSchema() {
+        return !(sourceRecord instanceof PulsarRecord);
+    }
 }
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java
index 2f48efe33dc..cdd662bc75f 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java
@@ -236,8 +236,9 @@ public class PulsarSink<T> implements Sink<T> {
         @Override
         public TypedMessageBuilder<T> newMessage(AbstractSinkRecord<T> record) {
             Schema<T> schemaToWrite = record.getSchema();
-            if (record.getSourceRecord() instanceof PulsarRecord) {
+            if (!record.shouldSetSchema()) {
                 // we are receiving data directly from another Pulsar topic
+                // and the Function return type is not a Record
                 // we must use the destination topic schema
                 schemaToWrite = schema;
             }
@@ -304,8 +305,9 @@ public class PulsarSink<T> implements Sink<T> {
                         "PartitionId needs to be specified for every record while in Effectively-once mode");
             }
             Schema<T> schemaToWrite = record.getSchema();
-            if (record.getSourceRecord() instanceof PulsarRecord) {
+            if (!record.shouldSetSchema()) {
                 // we are receiving data directly from another Pulsar topic
+                // and the Function return type is not a Record
                 // we must use the destination topic schema
                 schemaToWrite = schema;
             }
diff --git a/tests/docker-images/java-test-functions/src/main/java/org/apache/pulsar/tests/integration/functions/RemoveAvroFieldRecordFunction.java b/tests/docker-images/java-test-functions/src/main/java/org/apache/pulsar/tests/integration/functions/RemoveAvroFieldRecordFunction.java
new file mode 100644
index 00000000000..fb396773e76
--- /dev/null
+++ b/tests/docker-images/java-test-functions/src/main/java/org/apache/pulsar/tests/integration/functions/RemoveAvroFieldRecordFunction.java
@@ -0,0 +1,164 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.tests.integration.functions;
+
+import java.io.ByteArrayOutputStream;
+import java.util.stream.Collectors;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.io.BinaryEncoder;
+import org.apache.avro.io.EncoderFactory;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.schema.GenericObject;
+import org.apache.pulsar.client.api.schema.GenericRecord;
+import org.apache.pulsar.client.api.schema.KeyValueSchema;
+import org.apache.pulsar.common.schema.KeyValue;
+import org.apache.pulsar.common.schema.SchemaType;
+import org.apache.pulsar.functions.api.Context;
+import org.apache.pulsar.functions.api.Function;
+import org.apache.pulsar.functions.api.Record;
+
+/**
+ * This function removes the "age" field from an AVRO message, or from the AVRO value of a KeyValue message, and returns the result as a Record that carries the modified schema.
+ */
+@Slf4j
+public class RemoveAvroFieldRecordFunction implements Function<GenericObject, Record<GenericObject>> {
+
+    private static final String FIELD_TO_REMOVE = "age";
+
+    @Override
+    public Record<GenericObject> process(GenericObject genericObject, Context context) throws Exception {
+        Record<?> currentRecord = context.getCurrentRecord();
+        log.info("apply to {} {}", genericObject, genericObject.getNativeObject());
+        log.info("record with schema {} version {} {}", currentRecord.getSchema(),
+            currentRecord.getMessage().get().getSchemaVersion(),
+            currentRecord);
+        Object nativeObject = genericObject.getNativeObject();
+        Schema<?> schema = currentRecord.getSchema();
+
+        Schema outputSchema = schema;
+        Object outputObject = genericObject.getNativeObject();
+        boolean someThingDone = false;
+        if (schema instanceof KeyValueSchema && nativeObject instanceof KeyValue) {
+            KeyValueSchema kvSchema = (KeyValueSchema) schema;
+
+            Schema keySchema = kvSchema.getKeySchema();
+            Schema valueSchema = kvSchema.getValueSchema();
+            // remove the FIELD_TO_REMOVE ("age") field from the value schema, but only when that schema is AVRO and actually declares the field
+            if (valueSchema.getSchemaInfo().getType() == SchemaType.AVRO) {
+
+                org.apache.avro.Schema avroSchema = (org.apache.avro.Schema) valueSchema.getNativeSchema().get();
+                if (avroSchema.getField(FIELD_TO_REMOVE) != null) {
+                    org.apache.avro.Schema.Parser parser = new org.apache.avro.Schema.Parser();
+                    org.apache.avro.Schema originalAvroSchema = parser.parse(avroSchema.toString(false));
+                    org.apache.avro.Schema modified = org.apache.avro.Schema.createRecord(
+                        originalAvroSchema.getName(), originalAvroSchema.getDoc(), originalAvroSchema.getNamespace(),
+                        originalAvroSchema.isError(),
+                        originalAvroSchema.getFields().
+                            stream()
+                            .filter(f -> !f.name().equals(FIELD_TO_REMOVE))
+                            .map(f -> new org.apache.avro.Schema.Field(f.name(), f.schema(), f.doc(), f.defaultVal(),
+                                f.order()))
+                            .collect(Collectors.toList()));
+
+                    KeyValue originalObject = (KeyValue) nativeObject;
+
+                    GenericRecord value = (GenericRecord) originalObject.getValue();
+                    org.apache.avro.generic.GenericRecord genericRecord =
+                        (org.apache.avro.generic.GenericRecord) value.getNativeObject();
+
+                    org.apache.avro.generic.GenericRecord newRecord = new GenericData.Record(modified);
+                    for (org.apache.avro.Schema.Field field : modified.getFields()) {
+                        newRecord.put(field.name(), genericRecord.get(field.name()));
+                    }
+                    GenericDatumWriter writer = new GenericDatumWriter(modified);
+                    ByteArrayOutputStream oo = new ByteArrayOutputStream();
+                    BinaryEncoder encoder = EncoderFactory.get().directBinaryEncoder(oo, null);
+                    writer.write(newRecord, encoder);
+                    Object newValue = oo.toByteArray();
+
+                    Schema newValueSchema = Schema.NATIVE_AVRO(modified);
+                    outputSchema = Schema.KeyValue(keySchema, newValueSchema, kvSchema.getKeyValueEncodingType());
+                    outputObject = new KeyValue(originalObject.getKey(), newValue);
+                    someThingDone = true;
+                }
+            }
+        } else if (schema.getSchemaInfo().getType() == SchemaType.AVRO) {
+            org.apache.avro.Schema avroSchema = (org.apache.avro.Schema) schema.getNativeSchema().get();
+            if (avroSchema.getField(FIELD_TO_REMOVE) != null) {
+                org.apache.avro.Schema.Parser parser = new org.apache.avro.Schema.Parser();
+                org.apache.avro.Schema originalAvroSchema = parser.parse(avroSchema.toString(false));
+                org.apache.avro.Schema modified = org.apache.avro.Schema.createRecord(
+                    originalAvroSchema.getName(), originalAvroSchema.getDoc(), originalAvroSchema.getNamespace(),
+                    originalAvroSchema.isError(),
+                    originalAvroSchema.getFields().
+                        stream()
+                        .filter(f -> !f.name().equals(FIELD_TO_REMOVE))
+                        .map(f -> new org.apache.avro.Schema.Field(f.name(), f.schema(), f.doc(), f.defaultVal(),
+                            f.order()))
+                        .collect(Collectors.toList()));
+
+                org.apache.avro.generic.GenericRecord genericRecord =
+                    (org.apache.avro.generic.GenericRecord) nativeObject;
+                org.apache.avro.generic.GenericRecord newRecord = new GenericData.Record(modified);
+                for (org.apache.avro.Schema.Field field : modified.getFields()) {
+                    newRecord.put(field.name(), genericRecord.get(field.name()));
+                }
+                GenericDatumWriter writer = new GenericDatumWriter(modified);
+                ByteArrayOutputStream oo = new ByteArrayOutputStream();
+                BinaryEncoder encoder = EncoderFactory.get().directBinaryEncoder(oo, null);
+                writer.write(newRecord, encoder);
+
+                Schema newValueSchema = Schema.NATIVE_AVRO(modified);
+                outputSchema = newValueSchema;
+                outputObject = oo.toByteArray();
+                someThingDone = true;
+            }
+        }
+
+        if (!someThingDone) {
+            // no field was removed: forward the input with its original schema, choosing between the wrapped and the native value
+            final boolean isStruct;
+            switch (currentRecord.getSchema().getSchemaInfo().getType()) {
+                case AVRO:
+                case JSON:
+                case PROTOBUF_NATIVE:
+                    isStruct = true;
+                    break;
+                default:
+                    isStruct = false;
+                    break;
+            }
+            if (isStruct) {
+                // struct types (AVRO, JSON, PROTOBUF_NATIVE): the GenericRecord must stay wrapped, so emit the record's own value
+                outputObject = currentRecord.getValue();
+            } else {
+                // every other schema type (primitives and KeyValue) must be unwrapped, so emit the native object
+                outputObject = nativeObject;
+            }
+        }
+        log.info("output {} schema {}", outputObject, outputSchema);
+
+        return context.newOutputRecordBuilder()
+            .schema(outputSchema)
+            .value(outputObject)
+            .build();
+    }
+}
\ No newline at end of file
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTestBase.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTestBase.java
index cbcdf89612a..e80f811fd5e 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTestBase.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTestBase.java
@@ -48,6 +48,9 @@ public abstract class PulsarFunctionsTestBase extends PulsarTestSuite {
     public static final String REMOVE_AVRO_FIELD_FUNCTION_JAVA_CLASS =
             "org.apache.pulsar.tests.integration.functions.RemoveAvroFieldFunction";
 
+    public static final String REMOVE_AVRO_FIELD_RECORD_FUNCTION_JAVA_CLASS =
+        "org.apache.pulsar.tests.integration.functions.RemoveAvroFieldRecordFunction";
+
     public static final String SERDE_JAVA_CLASS =
             "org.apache.pulsar.functions.api.examples.CustomBaseToBaseFunction";
 
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/java/PulsarFunctionsJavaTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/java/PulsarFunctionsJavaTest.java
index 3c3fe4254e0..7e1c6500e3c 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/java/PulsarFunctionsJavaTest.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/java/PulsarFunctionsJavaTest.java
@@ -174,20 +174,30 @@ public class PulsarFunctionsJavaTest extends PulsarFunctionsTest {
     }
 
     @Test(groups = {"java_function", "function"})
-    public void testGenericObjectRemoveFiledFunction() throws Exception {
+    public void testGenericObjectRemoveFieldFunction() throws Exception {
         testGenericObjectFunction(REMOVE_AVRO_FIELD_FUNCTION_JAVA_CLASS, true, false);
     }
 
+    @Test(groups = {"java_function", "function"})
+    public void testGenericObjectRemoveFieldRecordFunction() throws Exception {
+        testGenericObjectFunction(REMOVE_AVRO_FIELD_RECORD_FUNCTION_JAVA_CLASS, true, false);
+    }
+
     @Test(groups = {"java_function", "function"})
     public void testGenericObjectFunctionKeyValue() throws Exception {
         testGenericObjectFunction(GENERIC_OBJECT_FUNCTION_JAVA_CLASS, false, true);
     }
 
     @Test(groups = {"java_function", "function"})
-    public void testGenericObjectRemoveFiledFunctionKeyValue() throws Exception {
+    public void testGenericObjectRemoveFieldFunctionKeyValue() throws Exception {
         testGenericObjectFunction(REMOVE_AVRO_FIELD_FUNCTION_JAVA_CLASS, true, true);
     }
 
+    @Test(groups = {"java_function", "function"})
+    public void testGenericObjectRemoveFieldRecordFunctionKeyValue() throws Exception {
+        testGenericObjectFunction(REMOVE_AVRO_FIELD_RECORD_FUNCTION_JAVA_CLASS, true, true);
+    }
+
     @Test(groups = {"java_function", "function"})
     public void testRecordFunctionTest() throws Exception {
         testRecordFunction();