You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by eo...@apache.org on 2022/04/01 08:55:37 UTC

[pulsar] branch master updated: Pulsar Functions: allow a Function to access the original Schema of the Message and use it (#14847)

This is an automated email from the ASF dual-hosted git repository.

eolivelli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 193f5b2  Pulsar Functions: allow a Function<GenericObject,?> to access the original Schema of the Message and use it (#14847)
193f5b2 is described below

commit 193f5b2f74e919c05d0df651981cef439f55472f
Author: Enrico Olivelli <eo...@apache.org>
AuthorDate: Fri Apr 1 10:53:37 2022 +0200

    Pulsar Functions: allow a Function<GenericObject,?> to access the original Schema of the Message and use it (#14847)
---
 .../client/impl/schema/AutoConsumeSchema.java      |  12 ++
 .../pulsar/functions/source/PulsarSource.java      |  11 ++
 .../functions/GenericObjectFunction.java           |  64 ++++++++
 .../functions/RemoveAvroFieldFunction.java         | 159 ++++++++++++++++++++
 .../integration/functions/PulsarFunctionsTest.java | 163 ++++++++++++++++++---
 .../functions/PulsarFunctionsTestBase.java         |   6 +
 .../functions/java/PulsarFunctionsJavaTest.java    |  20 +++
 7 files changed, 416 insertions(+), 19 deletions(-)

diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AutoConsumeSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AutoConsumeSchema.java
index 0533ccd..5e7568b 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AutoConsumeSchema.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AutoConsumeSchema.java
@@ -299,6 +299,18 @@ public class AutoConsumeSchema implements Schema<GenericRecord> {
     }
 
     /**
+     * Get a specific schema version, fetching from the Registry if it is not loaded yet.
+     * This method is not intended to be used by applications.
+     * @param schemaVersion the version
+     * @return the Schema at the specific version
+     * @see #atSchemaVersion(byte[])
+     */
+    public Schema<?> unwrapInternalSchema(byte[] schemaVersion) {
+        fetchSchemaIfNeeded(BytesSchemaVersion.of(schemaVersion));
+        return getInternalSchema(schemaVersion);
+    }
+
+    /**
      * It may happen that the schema is not loaded but we need it, for instance in order to call getSchemaInfo()
      * We cannot call this method in getSchemaInfo, because getSchemaInfo is called in many
      * places and we will introduce lots of deadlocks.
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java
index ca12ddc..652c682 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java
@@ -32,6 +32,7 @@ import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.impl.MessageImpl;
 import org.apache.pulsar.client.impl.TopicMessageImpl;
+import org.apache.pulsar.client.impl.schema.AutoConsumeSchema;
 import org.apache.pulsar.common.functions.ConsumerConfig;
 import org.apache.pulsar.common.functions.FunctionConfig;
 import org.apache.pulsar.functions.api.Record;
@@ -117,6 +118,16 @@ public abstract class PulsarSource<T> implements Source<T> {
             TopicMessageImpl impl = (TopicMessageImpl) message;
             schema = impl.getSchemaInternal();
         }
+
+        // we don't want the Function/Sink to see AutoConsumeSchema
+        if (schema instanceof AutoConsumeSchema) {
+            AutoConsumeSchema autoConsumeSchema = (AutoConsumeSchema) schema;
+            // we cannot use atSchemaVersion, because atSchemaVersion is only
+            // able to decode data, here we want a Schema that
+            // is able to re-encode the payload when needed.
+            schema = (Schema<T>) autoConsumeSchema
+                    .unwrapInternalSchema(message.getSchemaVersion());
+        }
         return PulsarRecord.<T>builder()
                 .message(message)
                 .schema(schema)
diff --git a/tests/docker-images/java-test-functions/src/main/java/org/apache/pulsar/tests/integration/functions/GenericObjectFunction.java b/tests/docker-images/java-test-functions/src/main/java/org/apache/pulsar/tests/integration/functions/GenericObjectFunction.java
new file mode 100644
index 0000000..3c5edaa
--- /dev/null
+++ b/tests/docker-images/java-test-functions/src/main/java/org/apache/pulsar/tests/integration/functions/GenericObjectFunction.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.tests.integration.functions;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.schema.GenericObject;
+import org.apache.pulsar.functions.api.Context;
+import org.apache.pulsar.functions.api.Function;
+import org.apache.pulsar.functions.api.Record;
+
+/**
+ * This function processes any message with any schema,
+ * and outputs the message with the same schema to another topic.
+ */
+@Slf4j
+public class GenericObjectFunction implements Function<GenericObject, Void> {
+
+    @Override
+    public Void process(GenericObject genericObject, Context context) throws Exception {
+        Record<?> currentRecord = context.getCurrentRecord();
+        log.info("apply to {} {}", genericObject, genericObject.getNativeObject());
+        log.info("record with schema {} {}", currentRecord.getSchema(), currentRecord);
+        // do some processing...
+        final boolean isStruct;
+        switch (currentRecord.getSchema().getSchemaInfo().getType()) {
+            case AVRO:
+            case JSON:
+            case PROTOBUF_NATIVE:
+                isStruct = true;
+                break;
+            default:
+                isStruct = false;
+                break;
+        }
+        if (isStruct) {
+            // GenericRecord must stay wrapped
+            context.newOutputMessage(context.getOutputTopic(), (Schema) currentRecord.getSchema())
+                    .value(genericObject).send();
+        } else {
+            // primitives and KeyValue must be unwrapped
+            context.newOutputMessage(context.getOutputTopic(), (Schema) currentRecord.getSchema())
+                    .value(genericObject.getNativeObject()).send();
+        }
+        return null;
+    }
+}
+
diff --git a/tests/docker-images/java-test-functions/src/main/java/org/apache/pulsar/tests/integration/functions/RemoveAvroFieldFunction.java b/tests/docker-images/java-test-functions/src/main/java/org/apache/pulsar/tests/integration/functions/RemoveAvroFieldFunction.java
new file mode 100644
index 0000000..cc2a81b
--- /dev/null
+++ b/tests/docker-images/java-test-functions/src/main/java/org/apache/pulsar/tests/integration/functions/RemoveAvroFieldFunction.java
@@ -0,0 +1,159 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.tests.integration.functions;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.io.BinaryEncoder;
+import org.apache.avro.io.EncoderFactory;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.schema.GenericObject;
+import org.apache.pulsar.client.api.schema.GenericRecord;
+import org.apache.pulsar.client.api.schema.KeyValueSchema;
+import org.apache.pulsar.common.schema.KeyValue;
+import org.apache.pulsar.common.schema.SchemaType;
+import org.apache.pulsar.functions.api.Context;
+import org.apache.pulsar.functions.api.Function;
+import org.apache.pulsar.functions.api.Record;
+
+import java.io.ByteArrayOutputStream;
+import java.util.stream.Collectors;
+
+/**
+ * This function removes a "field" from a AVRO message
+ */
+@Slf4j
+public class RemoveAvroFieldFunction implements Function<GenericObject, Void> {
+
+    private static final String FIELD_TO_REMOVE = "age";
+
+    @Override
+    public Void process(GenericObject genericObject, Context context) throws Exception {
+        Record<?> currentRecord = context.getCurrentRecord();
+        log.info("apply to {} {}", genericObject, genericObject.getNativeObject());
+        log.info("record with schema {} version {} {}", currentRecord.getSchema(),
+                currentRecord.getMessage().get().getSchemaVersion(),
+                currentRecord);
+        Object nativeObject = genericObject.getNativeObject();
+        Schema<?> schema = currentRecord.getSchema();
+
+        Schema outputSchema = schema;
+        Object outputObject = genericObject.getNativeObject();
+        boolean someThingDone = false;
+        if (schema instanceof KeyValueSchema && nativeObject instanceof KeyValue)  {
+            KeyValueSchema kvSchema = (KeyValueSchema) schema;
+
+            Schema keySchema = kvSchema.getKeySchema();
+            Schema valueSchema = kvSchema.getValueSchema();
+            // remove a column "age" from the "valueSchema"
+            if (valueSchema.getSchemaInfo().getType() == SchemaType.AVRO) {
+
+                org.apache.avro.Schema avroSchema = (org.apache.avro.Schema) valueSchema.getNativeSchema().get();
+                if (avroSchema.getField(FIELD_TO_REMOVE) != null) {
+                    org.apache.avro.Schema.Parser parser = new org.apache.avro.Schema.Parser();
+                    org.apache.avro.Schema originalAvroSchema = parser.parse(avroSchema.toString(false));
+                    org.apache.avro.Schema modified = org.apache.avro.Schema.createRecord(
+                            originalAvroSchema.getName(), originalAvroSchema.getDoc(), originalAvroSchema.getNamespace(), originalAvroSchema.isError(),
+                            originalAvroSchema.getFields().
+                                    stream()
+                                    .filter(f->!f.name().equals(FIELD_TO_REMOVE))
+                                    .map(f-> new org.apache.avro.Schema.Field(f.name(), f.schema(), f.doc(), f.defaultVal(), f.order()))
+                                    .collect(Collectors.toList()));
+
+                    KeyValue originalObject = (KeyValue) nativeObject;
+
+                    GenericRecord value = (GenericRecord) originalObject.getValue();
+                    org.apache.avro.generic.GenericRecord genericRecord
+                            = (org.apache.avro.generic.GenericRecord) value.getNativeObject();
+
+                    org.apache.avro.generic.GenericRecord newRecord = new GenericData.Record(modified);
+                    for (org.apache.avro.Schema.Field field : modified.getFields()) {
+                        newRecord.put(field.name(), genericRecord.get(field.name()));
+                    }
+                    GenericDatumWriter writer = new GenericDatumWriter(modified);
+                    ByteArrayOutputStream oo = new ByteArrayOutputStream();
+                    BinaryEncoder encoder = EncoderFactory.get().directBinaryEncoder(oo, null);
+                    writer.write(newRecord, encoder);
+                    Object newValue = oo.toByteArray();
+
+                    Schema newValueSchema = Schema.NATIVE_AVRO(modified);
+                    outputSchema = Schema.KeyValue(keySchema, newValueSchema, kvSchema.getKeyValueEncodingType());
+                    outputObject = new KeyValue(originalObject.getKey(), newValue);
+                    someThingDone = true;
+                }
+            }
+        } else if (schema.getSchemaInfo().getType() == SchemaType.AVRO) {
+            org.apache.avro.Schema avroSchema = (org.apache.avro.Schema) schema.getNativeSchema().get();
+            if (avroSchema.getField(FIELD_TO_REMOVE) != null) {
+                org.apache.avro.Schema.Parser parser = new org.apache.avro.Schema.Parser();
+                org.apache.avro.Schema originalAvroSchema = parser.parse(avroSchema.toString(false));
+                org.apache.avro.Schema modified = org.apache.avro.Schema.createRecord(
+                        originalAvroSchema.getName(), originalAvroSchema.getDoc(), originalAvroSchema.getNamespace(), originalAvroSchema.isError(),
+                        originalAvroSchema.getFields().
+                                stream()
+                                .filter(f -> !f.name().equals(FIELD_TO_REMOVE))
+                                .map(f -> new org.apache.avro.Schema.Field(f.name(), f.schema(), f.doc(), f.defaultVal(), f.order()))
+                                .collect(Collectors.toList()));
+
+                org.apache.avro.generic.GenericRecord genericRecord
+                        = (org.apache.avro.generic.GenericRecord) nativeObject;
+                org.apache.avro.generic.GenericRecord newRecord = new GenericData.Record(modified);
+                for (org.apache.avro.Schema.Field field : modified.getFields()) {
+                    newRecord.put(field.name(), genericRecord.get(field.name()));
+                }
+                GenericDatumWriter writer = new GenericDatumWriter(modified);
+                ByteArrayOutputStream oo = new ByteArrayOutputStream();
+                BinaryEncoder encoder = EncoderFactory.get().directBinaryEncoder(oo, null);
+                writer.write(newRecord, encoder);
+
+                Schema newValueSchema = Schema.NATIVE_AVRO(modified);
+                outputSchema = newValueSchema;
+                outputObject = oo.toByteArray();
+                someThingDone = true;
+            }
+        }
+
+        if (!someThingDone) {
+            // do some processing...
+            final boolean isStruct;
+            switch (currentRecord.getSchema().getSchemaInfo().getType()) {
+                case AVRO:
+                case JSON:
+                case PROTOBUF_NATIVE:
+                    isStruct = true;
+                    break;
+                default:
+                    isStruct = false;
+                    break;
+            }
+            if (isStruct) {
+                // GenericRecord must stay wrapped
+                outputObject = currentRecord.getValue();
+            } else {
+                // primitives and KeyValue must be unwrapped
+                outputObject = nativeObject;
+            }
+        }
+        log.info("output {} schema {}", outputObject, outputSchema);
+        context.newOutputMessage(context.getOutputTopic(), outputSchema)
+                .value(outputObject).send();
+        return null;
+    }
+}
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
index 460ea22..f9b90b4 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.tests.integration.functions;
 import static java.nio.charset.StandardCharsets.UTF_8;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
 
@@ -1590,6 +1591,125 @@ public abstract class PulsarFunctionsTest extends PulsarFunctionsTestBase {
         client.close();
     }
 
+
+    protected void testGenericObjectFunction(String function, boolean removeAgeField, boolean keyValue) throws Exception {
+        log.info("start {} function test ...", function);
+
+        String ns = "public/ns-genericobject-" + randomName(8);
+        @Cleanup
+        PulsarAdmin pulsarAdmin = getPulsarAdmin();
+        pulsarAdmin.namespaces().createNamespace(ns);
+
+        @Cleanup
+        PulsarClient pulsarClient = getPulsarClient();
+
+        final int numMessages = 10;
+        final String inputTopic = ns + "/test-object-input-" + randomName(8);
+        final String outputTopic = ns + "/test-object-output" + randomName(8);
+        @Cleanup
+        Consumer<GenericRecord> consumer = pulsarClient
+                .newConsumer(Schema.AUTO_CONSUME())
+                .subscriptionName("test")
+                .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+                .topic(outputTopic)
+                .subscribe();
+
+        final String functionName = "test-generic-fn-" + randomName(8);
+        submitFunction(
+                Runtime.JAVA,
+                inputTopic,
+                outputTopic,
+                functionName,
+                null,
+                function,
+                Schema.AUTO_CONSUME(),
+                null,
+                null,
+                SchemaType.NONE.name(),
+                SubscriptionInitialPosition.Earliest);
+        try {
+            if (keyValue) {
+                @Cleanup
+                Producer<KeyValue<Users.UserV1, Users.UserV1>> producer = pulsarClient
+                        .newProducer(Schema.KeyValue(
+                                Schema.AVRO(Users.UserV1.class),
+                                Schema.AVRO(Users.UserV1.class), KeyValueEncodingType.SEPARATED))
+                        .topic(inputTopic)
+                        .create();
+                for (int i = 0; i < numMessages; i++) {
+                    producer.send(new KeyValue<>(new Users.UserV1("foo" + i, i),
+                            new Users.UserV1("bar" + i, i + 100)));
+                }
+            } else {
+                @Cleanup
+                Producer<Users.UserV1> producer = pulsarClient
+                        .newProducer(Schema.AVRO(Users.UserV1.class))
+                        .topic(inputTopic)
+                        .create();
+                for (int i = 0; i < numMessages; i++) {
+                    producer.send(new Users.UserV1("bar" + i, i + 100));
+                }
+            }
+
+            getFunctionInfoSuccess(functionName);
+
+            getFunctionStatus(functionName, numMessages, true);
+
+            int i = 0;
+            Message<GenericRecord> message;
+            do {
+                message = consumer.receive(30, TimeUnit.SECONDS);
+                if (message != null) {
+                    GenericRecord genericRecord = message.getValue();
+                    if (keyValue) {
+                        KeyValue<GenericRecord, GenericRecord> keyValueObject = (KeyValue<GenericRecord, GenericRecord>) genericRecord.getNativeObject();
+                        GenericRecord key = keyValueObject.getKey();
+                        GenericRecord value = keyValueObject.getValue();
+                        key.getFields().forEach(f-> {
+                            log.info("key field {} value {}", f.getName(), key.getField(f.getName()));
+                        });
+                        value.getFields().forEach(f-> {
+                            log.info("value field {} value {}", f.getName(), value.getField(f.getName()));
+                        });
+                        assertEquals(i, key.getField("age"));
+                        assertEquals("foo" + i, key.getField("name"));
+
+                        if (removeAgeField) {
+                            // field "age" is removed from the schema
+                            assertFalse(value.getFields().stream().anyMatch(f -> f.getName().equals("age")));
+                        } else {
+                            assertEquals(i + 100, value.getField("age"));
+                        }
+                        assertEquals("bar" + i, value.getField("name"));
+                    } else {
+                        GenericRecord value = genericRecord;
+                        log.info("received value {}", value);
+                        value.getFields().forEach(f-> {
+                            log.info("value field {} value {}", f.getName(), value.getField(f.getName()));
+                        });
+
+                        if (removeAgeField) {
+                            // field "age" is removed from the schema
+                            assertFalse(value.getFields().stream().anyMatch(f -> f.getName().equals("age")));
+                        } else {
+                            assertEquals(i + 100, value.getField("age"));
+                        }
+                        assertEquals("bar" + i, value.getField("name"));
+                    }
+
+                    consumer.acknowledge(message);
+                    i++;
+                }
+            } while (message != null);
+        } finally {
+            pulsarCluster.dumpFunctionLogs(functionName);
+        }
+
+        deleteFunction(functionName);
+
+        getFunctionInfoNotFound(functionName);
+    }
+
     protected void testMergeFunction() throws Exception {
         log.info("start merge function test ...");
 
@@ -1636,26 +1756,31 @@ public abstract class PulsarFunctionsTest extends PulsarFunctionsTestBase {
 
         getFunctionStatus(functionName, topicMsgCntMap.keySet().size() * messagePerTopic, true);
 
-        Message<GenericRecord> message;
-        do {
-            message = consumer.receive(30, TimeUnit.SECONDS);
-            if (message != null) {
-                String baseTopic = message.getProperty("baseTopic");
-                GenericRecord genericRecord = message.getValue();
-                log.info("receive msg baseTopic: {}, schemaType: {}, nativeClass: {}, nativeObject: {}",
-                        baseTopic,
-                        genericRecord.getSchemaType(),
-                        genericRecord.getNativeObject().getClass(),
-                        genericRecord.getNativeObject());
-                checkSchemaForAutoSchema(message, baseTopic);
-                topicMsgCntMap.get(baseTopic).decrementAndGet();
-                consumer.acknowledge(message);
-            }
-        } while (message != null);
+        try {
 
-        for (Map.Entry<String, AtomicInteger> entry : topicMsgCntMap.entrySet()) {
-            assertEquals(entry.getValue().get(), 0,
-                    "topic " + entry.getKey() + " left message cnt is not 0.");
+            Message<GenericRecord> message;
+            do {
+                message = consumer.receive(30, TimeUnit.SECONDS);
+                if (message != null) {
+                    String baseTopic = message.getProperty("baseTopic");
+                    GenericRecord genericRecord = message.getValue();
+                    log.info("receive msg baseTopic: {}, schemaType: {}, nativeClass: {}, nativeObject: {}",
+                            baseTopic,
+                            genericRecord.getSchemaType(),
+                            genericRecord.getNativeObject().getClass(),
+                            genericRecord.getNativeObject());
+                    checkSchemaForAutoSchema(message, baseTopic);
+                    topicMsgCntMap.get(baseTopic).decrementAndGet();
+                    consumer.acknowledge(message);
+                }
+            } while (message != null);
+
+            for (Map.Entry<String, AtomicInteger> entry : topicMsgCntMap.entrySet()) {
+                assertEquals(entry.getValue().get(), 0,
+                        "topic " + entry.getKey() + " left message cnt is not 0.");
+            }
+        } finally {
+            pulsarCluster.dumpFunctionLogs(functionName);
         }
 
         deleteFunction(functionName);
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTestBase.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTestBase.java
index e594923..cbcdf89 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTestBase.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTestBase.java
@@ -42,6 +42,12 @@ public abstract class PulsarFunctionsTestBase extends PulsarTestSuite {
     public static final String EXCEPTION_JAVA_CLASS =
             "org.apache.pulsar.tests.integration.functions.ExceptionFunction";
 
+    public static final String GENERIC_OBJECT_FUNCTION_JAVA_CLASS =
+            "org.apache.pulsar.tests.integration.functions.GenericObjectFunction";
+
+    public static final String REMOVE_AVRO_FIELD_FUNCTION_JAVA_CLASS =
+            "org.apache.pulsar.tests.integration.functions.RemoveAvroFieldFunction";
+
     public static final String SERDE_JAVA_CLASS =
             "org.apache.pulsar.functions.api.examples.CustomBaseToBaseFunction";
 
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/java/PulsarFunctionsJavaTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/java/PulsarFunctionsJavaTest.java
index 2e35a70..46cb158 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/java/PulsarFunctionsJavaTest.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/java/PulsarFunctionsJavaTest.java
@@ -168,4 +168,24 @@ public class PulsarFunctionsJavaTest extends PulsarFunctionsTest {
 	    testMergeFunction();
    }
 
+    @Test(groups = {"java_function", "function"})
+    public void testGenericObjectFunction() throws Exception {
+        testGenericObjectFunction(GENERIC_OBJECT_FUNCTION_JAVA_CLASS, false, false);
+    }
+
+    @Test(groups = {"java_function", "function"})
+    public void testGenericObjectRemoveFiledFunction() throws Exception {
+        testGenericObjectFunction(REMOVE_AVRO_FIELD_FUNCTION_JAVA_CLASS, true, false);
+    }
+
+    @Test(groups = {"java_function", "function"})
+    public void testGenericObjectFunctionKeyValue() throws Exception {
+        testGenericObjectFunction(GENERIC_OBJECT_FUNCTION_JAVA_CLASS, false, true);
+    }
+
+    @Test(groups = {"java_function", "function"})
+    public void testGenericObjectRemoveFiledFunctionKeyValue() throws Exception {
+        testGenericObjectFunction(REMOVE_AVRO_FIELD_FUNCTION_JAVA_CLASS, true, true);
+    }
+
 }