You are viewing a plain-text version of this content; the link to the canonical version was a hyperlink that is not preserved in this plain-text rendering.
Posted to commits@pulsar.apache.org by te...@apache.org on 2022/08/19 02:37:19 UTC
[pulsar] branch master updated: [fix][function] Use the schema set by the Function when it returns a Record (#17142)
This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new ee004001e6b [fix][function] Use the schema set by the Function when it returns a Record (#17142)
ee004001e6b is described below
commit ee004001e6bd0e0110f795a12a9293bc2d531279
Author: Christophe Bornet <cb...@hotmail.com>
AuthorDate: Fri Aug 19 04:37:12 2022 +0200
[fix][function] Use the schema set by the Function when it returns a Record (#17142)
---
.../functions/instance/AbstractSinkRecord.java | 2 +
.../functions/instance/OutputRecordSinkRecord.java | 7 +-
.../pulsar/functions/instance/SinkRecord.java | 6 +
.../apache/pulsar/functions/sink/PulsarSink.java | 6 +-
.../functions/RemoveAvroFieldRecordFunction.java | 164 +++++++++++++++++++++
.../functions/PulsarFunctionsTestBase.java | 3 +
.../functions/java/PulsarFunctionsJavaTest.java | 14 +-
7 files changed, 197 insertions(+), 5 deletions(-)
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/AbstractSinkRecord.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/AbstractSinkRecord.java
index 2adcbd0065f..6758558d331 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/AbstractSinkRecord.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/AbstractSinkRecord.java
@@ -40,6 +40,8 @@ public abstract class AbstractSinkRecord<T> implements Record<T> {
public abstract boolean shouldAlwaysSetMessageProperties();
+ public abstract boolean shouldSetSchema(); // true: PulsarSink writes with getSchema(); false: it uses the destination topic schema
+
public Record<?> getSourceRecord() {
return sourceRecord;
}
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/OutputRecordSinkRecord.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/OutputRecordSinkRecord.java
index 6220517414d..5b63c6e8896 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/OutputRecordSinkRecord.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/OutputRecordSinkRecord.java
@@ -28,7 +28,7 @@ import org.apache.pulsar.functions.api.Record;
@EqualsAndHashCode(callSuper = true)
@ToString
-class OutputRecordSinkRecord<T> extends AbstractSinkRecord<T> {
+public class OutputRecordSinkRecord<T> extends AbstractSinkRecord<T> {
private final Record<T> sinkRecord;
@@ -91,4 +91,9 @@ class OutputRecordSinkRecord<T> extends AbstractSinkRecord<T> {
public boolean shouldAlwaysSetMessageProperties() {
return true;
}
+
+ @Override
+ public boolean shouldSetSchema() {
+ return true; // always honor the schema carried by the Record the Function returned (see PulsarSink.newMessage)
+ }
}
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/SinkRecord.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/SinkRecord.java
index 8f64ed2ce09..ff07118eade 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/SinkRecord.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/SinkRecord.java
@@ -25,6 +25,7 @@ import lombok.ToString;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.functions.api.Record;
+import org.apache.pulsar.functions.source.PulsarRecord;
@EqualsAndHashCode(callSuper = true)
@ToString
@@ -92,4 +93,9 @@ public class SinkRecord<T> extends AbstractSinkRecord<T> {
public boolean shouldAlwaysSetMessageProperties() {
return false;
}
+
+ @Override
+ public boolean shouldSetSchema() {
+ return !(sourceRecord instanceof PulsarRecord); // input read from a Pulsar topic: PulsarSink uses the destination topic schema instead
+ }
}
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java
index 2f48efe33dc..cdd662bc75f 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java
@@ -236,8 +236,9 @@ public class PulsarSink<T> implements Sink<T> {
@Override
public TypedMessageBuilder<T> newMessage(AbstractSinkRecord<T> record) {
Schema<T> schemaToWrite = record.getSchema();
- if (record.getSourceRecord() instanceof PulsarRecord) {
+ if (!record.shouldSetSchema()) {
// we are receiving data directly from another Pulsar topic
+ // and the Function return type is not a Record
// we must use the destination topic schema
schemaToWrite = schema;
}
@@ -304,8 +305,9 @@ public class PulsarSink<T> implements Sink<T> {
"PartitionId needs to be specified for every record while in Effectively-once mode");
}
Schema<T> schemaToWrite = record.getSchema();
- if (record.getSourceRecord() instanceof PulsarRecord) {
+ if (!record.shouldSetSchema()) {
// we are receiving data directly from another Pulsar topic
+ // and the Function return type is not a Record
// we must use the destination topic schema
schemaToWrite = schema;
}
diff --git a/tests/docker-images/java-test-functions/src/main/java/org/apache/pulsar/tests/integration/functions/RemoveAvroFieldRecordFunction.java b/tests/docker-images/java-test-functions/src/main/java/org/apache/pulsar/tests/integration/functions/RemoveAvroFieldRecordFunction.java
new file mode 100644
index 00000000000..fb396773e76
--- /dev/null
+++ b/tests/docker-images/java-test-functions/src/main/java/org/apache/pulsar/tests/integration/functions/RemoveAvroFieldRecordFunction.java
@@ -0,0 +1,164 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.tests.integration.functions;
+
+import java.io.ByteArrayOutputStream;
+import java.util.stream.Collectors;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.io.BinaryEncoder;
+import org.apache.avro.io.EncoderFactory;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.schema.GenericObject;
+import org.apache.pulsar.client.api.schema.GenericRecord;
+import org.apache.pulsar.client.api.schema.KeyValueSchema;
+import org.apache.pulsar.common.schema.KeyValue;
+import org.apache.pulsar.common.schema.SchemaType;
+import org.apache.pulsar.functions.api.Context;
+import org.apache.pulsar.functions.api.Function;
+import org.apache.pulsar.functions.api.Record;
+
+/**
+ * Removes the "age" field from an AVRO value (or from the AVRO value of a KeyValue) and returns the result as a Record carrying the modified schema.
+ */
+@Slf4j
+public class RemoveAvroFieldRecordFunction implements Function<GenericObject, Record<GenericObject>> {
+
+ private static final String FIELD_TO_REMOVE = "age";
+
+ @Override
+ public Record<GenericObject> process(GenericObject genericObject, Context context) throws Exception {
+ Record<?> currentRecord = context.getCurrentRecord();
+ log.info("apply to {} {}", genericObject, genericObject.getNativeObject());
+ log.info("record with schema {} version {} {}", currentRecord.getSchema(),
+ currentRecord.getMessage().get().getSchemaVersion(), // NOTE(review): unchecked get() assumes the input record carries a Message
+ currentRecord);
+ Object nativeObject = genericObject.getNativeObject();
+ Schema<?> schema = currentRecord.getSchema();
+
+ Schema outputSchema = schema;
+ Object outputObject = genericObject.getNativeObject();
+ boolean someThingDone = false;
+ if (schema instanceof KeyValueSchema && nativeObject instanceof KeyValue) {
+ KeyValueSchema kvSchema = (KeyValueSchema) schema;
+
+ Schema keySchema = kvSchema.getKeySchema();
+ Schema valueSchema = kvSchema.getValueSchema();
+ // remove FIELD_TO_REMOVE from the AVRO value schema; the key and key schema are kept unchanged
+ if (valueSchema.getSchemaInfo().getType() == SchemaType.AVRO) {
+
+ org.apache.avro.Schema avroSchema = (org.apache.avro.Schema) valueSchema.getNativeSchema().get();
+ if (avroSchema.getField(FIELD_TO_REMOVE) != null) {
+ org.apache.avro.Schema.Parser parser = new org.apache.avro.Schema.Parser();
+ org.apache.avro.Schema originalAvroSchema = parser.parse(avroSchema.toString(false));
+ org.apache.avro.Schema modified = org.apache.avro.Schema.createRecord(
+ originalAvroSchema.getName(), originalAvroSchema.getDoc(), originalAvroSchema.getNamespace(),
+ originalAvroSchema.isError(),
+ originalAvroSchema.getFields().
+ stream()
+ .filter(f -> !f.name().equals(FIELD_TO_REMOVE))
+ .map(f -> new org.apache.avro.Schema.Field(f.name(), f.schema(), f.doc(), f.defaultVal(),
+ f.order()))
+ .collect(Collectors.toList()));
+
+ KeyValue originalObject = (KeyValue) nativeObject;
+
+ GenericRecord value = (GenericRecord) originalObject.getValue();
+ org.apache.avro.generic.GenericRecord genericRecord =
+ (org.apache.avro.generic.GenericRecord) value.getNativeObject();
+
+ org.apache.avro.generic.GenericRecord newRecord = new GenericData.Record(modified);
+ for (org.apache.avro.Schema.Field field : modified.getFields()) {
+ newRecord.put(field.name(), genericRecord.get(field.name()));
+ }
+ GenericDatumWriter writer = new GenericDatumWriter(modified);
+ ByteArrayOutputStream oo = new ByteArrayOutputStream();
+ BinaryEncoder encoder = EncoderFactory.get().directBinaryEncoder(oo, null);
+ writer.write(newRecord, encoder);
+ Object newValue = oo.toByteArray(); // AVRO binary encoding of newRecord, paired with Schema.NATIVE_AVRO(modified) below
+
+ Schema newValueSchema = Schema.NATIVE_AVRO(modified);
+ outputSchema = Schema.KeyValue(keySchema, newValueSchema, kvSchema.getKeyValueEncodingType());
+ outputObject = new KeyValue(originalObject.getKey(), newValue);
+ someThingDone = true;
+ }
+ }
+ } else if (schema.getSchemaInfo().getType() == SchemaType.AVRO) { // NOTE(review): duplicates the schema rewrite of the KeyValue branch; candidate for a shared helper
+ org.apache.avro.Schema avroSchema = (org.apache.avro.Schema) schema.getNativeSchema().get();
+ if (avroSchema.getField(FIELD_TO_REMOVE) != null) {
+ org.apache.avro.Schema.Parser parser = new org.apache.avro.Schema.Parser();
+ org.apache.avro.Schema originalAvroSchema = parser.parse(avroSchema.toString(false));
+ org.apache.avro.Schema modified = org.apache.avro.Schema.createRecord(
+ originalAvroSchema.getName(), originalAvroSchema.getDoc(), originalAvroSchema.getNamespace(),
+ originalAvroSchema.isError(),
+ originalAvroSchema.getFields().
+ stream()
+ .filter(f -> !f.name().equals(FIELD_TO_REMOVE))
+ .map(f -> new org.apache.avro.Schema.Field(f.name(), f.schema(), f.doc(), f.defaultVal(),
+ f.order()))
+ .collect(Collectors.toList()));
+
+ org.apache.avro.generic.GenericRecord genericRecord =
+ (org.apache.avro.generic.GenericRecord) nativeObject;
+ org.apache.avro.generic.GenericRecord newRecord = new GenericData.Record(modified);
+ for (org.apache.avro.Schema.Field field : modified.getFields()) {
+ newRecord.put(field.name(), genericRecord.get(field.name()));
+ }
+ GenericDatumWriter writer = new GenericDatumWriter(modified);
+ ByteArrayOutputStream oo = new ByteArrayOutputStream();
+ BinaryEncoder encoder = EncoderFactory.get().directBinaryEncoder(oo, null);
+ writer.write(newRecord, encoder);
+
+ Schema newValueSchema = Schema.NATIVE_AVRO(modified);
+ outputSchema = newValueSchema;
+ outputObject = oo.toByteArray(); // AVRO binary encoding of newRecord, paired with the NATIVE_AVRO schema above
+ someThingDone = true;
+ }
+ }
+
+ if (!someThingDone) {
+ // nothing was removed: forward the input value unchanged with its original schema
+ final boolean isStruct;
+ switch (currentRecord.getSchema().getSchemaInfo().getType()) {
+ case AVRO:
+ case JSON:
+ case PROTOBUF_NATIVE:
+ isStruct = true;
+ break;
+ default:
+ isStruct = false;
+ break;
+ }
+ if (isStruct) {
+ // GenericRecord must stay wrapped
+ outputObject = currentRecord.getValue();
+ } else {
+ // primitives and KeyValue must be unwrapped
+ outputObject = nativeObject;
+ }
+ }
+ log.info("output {} schema {}", outputObject, outputSchema);
+
+ return context.newOutputRecordBuilder()
+ .schema(outputSchema)
+ .value(outputObject)
+ .build();
+ }
+}
\ No newline at end of file
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTestBase.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTestBase.java
index cbcdf89612a..e80f811fd5e 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTestBase.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTestBase.java
@@ -48,6 +48,9 @@ public abstract class PulsarFunctionsTestBase extends PulsarTestSuite {
public static final String REMOVE_AVRO_FIELD_FUNCTION_JAVA_CLASS =
"org.apache.pulsar.tests.integration.functions.RemoveAvroFieldFunction";
+ public static final String REMOVE_AVRO_FIELD_RECORD_FUNCTION_JAVA_CLASS =
+ "org.apache.pulsar.tests.integration.functions.RemoveAvroFieldRecordFunction";
+
public static final String SERDE_JAVA_CLASS =
"org.apache.pulsar.functions.api.examples.CustomBaseToBaseFunction";
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/java/PulsarFunctionsJavaTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/java/PulsarFunctionsJavaTest.java
index 3c3fe4254e0..7e1c6500e3c 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/java/PulsarFunctionsJavaTest.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/java/PulsarFunctionsJavaTest.java
@@ -174,20 +174,30 @@ public class PulsarFunctionsJavaTest extends PulsarFunctionsTest {
}
@Test(groups = {"java_function", "function"})
- public void testGenericObjectRemoveFiledFunction() throws Exception {
+ public void testGenericObjectRemoveFieldFunction() throws Exception {
testGenericObjectFunction(REMOVE_AVRO_FIELD_FUNCTION_JAVA_CLASS, true, false);
}
+ @Test(groups = {"java_function", "function"})
+ public void testGenericObjectRemoveFieldRecordFunction() throws Exception {
+ testGenericObjectFunction(REMOVE_AVRO_FIELD_RECORD_FUNCTION_JAVA_CLASS, true, false); // same as testGenericObjectRemoveFieldFunction, but the Function returns a Record
+ }
+
@Test(groups = {"java_function", "function"})
public void testGenericObjectFunctionKeyValue() throws Exception {
testGenericObjectFunction(GENERIC_OBJECT_FUNCTION_JAVA_CLASS, false, true);
}
@Test(groups = {"java_function", "function"})
- public void testGenericObjectRemoveFiledFunctionKeyValue() throws Exception {
+ public void testGenericObjectRemoveFieldFunctionKeyValue() throws Exception {
testGenericObjectFunction(REMOVE_AVRO_FIELD_FUNCTION_JAVA_CLASS, true, true);
}
+ @Test(groups = {"java_function", "function"})
+ public void testGenericObjectRemoveFieldRecordFunctionKeyValue() throws Exception {
+ testGenericObjectFunction(REMOVE_AVRO_FIELD_RECORD_FUNCTION_JAVA_CLASS, true, true); // KeyValue variant of the Record-returning remove-field test
+ }
+
@Test(groups = {"java_function", "function"})
public void testRecordFunctionTest() throws Exception {
testRecordFunction();