You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by eo...@apache.org on 2022/04/01 08:55:37 UTC
[pulsar] branch master updated: Pulsar Functions: allow a Function to access the original Schema of the Message and use it (#14847)
This is an automated email from the ASF dual-hosted git repository.
eolivelli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 193f5b2 Pulsar Functions: allow a Function<GenericObject,?> to access the original Schema of the Message and use it (#14847)
193f5b2 is described below
commit 193f5b2f74e919c05d0df651981cef439f55472f
Author: Enrico Olivelli <eo...@apache.org>
AuthorDate: Fri Apr 1 10:53:37 2022 +0200
Pulsar Functions: allow a Function<GenericObject,?> to access the original Schema of the Message and use it (#14847)
---
.../client/impl/schema/AutoConsumeSchema.java | 12 ++
.../pulsar/functions/source/PulsarSource.java | 11 ++
.../functions/GenericObjectFunction.java | 64 ++++++++
.../functions/RemoveAvroFieldFunction.java | 159 ++++++++++++++++++++
.../integration/functions/PulsarFunctionsTest.java | 163 ++++++++++++++++++---
.../functions/PulsarFunctionsTestBase.java | 6 +
.../functions/java/PulsarFunctionsJavaTest.java | 20 +++
7 files changed, 416 insertions(+), 19 deletions(-)
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AutoConsumeSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AutoConsumeSchema.java
index 0533ccd..5e7568b 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AutoConsumeSchema.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AutoConsumeSchema.java
@@ -299,6 +299,18 @@ public class AutoConsumeSchema implements Schema<GenericRecord> {
}
/**
+ * Get a specific schema version, fetching from the Registry if it is not loaded yet.
+ * This method is not intended to be used by applications.
+ * @param schemaVersion the version
+ * @return the Schema at the specific version
+ * @see #atSchemaVersion(byte[])
+ */
+ public Schema<?> unwrapInternalSchema(byte[] schemaVersion) {
+ fetchSchemaIfNeeded(BytesSchemaVersion.of(schemaVersion));
+ return getInternalSchema(schemaVersion);
+ }
+
+ /**
* It may happen that the schema is not loaded but we need it, for instance in order to call getSchemaInfo()
* We cannot call this method in getSchemaInfo, because getSchemaInfo is called in many
* places and we will introduce lots of deadlocks.
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java
index ca12ddc..652c682 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java
@@ -32,6 +32,7 @@ import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.impl.MessageImpl;
import org.apache.pulsar.client.impl.TopicMessageImpl;
+import org.apache.pulsar.client.impl.schema.AutoConsumeSchema;
import org.apache.pulsar.common.functions.ConsumerConfig;
import org.apache.pulsar.common.functions.FunctionConfig;
import org.apache.pulsar.functions.api.Record;
@@ -117,6 +118,16 @@ public abstract class PulsarSource<T> implements Source<T> {
TopicMessageImpl impl = (TopicMessageImpl) message;
schema = impl.getSchemaInternal();
}
+
+ // we don't want the Function/Sink to see AutoConsumeSchema
+ if (schema instanceof AutoConsumeSchema) {
+ AutoConsumeSchema autoConsumeSchema = (AutoConsumeSchema) schema;
+ // we cannot use atSchemaVersion, because atSchemaVersion is only
+ // able to decode data, here we want a Schema that
+ // is able to re-encode the payload when needed.
+ schema = (Schema<T>) autoConsumeSchema
+ .unwrapInternalSchema(message.getSchemaVersion());
+ }
return PulsarRecord.<T>builder()
.message(message)
.schema(schema)
diff --git a/tests/docker-images/java-test-functions/src/main/java/org/apache/pulsar/tests/integration/functions/GenericObjectFunction.java b/tests/docker-images/java-test-functions/src/main/java/org/apache/pulsar/tests/integration/functions/GenericObjectFunction.java
new file mode 100644
index 0000000..3c5edaa
--- /dev/null
+++ b/tests/docker-images/java-test-functions/src/main/java/org/apache/pulsar/tests/integration/functions/GenericObjectFunction.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.tests.integration.functions;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.schema.GenericObject;
+import org.apache.pulsar.functions.api.Context;
+import org.apache.pulsar.functions.api.Function;
+import org.apache.pulsar.functions.api.Record;
+
+/**
+ * This function processes any message with any schema,
+ * and outputs the message with the same schema to another topic.
+ */
+@Slf4j
+public class GenericObjectFunction implements Function<GenericObject, Void> {
+
+ @Override
+ public Void process(GenericObject genericObject, Context context) throws Exception {
+ Record<?> currentRecord = context.getCurrentRecord();
+ log.info("apply to {} {}", genericObject, genericObject.getNativeObject());
+ log.info("record with schema {} {}", currentRecord.getSchema(), currentRecord);
+ // do some processing...
+ final boolean isStruct;
+ switch (currentRecord.getSchema().getSchemaInfo().getType()) {
+ case AVRO:
+ case JSON:
+ case PROTOBUF_NATIVE:
+ isStruct = true;
+ break;
+ default:
+ isStruct = false;
+ break;
+ }
+ if (isStruct) {
+ // GenericRecord must stay wrapped
+ context.newOutputMessage(context.getOutputTopic(), (Schema) currentRecord.getSchema())
+ .value(genericObject).send();
+ } else {
+ // primitives and KeyValue must be unwrapped
+ context.newOutputMessage(context.getOutputTopic(), (Schema) currentRecord.getSchema())
+ .value(genericObject.getNativeObject()).send();
+ }
+ return null;
+ }
+}
+
diff --git a/tests/docker-images/java-test-functions/src/main/java/org/apache/pulsar/tests/integration/functions/RemoveAvroFieldFunction.java b/tests/docker-images/java-test-functions/src/main/java/org/apache/pulsar/tests/integration/functions/RemoveAvroFieldFunction.java
new file mode 100644
index 0000000..cc2a81b
--- /dev/null
+++ b/tests/docker-images/java-test-functions/src/main/java/org/apache/pulsar/tests/integration/functions/RemoveAvroFieldFunction.java
@@ -0,0 +1,159 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.tests.integration.functions;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.io.BinaryEncoder;
+import org.apache.avro.io.EncoderFactory;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.schema.GenericObject;
+import org.apache.pulsar.client.api.schema.GenericRecord;
+import org.apache.pulsar.client.api.schema.KeyValueSchema;
+import org.apache.pulsar.common.schema.KeyValue;
+import org.apache.pulsar.common.schema.SchemaType;
+import org.apache.pulsar.functions.api.Context;
+import org.apache.pulsar.functions.api.Function;
+import org.apache.pulsar.functions.api.Record;
+
+import java.io.ByteArrayOutputStream;
+import java.util.stream.Collectors;
+
+/**
+ * This function removes a "field" from a AVRO message
+ */
+@Slf4j
+public class RemoveAvroFieldFunction implements Function<GenericObject, Void> {
+
+ private static final String FIELD_TO_REMOVE = "age";
+
+ @Override
+ public Void process(GenericObject genericObject, Context context) throws Exception {
+ Record<?> currentRecord = context.getCurrentRecord();
+ log.info("apply to {} {}", genericObject, genericObject.getNativeObject());
+ log.info("record with schema {} version {} {}", currentRecord.getSchema(),
+ currentRecord.getMessage().get().getSchemaVersion(),
+ currentRecord);
+ Object nativeObject = genericObject.getNativeObject();
+ Schema<?> schema = currentRecord.getSchema();
+
+ Schema outputSchema = schema;
+ Object outputObject = genericObject.getNativeObject();
+ boolean someThingDone = false;
+ if (schema instanceof KeyValueSchema && nativeObject instanceof KeyValue) {
+ KeyValueSchema kvSchema = (KeyValueSchema) schema;
+
+ Schema keySchema = kvSchema.getKeySchema();
+ Schema valueSchema = kvSchema.getValueSchema();
+ // remove a column "age" from the "valueSchema"
+ if (valueSchema.getSchemaInfo().getType() == SchemaType.AVRO) {
+
+ org.apache.avro.Schema avroSchema = (org.apache.avro.Schema) valueSchema.getNativeSchema().get();
+ if (avroSchema.getField(FIELD_TO_REMOVE) != null) {
+ org.apache.avro.Schema.Parser parser = new org.apache.avro.Schema.Parser();
+ org.apache.avro.Schema originalAvroSchema = parser.parse(avroSchema.toString(false));
+ org.apache.avro.Schema modified = org.apache.avro.Schema.createRecord(
+ originalAvroSchema.getName(), originalAvroSchema.getDoc(), originalAvroSchema.getNamespace(), originalAvroSchema.isError(),
+ originalAvroSchema.getFields().
+ stream()
+ .filter(f->!f.name().equals(FIELD_TO_REMOVE))
+ .map(f-> new org.apache.avro.Schema.Field(f.name(), f.schema(), f.doc(), f.defaultVal(), f.order()))
+ .collect(Collectors.toList()));
+
+ KeyValue originalObject = (KeyValue) nativeObject;
+
+ GenericRecord value = (GenericRecord) originalObject.getValue();
+ org.apache.avro.generic.GenericRecord genericRecord
+ = (org.apache.avro.generic.GenericRecord) value.getNativeObject();
+
+ org.apache.avro.generic.GenericRecord newRecord = new GenericData.Record(modified);
+ for (org.apache.avro.Schema.Field field : modified.getFields()) {
+ newRecord.put(field.name(), genericRecord.get(field.name()));
+ }
+ GenericDatumWriter writer = new GenericDatumWriter(modified);
+ ByteArrayOutputStream oo = new ByteArrayOutputStream();
+ BinaryEncoder encoder = EncoderFactory.get().directBinaryEncoder(oo, null);
+ writer.write(newRecord, encoder);
+ Object newValue = oo.toByteArray();
+
+ Schema newValueSchema = Schema.NATIVE_AVRO(modified);
+ outputSchema = Schema.KeyValue(keySchema, newValueSchema, kvSchema.getKeyValueEncodingType());
+ outputObject = new KeyValue(originalObject.getKey(), newValue);
+ someThingDone = true;
+ }
+ }
+ } else if (schema.getSchemaInfo().getType() == SchemaType.AVRO) {
+ org.apache.avro.Schema avroSchema = (org.apache.avro.Schema) schema.getNativeSchema().get();
+ if (avroSchema.getField(FIELD_TO_REMOVE) != null) {
+ org.apache.avro.Schema.Parser parser = new org.apache.avro.Schema.Parser();
+ org.apache.avro.Schema originalAvroSchema = parser.parse(avroSchema.toString(false));
+ org.apache.avro.Schema modified = org.apache.avro.Schema.createRecord(
+ originalAvroSchema.getName(), originalAvroSchema.getDoc(), originalAvroSchema.getNamespace(), originalAvroSchema.isError(),
+ originalAvroSchema.getFields().
+ stream()
+ .filter(f -> !f.name().equals(FIELD_TO_REMOVE))
+ .map(f -> new org.apache.avro.Schema.Field(f.name(), f.schema(), f.doc(), f.defaultVal(), f.order()))
+ .collect(Collectors.toList()));
+
+ org.apache.avro.generic.GenericRecord genericRecord
+ = (org.apache.avro.generic.GenericRecord) nativeObject;
+ org.apache.avro.generic.GenericRecord newRecord = new GenericData.Record(modified);
+ for (org.apache.avro.Schema.Field field : modified.getFields()) {
+ newRecord.put(field.name(), genericRecord.get(field.name()));
+ }
+ GenericDatumWriter writer = new GenericDatumWriter(modified);
+ ByteArrayOutputStream oo = new ByteArrayOutputStream();
+ BinaryEncoder encoder = EncoderFactory.get().directBinaryEncoder(oo, null);
+ writer.write(newRecord, encoder);
+
+ Schema newValueSchema = Schema.NATIVE_AVRO(modified);
+ outputSchema = newValueSchema;
+ outputObject = oo.toByteArray();
+ someThingDone = true;
+ }
+ }
+
+ if (!someThingDone) {
+ // do some processing...
+ final boolean isStruct;
+ switch (currentRecord.getSchema().getSchemaInfo().getType()) {
+ case AVRO:
+ case JSON:
+ case PROTOBUF_NATIVE:
+ isStruct = true;
+ break;
+ default:
+ isStruct = false;
+ break;
+ }
+ if (isStruct) {
+ // GenericRecord must stay wrapped
+ outputObject = currentRecord.getValue();
+ } else {
+ // primitives and KeyValue must be unwrapped
+ outputObject = nativeObject;
+ }
+ }
+ log.info("output {} schema {}", outputObject, outputSchema);
+ context.newOutputMessage(context.getOutputTopic(), outputSchema)
+ .value(outputObject).send();
+ return null;
+ }
+}
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
index 460ea22..f9b90b4 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.tests.integration.functions;
import static java.nio.charset.StandardCharsets.UTF_8;
import static org.assertj.core.api.Assertions.assertThat;
import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
@@ -1590,6 +1591,125 @@ public abstract class PulsarFunctionsTest extends PulsarFunctionsTestBase {
client.close();
}
+
+ protected void testGenericObjectFunction(String function, boolean removeAgeField, boolean keyValue) throws Exception {
+ log.info("start {} function test ...", function);
+
+ String ns = "public/ns-genericobject-" + randomName(8);
+ @Cleanup
+ PulsarAdmin pulsarAdmin = getPulsarAdmin();
+ pulsarAdmin.namespaces().createNamespace(ns);
+
+ @Cleanup
+ PulsarClient pulsarClient = getPulsarClient();
+
+ final int numMessages = 10;
+ final String inputTopic = ns + "/test-object-input-" + randomName(8);
+ final String outputTopic = ns + "/test-object-output" + randomName(8);
+ @Cleanup
+ Consumer<GenericRecord> consumer = pulsarClient
+ .newConsumer(Schema.AUTO_CONSUME())
+ .subscriptionName("test")
+ .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+ .topic(outputTopic)
+ .subscribe();
+
+ final String functionName = "test-generic-fn-" + randomName(8);
+ submitFunction(
+ Runtime.JAVA,
+ inputTopic,
+ outputTopic,
+ functionName,
+ null,
+ function,
+ Schema.AUTO_CONSUME(),
+ null,
+ null,
+ SchemaType.NONE.name(),
+ SubscriptionInitialPosition.Earliest);
+ try {
+ if (keyValue) {
+ @Cleanup
+ Producer<KeyValue<Users.UserV1, Users.UserV1>> producer = pulsarClient
+ .newProducer(Schema.KeyValue(
+ Schema.AVRO(Users.UserV1.class),
+ Schema.AVRO(Users.UserV1.class), KeyValueEncodingType.SEPARATED))
+ .topic(inputTopic)
+ .create();
+ for (int i = 0; i < numMessages; i++) {
+ producer.send(new KeyValue<>(new Users.UserV1("foo" + i, i),
+ new Users.UserV1("bar" + i, i + 100)));
+ }
+ } else {
+ @Cleanup
+ Producer<Users.UserV1> producer = pulsarClient
+ .newProducer(Schema.AVRO(Users.UserV1.class))
+ .topic(inputTopic)
+ .create();
+ for (int i = 0; i < numMessages; i++) {
+ producer.send(new Users.UserV1("bar" + i, i + 100));
+ }
+ }
+
+ getFunctionInfoSuccess(functionName);
+
+ getFunctionStatus(functionName, numMessages, true);
+
+ int i = 0;
+ Message<GenericRecord> message;
+ do {
+ message = consumer.receive(30, TimeUnit.SECONDS);
+ if (message != null) {
+ GenericRecord genericRecord = message.getValue();
+ if (keyValue) {
+ KeyValue<GenericRecord, GenericRecord> keyValueObject = (KeyValue<GenericRecord, GenericRecord>) genericRecord.getNativeObject();
+ GenericRecord key = keyValueObject.getKey();
+ GenericRecord value = keyValueObject.getValue();
+ key.getFields().forEach(f-> {
+ log.info("key field {} value {}", f.getName(), key.getField(f.getName()));
+ });
+ value.getFields().forEach(f-> {
+ log.info("value field {} value {}", f.getName(), value.getField(f.getName()));
+ });
+ assertEquals(i, key.getField("age"));
+ assertEquals("foo" + i, key.getField("name"));
+
+ if (removeAgeField) {
+ // field "age" is removed from the schema
+ assertFalse(value.getFields().stream().anyMatch(f -> f.getName().equals("age")));
+ } else {
+ assertEquals(i + 100, value.getField("age"));
+ }
+ assertEquals("bar" + i, value.getField("name"));
+ } else {
+ GenericRecord value = genericRecord;
+ log.info("received value {}", value);
+ value.getFields().forEach(f-> {
+ log.info("value field {} value {}", f.getName(), value.getField(f.getName()));
+ });
+
+ if (removeAgeField) {
+ // field "age" is removed from the schema
+ assertFalse(value.getFields().stream().anyMatch(f -> f.getName().equals("age")));
+ } else {
+ assertEquals(i + 100, value.getField("age"));
+ }
+ assertEquals("bar" + i, value.getField("name"));
+ }
+
+ consumer.acknowledge(message);
+ i++;
+ }
+ } while (message != null);
+ } finally {
+ pulsarCluster.dumpFunctionLogs(functionName);
+ }
+
+ deleteFunction(functionName);
+
+ getFunctionInfoNotFound(functionName);
+ }
+
protected void testMergeFunction() throws Exception {
log.info("start merge function test ...");
@@ -1636,26 +1756,31 @@ public abstract class PulsarFunctionsTest extends PulsarFunctionsTestBase {
getFunctionStatus(functionName, topicMsgCntMap.keySet().size() * messagePerTopic, true);
- Message<GenericRecord> message;
- do {
- message = consumer.receive(30, TimeUnit.SECONDS);
- if (message != null) {
- String baseTopic = message.getProperty("baseTopic");
- GenericRecord genericRecord = message.getValue();
- log.info("receive msg baseTopic: {}, schemaType: {}, nativeClass: {}, nativeObject: {}",
- baseTopic,
- genericRecord.getSchemaType(),
- genericRecord.getNativeObject().getClass(),
- genericRecord.getNativeObject());
- checkSchemaForAutoSchema(message, baseTopic);
- topicMsgCntMap.get(baseTopic).decrementAndGet();
- consumer.acknowledge(message);
- }
- } while (message != null);
+ try {
- for (Map.Entry<String, AtomicInteger> entry : topicMsgCntMap.entrySet()) {
- assertEquals(entry.getValue().get(), 0,
- "topic " + entry.getKey() + " left message cnt is not 0.");
+ Message<GenericRecord> message;
+ do {
+ message = consumer.receive(30, TimeUnit.SECONDS);
+ if (message != null) {
+ String baseTopic = message.getProperty("baseTopic");
+ GenericRecord genericRecord = message.getValue();
+ log.info("receive msg baseTopic: {}, schemaType: {}, nativeClass: {}, nativeObject: {}",
+ baseTopic,
+ genericRecord.getSchemaType(),
+ genericRecord.getNativeObject().getClass(),
+ genericRecord.getNativeObject());
+ checkSchemaForAutoSchema(message, baseTopic);
+ topicMsgCntMap.get(baseTopic).decrementAndGet();
+ consumer.acknowledge(message);
+ }
+ } while (message != null);
+
+ for (Map.Entry<String, AtomicInteger> entry : topicMsgCntMap.entrySet()) {
+ assertEquals(entry.getValue().get(), 0,
+ "topic " + entry.getKey() + " left message cnt is not 0.");
+ }
+ } finally {
+ pulsarCluster.dumpFunctionLogs(functionName);
}
deleteFunction(functionName);
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTestBase.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTestBase.java
index e594923..cbcdf89 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTestBase.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTestBase.java
@@ -42,6 +42,12 @@ public abstract class PulsarFunctionsTestBase extends PulsarTestSuite {
public static final String EXCEPTION_JAVA_CLASS =
"org.apache.pulsar.tests.integration.functions.ExceptionFunction";
+ public static final String GENERIC_OBJECT_FUNCTION_JAVA_CLASS =
+ "org.apache.pulsar.tests.integration.functions.GenericObjectFunction";
+
+ public static final String REMOVE_AVRO_FIELD_FUNCTION_JAVA_CLASS =
+ "org.apache.pulsar.tests.integration.functions.RemoveAvroFieldFunction";
+
public static final String SERDE_JAVA_CLASS =
"org.apache.pulsar.functions.api.examples.CustomBaseToBaseFunction";
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/java/PulsarFunctionsJavaTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/java/PulsarFunctionsJavaTest.java
index 2e35a70..46cb158 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/java/PulsarFunctionsJavaTest.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/java/PulsarFunctionsJavaTest.java
@@ -168,4 +168,24 @@ public class PulsarFunctionsJavaTest extends PulsarFunctionsTest {
testMergeFunction();
}
+ @Test(groups = {"java_function", "function"})
+ public void testGenericObjectFunction() throws Exception {
+ testGenericObjectFunction(GENERIC_OBJECT_FUNCTION_JAVA_CLASS, false, false);
+ }
+
+ @Test(groups = {"java_function", "function"})
+ public void testGenericObjectRemoveFiledFunction() throws Exception {
+ testGenericObjectFunction(REMOVE_AVRO_FIELD_FUNCTION_JAVA_CLASS, true, false);
+ }
+
+ @Test(groups = {"java_function", "function"})
+ public void testGenericObjectFunctionKeyValue() throws Exception {
+ testGenericObjectFunction(GENERIC_OBJECT_FUNCTION_JAVA_CLASS, false, true);
+ }
+
+ @Test(groups = {"java_function", "function"})
+ public void testGenericObjectRemoveFiledFunctionKeyValue() throws Exception {
+ testGenericObjectFunction(REMOVE_AVRO_FIELD_FUNCTION_JAVA_CLASS, true, true);
+ }
+
}