You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by eo...@apache.org on 2022/06/17 11:36:46 UTC

[pulsar] branch master updated: [improve][function] Support Record as Function output type (#16041)

This is an automated email from the ASF dual-hosted git repository.

eolivelli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new b3c5191304a [improve][function] Support Record<?> as Function output type (#16041)
b3c5191304a is described below

commit b3c5191304ad4cc25a4a2292073a9ccc8191ea55
Author: Christophe Bornet <cb...@hotmail.com>
AuthorDate: Fri Jun 17 13:36:39 2022 +0200

    [improve][function] Support Record<?> as Function output type (#16041)
---
 .../org/apache/pulsar/functions/api/Context.java   |  10 ++
 .../pulsar/functions/api/utils/FunctionRecord.java | 117 +++++++++++++++++++++
 .../{SinkRecord.java => AbstractSinkRecord.java}   |  89 ++++------------
 .../pulsar/functions/instance/ContextImpl.java     |   6 ++
 .../functions/instance/JavaInstanceRunnable.java   |   8 +-
 .../functions/instance/OutputRecordSinkRecord.java |  94 +++++++++++++++++
 .../pulsar/functions/instance/SinkRecord.java      | 100 +++---------------
 .../apache/pulsar/functions/sink/PulsarSink.java   |  27 ++---
 .../pulsar/functions/instance/ContextImplTest.java |  68 ++++++++++++
 .../functions/api/examples/RecordFunction.java     |  43 ++++++++
 .../pulsar/functions/utils/FunctionCommon.java     |  13 +++
 .../pulsar/functions/utils/FunctionCommonTest.java |  69 ++++++++++++
 .../integration/functions/PulsarFunctionsTest.java |  61 +++++++++++
 .../functions/java/PulsarFunctionsJavaTest.java    |   9 +-
 14 files changed, 547 insertions(+), 167 deletions(-)

diff --git a/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Context.java b/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Context.java
index e45c2dec63e..36bdd0ef6ce 100644
--- a/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Context.java
+++ b/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Context.java
@@ -29,6 +29,7 @@ import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.TypedMessageBuilder;
 import org.apache.pulsar.common.classification.InterfaceAudience;
 import org.apache.pulsar.common.classification.InterfaceStability;
+import org.apache.pulsar.functions.api.utils.FunctionRecord;
 
 /**
  * Context provides contextual information to the executing function.
@@ -161,4 +162,13 @@ public interface Context extends BaseContext {
      * @throws PulsarClientException
      */
     <X> ConsumerBuilder<X> newConsumerBuilder(Schema<X> schema) throws PulsarClientException;
+
+    /**
+     * Creates a FunctionRecordBuilder initialized with values from this Context.
+     * It can be used in Functions to prepare a Record to return with default values taken from the Context and the
+     * input Record.
+     *
+     * @return the record builder instance
+     */
+    <X> FunctionRecord.FunctionRecordBuilder<X> newOutputRecordBuilder();
 }
diff --git a/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/utils/FunctionRecord.java b/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/utils/FunctionRecord.java
new file mode 100644
index 00000000000..be204a7bc21
--- /dev/null
+++ b/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/utils/FunctionRecord.java
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.api.utils;
+
+import java.util.Map;
+import java.util.Optional;
+import lombok.Builder;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.functions.api.Context;
+import org.apache.pulsar.functions.api.Record;
+
+@Builder(builderMethodName = "")
+public class FunctionRecord<T> implements Record<T> {
+
+    private final T value;
+    private final String topicName;
+    private final String destinationTopic;
+    private final Map<String, String> properties;
+    private final String key;
+    private final Schema<T> schema;
+    private final Long eventTime;
+    private final String partitionId;
+    private final Integer partitionIndex;
+    private final Long recordSequence;
+
+    /**
+     * Creates a builder for a Record from a Function Context.
+     * The builder is initialized with the output topic from the Context and with the topicName, key, eventTime,
+     * properties, partitionId, partitionIndex and recordSequence from the Context input Record.
+     * It doesn't initialize a Message at the moment.
+     *
+     * @param context a Function Context
+     * @param <T> type of Record to build
+     * @return a Record builder initialised with values from the Function Context
+     */
+     public static <T> FunctionRecord.FunctionRecordBuilder<T> from(Context context) {
+        Record<?> currentRecord = context.getCurrentRecord();
+        FunctionRecordBuilder<T> builder = new FunctionRecordBuilder<T>()
+                .destinationTopic(context.getOutputTopic())
+                .properties(currentRecord.getProperties());
+        currentRecord.getTopicName().ifPresent(builder::topicName);
+        currentRecord.getKey().ifPresent(builder::key);
+        currentRecord.getEventTime().ifPresent(builder::eventTime);
+        currentRecord.getPartitionId().ifPresent(builder::partitionId);
+        currentRecord.getPartitionIndex().ifPresent(builder::partitionIndex);
+        currentRecord.getRecordSequence().ifPresent(builder::recordSequence);
+
+        return builder;
+    }
+
+    @Override
+    public T getValue() {
+        return value;
+    }
+
+    @Override
+    public Optional<String> getTopicName() {
+        return Optional.ofNullable(topicName);
+    }
+
+    @Override
+    public Optional<String> getDestinationTopic() {
+        return Optional.ofNullable(destinationTopic);
+    }
+
+    @Override
+    public Map<String, String> getProperties() {
+        return properties;
+    }
+
+    @Override
+    public Optional<String> getKey() {
+        return Optional.ofNullable(key);
+    }
+
+    @Override
+    public Schema<T> getSchema() {
+        return schema;
+    }
+
+    @Override
+    public Optional<Long> getEventTime() {
+        return Optional.ofNullable(eventTime);
+    }
+
+    @Override
+    public Optional<String> getPartitionId() {
+        return Optional.ofNullable(partitionId);
+    }
+
+    @Override
+    public Optional<Integer> getPartitionIndex() {
+        return Optional.ofNullable(partitionIndex);
+    }
+
+    @Override
+    public Optional<Long> getRecordSequence() {
+        return Optional.ofNullable(recordSequence);
+    }
+
+}
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/SinkRecord.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/AbstractSinkRecord.java
similarity index 62%
copy from pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/SinkRecord.java
copy to pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/AbstractSinkRecord.java
index b922b988581..2adcbd0065f 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/SinkRecord.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/AbstractSinkRecord.java
@@ -18,12 +18,9 @@
  */
 package org.apache.pulsar.functions.instance;
 
-import java.util.Map;
 import java.util.Optional;
-import lombok.AllArgsConstructor;
-import lombok.Data;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.pulsar.client.api.Message;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.impl.schema.AutoConsumeSchema;
 import org.apache.pulsar.client.impl.schema.KeyValueSchemaImpl;
@@ -31,51 +28,25 @@ import org.apache.pulsar.functions.api.KVRecord;
 import org.apache.pulsar.functions.api.Record;
 import org.apache.pulsar.functions.source.PulsarRecord;
 
-@Slf4j
-@Data
-@AllArgsConstructor
-public class SinkRecord<T> implements Record<T> {
+@EqualsAndHashCode
+@ToString
+public abstract class AbstractSinkRecord<T> implements Record<T> {
 
-    private final Record<T> sourceRecord;
-    private final T value;
+    private final Record<?> sourceRecord;
 
-    public Record<T> getSourceRecord() {
-        return sourceRecord;
-    }
-
-    @Override
-    public Optional<String> getTopicName() {
-        return sourceRecord.getTopicName();
+    protected AbstractSinkRecord(Record<?> sourceRecord) {
+        this.sourceRecord = sourceRecord;
     }
 
-    @Override
-    public Optional<String> getKey() {
-        return sourceRecord.getKey();
-    }
+    public abstract boolean shouldAlwaysSetMessageProperties();
 
-    @Override
-    public T getValue() {
-        return value;
-    }
-
-    @Override
-    public Optional<String> getPartitionId() {
-        return sourceRecord.getPartitionId();
-    }
-
-    @Override
-    public Optional<Integer> getPartitionIndex() {
-        return sourceRecord.getPartitionIndex();
+    public Record<?> getSourceRecord() {
+        return sourceRecord;
     }
 
     @Override
-    public Optional<Long> getRecordSequence() {
-        return sourceRecord.getRecordSequence();
-    }
-
-     @Override
-    public Map<String, String> getProperties() {
-        return sourceRecord.getProperties();
+    public Optional<String> getTopicName() {
+        return sourceRecord.getTopicName();
     }
 
     @Override
@@ -112,29 +83,23 @@ public class SinkRecord<T> implements Record<T> {
         sourceRecord.fail();
     }
 
-    @Override
-    public Optional<String> getDestinationTopic() {
-        return sourceRecord.getDestinationTopic();
-    }
-
-    @Override
-    public Schema<T> getSchema() {
-        if (sourceRecord == null) {
+    protected static <T> Schema<T> getRecordSchema(Record<T> record) {
+        if (record == null) {
             return null;
         }
 
-        if (sourceRecord.getSchema() != null) {
+        if (record.getSchema() != null) {
             // unwrap actual schema
-            Schema<T> schema = sourceRecord.getSchema();
+            Schema<T> schema = record.getSchema();
             // AutoConsumeSchema is a special schema, that comes into play
             // when the Sink is going to handle any Schema
             // usually you see Sink<GenericObject> or Sink<GenericRecord> in this case
             if (schema instanceof AutoConsumeSchema) {
                 // extract the Schema from the message, this is the most accurate schema we have
                 // see PIP-85
-                if (sourceRecord.getMessage().isPresent()
-                        && sourceRecord.getMessage().get().getReaderSchema().isPresent()) {
-                    schema = (Schema<T>) sourceRecord.getMessage().get().getReaderSchema().get();
+                if (record.getMessage().isPresent()
+                        && record.getMessage().get().getReaderSchema().isPresent()) {
+                    schema = (Schema<T>) record.getMessage().get().getReaderSchema().get();
                 } else {
                     schema = (Schema<T>) ((AutoConsumeSchema) schema).getInternalSchema();
                 }
@@ -142,22 +107,12 @@ public class SinkRecord<T> implements Record<T> {
             return schema;
         }
 
-        if (sourceRecord instanceof KVRecord) {
-            KVRecord kvRecord = (KVRecord) sourceRecord;
+        if (record instanceof KVRecord) {
+            KVRecord kvRecord = (KVRecord) record;
             return KeyValueSchemaImpl.of(kvRecord.getKeySchema(), kvRecord.getValueSchema(),
                     kvRecord.getKeyValueEncodingType());
         }
 
         return null;
     }
-
-    @Override
-    public Optional<Long> getEventTime() {
-        return sourceRecord.getEventTime();
-    }
-
-    @Override
-    public Optional<Message<T>> getMessage() {
-        return sourceRecord.getMessage();
-    }
 }
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
index 79ff3531de1..336c2bd0be3 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
@@ -62,6 +62,7 @@ import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.functions.api.Context;
 import org.apache.pulsar.functions.api.Record;
 import org.apache.pulsar.functions.api.StateStore;
+import org.apache.pulsar.functions.api.utils.FunctionRecord;
 import org.apache.pulsar.functions.instance.state.DefaultStateStore;
 import org.apache.pulsar.functions.instance.state.StateManager;
 import org.apache.pulsar.functions.instance.stats.ComponentStatsManager;
@@ -481,6 +482,11 @@ class ContextImpl implements Context, SinkContext, SourceContext, AutoCloseable
         return this.client.newConsumer(schema);
     }
 
+    @Override
+    public <X> FunctionRecord.FunctionRecordBuilder<X> newOutputRecordBuilder() {
+        return FunctionRecord.from(this);
+    }
+
     @Override
     public SubscriptionType getSubscriptionType() {
         return subscriptionType;
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
index 56bd64a7107..61f809a4cbc 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
@@ -375,8 +375,14 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {
         if (componentType == org.apache.pulsar.functions.proto.Function.FunctionDetails.ComponentType.SINK) {
             Thread.currentThread().setContextClassLoader(functionClassLoader);
         }
+        AbstractSinkRecord<?> sinkRecord;
+        if (output instanceof Record) {
+            sinkRecord = new OutputRecordSinkRecord<>(srcRecord, (Record) output);
+        } else {
+            sinkRecord = new SinkRecord<>(srcRecord, output);
+        }
         try {
-            this.sink.write(new SinkRecord<>(srcRecord, output));
+            this.sink.write(sinkRecord);
         } catch (Exception e) {
             log.info("Encountered exception in sink write: ", e);
             stats.incrSinkExceptions(e);
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/OutputRecordSinkRecord.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/OutputRecordSinkRecord.java
new file mode 100644
index 00000000000..6220517414d
--- /dev/null
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/OutputRecordSinkRecord.java
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.instance;
+
+import java.util.Map;
+import java.util.Optional;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.functions.api.Record;
+
+@EqualsAndHashCode(callSuper = true)
+@ToString
+class OutputRecordSinkRecord<T> extends AbstractSinkRecord<T> {
+
+    private final Record<T> sinkRecord;
+
+    OutputRecordSinkRecord(Record<T> sourceRecord, Record<T> sinkRecord) {
+        super(sourceRecord);
+        this.sinkRecord = sinkRecord;
+    }
+
+    @Override
+    public Optional<String> getKey() {
+        return sinkRecord.getKey();
+    }
+
+    @Override
+    public T getValue() {
+        return sinkRecord.getValue();
+    }
+
+    @Override
+    public Optional<String> getPartitionId() {
+        return sinkRecord.getPartitionId();
+    }
+
+    @Override
+    public Optional<Integer> getPartitionIndex() {
+        return sinkRecord.getPartitionIndex();
+    }
+
+    @Override
+    public Optional<Long> getRecordSequence() {
+        return sinkRecord.getRecordSequence();
+    }
+
+     @Override
+    public Map<String, String> getProperties() {
+        return sinkRecord.getProperties();
+    }
+
+    @Override
+    public Optional<String> getDestinationTopic() {
+        return sinkRecord.getDestinationTopic();
+    }
+
+    @Override
+    public Schema<T> getSchema() {
+        return getRecordSchema(sinkRecord);
+    }
+
+    @Override
+    public Optional<Long> getEventTime() {
+        return sinkRecord.getEventTime();
+    }
+
+    @Override
+    public Optional<Message<T>> getMessage() {
+        return sinkRecord.getMessage();
+    }
+
+    @Override
+    public boolean shouldAlwaysSetMessageProperties() {
+        return true;
+    }
+}
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/SinkRecord.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/SinkRecord.java
index b922b988581..8f64ed2ce09 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/SinkRecord.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/SinkRecord.java
@@ -20,32 +20,22 @@ package org.apache.pulsar.functions.instance;
 
 import java.util.Map;
 import java.util.Optional;
-import lombok.AllArgsConstructor;
-import lombok.Data;
-import lombok.extern.slf4j.Slf4j;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.Schema;
-import org.apache.pulsar.client.impl.schema.AutoConsumeSchema;
-import org.apache.pulsar.client.impl.schema.KeyValueSchemaImpl;
-import org.apache.pulsar.functions.api.KVRecord;
 import org.apache.pulsar.functions.api.Record;
-import org.apache.pulsar.functions.source.PulsarRecord;
-
-@Slf4j
-@Data
-@AllArgsConstructor
-public class SinkRecord<T> implements Record<T> {
 
+@EqualsAndHashCode(callSuper = true)
+@ToString
+public class SinkRecord<T> extends AbstractSinkRecord<T> {
     private final Record<T> sourceRecord;
     private final T value;
 
-    public Record<T> getSourceRecord() {
-        return sourceRecord;
-    }
-
-    @Override
-    public Optional<String> getTopicName() {
-        return sourceRecord.getTopicName();
+    public SinkRecord(Record<T> sourceRecord, T value) {
+        super(sourceRecord);
+        this.sourceRecord = sourceRecord;
+        this.value = value;
     }
 
     @Override
@@ -73,45 +63,11 @@ public class SinkRecord<T> implements Record<T> {
         return sourceRecord.getRecordSequence();
     }
 
-     @Override
+    @Override
     public Map<String, String> getProperties() {
         return sourceRecord.getProperties();
     }
 
-    @Override
-    public void ack() {
-        sourceRecord.ack();
-    }
-
-    /**
-     * Some sink sometimes wants to control the ack type.
-     */
-    public void cumulativeAck() {
-        if (sourceRecord instanceof PulsarRecord) {
-            PulsarRecord pulsarRecord = (PulsarRecord) sourceRecord;
-            pulsarRecord.cumulativeAck();
-        } else {
-            throw new RuntimeException("SourceRecord class type must be PulsarRecord");
-        }
-    }
-
-    /**
-     * Some sink sometimes wants to control the ack type.
-     */
-    public void individualAck() {
-        if (sourceRecord instanceof PulsarRecord) {
-            PulsarRecord pulsarRecord = (PulsarRecord) sourceRecord;
-            pulsarRecord.individualAck();
-        } else {
-            throw new RuntimeException("SourceRecord class type must be PulsarRecord");
-        }
-    }
-
-    @Override
-    public void fail() {
-        sourceRecord.fail();
-    }
-
     @Override
     public Optional<String> getDestinationTopic() {
         return sourceRecord.getDestinationTopic();
@@ -119,36 +75,7 @@ public class SinkRecord<T> implements Record<T> {
 
     @Override
     public Schema<T> getSchema() {
-        if (sourceRecord == null) {
-            return null;
-        }
-
-        if (sourceRecord.getSchema() != null) {
-            // unwrap actual schema
-            Schema<T> schema = sourceRecord.getSchema();
-            // AutoConsumeSchema is a special schema, that comes into play
-            // when the Sink is going to handle any Schema
-            // usually you see Sink<GenericObject> or Sink<GenericRecord> in this case
-            if (schema instanceof AutoConsumeSchema) {
-                // extract the Schema from the message, this is the most accurate schema we have
-                // see PIP-85
-                if (sourceRecord.getMessage().isPresent()
-                        && sourceRecord.getMessage().get().getReaderSchema().isPresent()) {
-                    schema = (Schema<T>) sourceRecord.getMessage().get().getReaderSchema().get();
-                } else {
-                    schema = (Schema<T>) ((AutoConsumeSchema) schema).getInternalSchema();
-                }
-            }
-            return schema;
-        }
-
-        if (sourceRecord instanceof KVRecord) {
-            KVRecord kvRecord = (KVRecord) sourceRecord;
-            return KeyValueSchemaImpl.of(kvRecord.getKeySchema(), kvRecord.getValueSchema(),
-                    kvRecord.getKeyValueEncodingType());
-        }
-
-        return null;
+        return getRecordSchema(sourceRecord);
     }
 
     @Override
@@ -160,4 +87,9 @@ public class SinkRecord<T> implements Record<T> {
     public Optional<Message<T>> getMessage() {
         return sourceRecord.getMessage();
     }
+
+    @Override
+    public boolean shouldAlwaysSetMessageProperties() {
+        return false;
+    }
 }
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java
index ac08370a31e..80aa49e05ae 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java
@@ -59,8 +59,8 @@ import org.apache.pulsar.common.schema.KeyValueEncodingType;
 import org.apache.pulsar.common.schema.SchemaType;
 import org.apache.pulsar.common.util.Reflections;
 import org.apache.pulsar.functions.api.Record;
+import org.apache.pulsar.functions.instance.AbstractSinkRecord;
 import org.apache.pulsar.functions.instance.FunctionResultRouter;
-import org.apache.pulsar.functions.instance.SinkRecord;
 import org.apache.pulsar.functions.instance.stats.ComponentStatsManager;
 import org.apache.pulsar.functions.source.PulsarRecord;
 import org.apache.pulsar.functions.source.TopicSchema;
@@ -85,9 +85,9 @@ public class PulsarSink<T> implements Sink<T> {
 
     private interface PulsarSinkProcessor<T> {
 
-        TypedMessageBuilder<T> newMessage(SinkRecord<T> record);
+        TypedMessageBuilder<T> newMessage(AbstractSinkRecord<T> record);
 
-        void sendOutputMessage(TypedMessageBuilder<T> msg, SinkRecord<T> record);
+        void sendOutputMessage(TypedMessageBuilder<T> msg, AbstractSinkRecord<T> record);
 
         void close() throws Exception;
     }
@@ -183,17 +183,17 @@ public class PulsarSink<T> implements Sink<T> {
             }
         }
 
-        public Function<Throwable, Void> getPublishErrorHandler(SinkRecord<T> record, boolean failSource) {
+        public Function<Throwable, Void> getPublishErrorHandler(AbstractSinkRecord<T> record, boolean failSource) {
 
             return throwable -> {
-                Record<T> srcRecord = record.getSourceRecord();
+                Record<?> srcRecord = record.getSourceRecord();
                 if (failSource) {
                     srcRecord.fail();
                 }
 
                 String topic = record.getDestinationTopic().orElse(pulsarSinkConfig.getTopic());
 
-                String errorMsg = null;
+                String errorMsg;
                 if (srcRecord instanceof PulsarRecord) {
                     errorMsg = String.format("Failed to publish to topic [%s] with error [%s] with src message id [%s]",
                             topic, throwable.getMessage(), ((PulsarRecord) srcRecord).getMessageId());
@@ -234,7 +234,7 @@ public class PulsarSink<T> implements Sink<T> {
         }
 
         @Override
-        public TypedMessageBuilder<T> newMessage(SinkRecord<T> record) {
+        public TypedMessageBuilder<T> newMessage(AbstractSinkRecord<T> record) {
             Schema<T> schemaToWrite = record.getSchema();
             if (record.getSourceRecord() instanceof PulsarRecord) {
                 // we are receiving data directly from another Pulsar topic
@@ -256,7 +256,7 @@ public class PulsarSink<T> implements Sink<T> {
         }
 
         @Override
-        public void sendOutputMessage(TypedMessageBuilder<T> msg, SinkRecord<T> record) {
+        public void sendOutputMessage(TypedMessageBuilder<T> msg, AbstractSinkRecord<T> record) {
             msg.sendAsync().thenAccept(messageId -> {
                 //no op
             }).exceptionally(getPublishErrorHandler(record, false));
@@ -270,7 +270,7 @@ public class PulsarSink<T> implements Sink<T> {
         }
 
         @Override
-        public void sendOutputMessage(TypedMessageBuilder<T> msg, SinkRecord<T> record) {
+        public void sendOutputMessage(TypedMessageBuilder<T> msg, AbstractSinkRecord<T> record) {
             msg.sendAsync()
                     .thenAccept(messageId -> record.ack())
                     .exceptionally(getPublishErrorHandler(record, true));
@@ -285,7 +285,7 @@ public class PulsarSink<T> implements Sink<T> {
         }
 
         @Override
-        public TypedMessageBuilder<T> newMessage(SinkRecord<T> record) {
+        public TypedMessageBuilder<T> newMessage(AbstractSinkRecord<T> record) {
             if (!record.getPartitionId().isPresent()) {
                 throw new RuntimeException(
                         "PartitionId needs to be specified for every record while in Effectively-once mode");
@@ -311,7 +311,7 @@ public class PulsarSink<T> implements Sink<T> {
         }
 
         @Override
-        public void sendOutputMessage(TypedMessageBuilder<T> msg, SinkRecord<T> record) {
+        public void sendOutputMessage(TypedMessageBuilder<T> msg, AbstractSinkRecord<T> record) {
 
             if (!record.getRecordSequence().isPresent()) {
                 throw new RuntimeException(
@@ -367,7 +367,7 @@ public class PulsarSink<T> implements Sink<T> {
 
     @Override
     public void write(Record<T> record) {
-        SinkRecord<T> sinkRecord = (SinkRecord<T>) record;
+        AbstractSinkRecord<T> sinkRecord = (AbstractSinkRecord<T>) record;
         TypedMessageBuilder<T> msg = pulsarSinkProcessor.newMessage(sinkRecord);
 
         if (record.getKey().isPresent() && !(record.getSchema() instanceof KeyValueSchema
@@ -377,7 +377,8 @@ public class PulsarSink<T> implements Sink<T> {
 
         msg.value(record.getValue());
 
-        if (!record.getProperties().isEmpty() && pulsarSinkConfig.isForwardSourceMessageProperty()) {
+        if (!record.getProperties().isEmpty()
+            && (sinkRecord.shouldAlwaysSetMessageProperties() || pulsarSinkConfig.isForwardSourceMessageProperty())) {
             msg.properties(record.getProperties());
         }
 
diff --git a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java
index 41cb550250a..e2e34f4ff91 100644
--- a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java
+++ b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java
@@ -30,8 +30,12 @@ import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNull;
+import static org.testng.Assert.assertTrue;
 import java.nio.ByteBuffer;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import org.apache.pulsar.client.admin.PulsarAdmin;
@@ -351,4 +355,68 @@ public class ContextImplTest {
         }
 
     }
+
+    @Test
+    public void testNewOutputRecordBuilder() {
+        Map<String, String> properties = new HashMap<>();
+        properties.put("prop-key", "prop-value");
+        long now = System.currentTimeMillis();
+        context.setCurrentMessageContext(new Record<String>() {
+            @Override
+            public Optional<String> getTopicName() {
+                return Optional.of("input-topic");
+            }
+
+            @Override
+            public Optional<String> getKey() {
+                return Optional.of("input-key");
+            }
+
+            @Override
+            public Schema<String> getSchema() {
+                return Schema.STRING;
+            }
+
+            @Override
+            public String getValue() {
+                return "input-value";
+            }
+
+            @Override
+            public Optional<Long> getEventTime() {
+                return Optional.of(now);
+            }
+
+            @Override
+            public Optional<String> getPartitionId() {
+                return Optional.of("input-partition-id");
+            }
+
+            @Override
+            public Optional<Integer> getPartitionIndex() {
+                return Optional.of(42);
+            }
+
+            @Override
+            public Optional<Long> getRecordSequence() {
+                return Optional.of(43L);
+            }
+
+            @Override
+            public Map<String, String> getProperties() {
+                return properties;
+            }
+        });
+        Record<Integer> record = context.<Integer>newOutputRecordBuilder().build();
+        assertEquals(record.getTopicName().get(), "input-topic");
+        assertEquals(record.getKey().get(), "input-key");
+        assertEquals(record.getEventTime(), Optional.of(now));
+        assertEquals(record.getPartitionId().get(), "input-partition-id");
+        assertEquals(record.getPartitionIndex(), Optional.of(42));
+        assertEquals(record.getRecordSequence(), Optional.of(43L));
+        assertTrue(record.getProperties().containsKey("prop-key"));
+        assertEquals(record.getProperties().get("prop-key"), "prop-value");
+        assertNull(record.getValue());
+        assertNull(record.getSchema());
+    }
 }
diff --git a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/RecordFunction.java b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/RecordFunction.java
new file mode 100644
index 00000000000..028bccae5fc
--- /dev/null
+++ b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/RecordFunction.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.api.examples;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.pulsar.functions.api.Context;
+import org.apache.pulsar.functions.api.Function;
+import org.apache.pulsar.functions.api.Record;
+
+public class RecordFunction implements Function<String, Record<String>> {
+
+    @Override
+    public Record<String> process(String input, Context context) throws Exception {
+        String publishTopic = (String) context.getUserConfigValueOrDefault("publish-topic", "publishtopic");
+        String output = String.format("%s!", input);
+
+        Map<String, String> properties = new HashMap<>(context.getCurrentRecord().getProperties());
+        context.getCurrentRecord().getTopicName().ifPresent(topic -> properties.put("input_topic", topic));
+
+        return context.<String>newOutputRecordBuilder()
+                .destinationTopic(publishTopic)
+                .value(output)
+                .properties(properties)
+                .build();
+    }
+}
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionCommon.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionCommon.java
index 02202ae7978..e4233acd4c4 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionCommon.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionCommon.java
@@ -54,6 +54,7 @@ import org.apache.pulsar.common.nar.NarClassLoader;
 import org.apache.pulsar.common.nar.NarClassLoaderBuilder;
 import org.apache.pulsar.common.util.ClassLoaderUtils;
 import org.apache.pulsar.functions.api.Function;
+import org.apache.pulsar.functions.api.Record;
 import org.apache.pulsar.functions.api.WindowFunction;
 import org.apache.pulsar.functions.proto.Function.FunctionDetails.Runtime;
 import org.apache.pulsar.functions.utils.io.ConnectorUtils;
@@ -120,8 +121,20 @@ public class FunctionCommon {
         } else {
             if (Function.class.isAssignableFrom(userClass)) {
                 typeArgs = TypeResolver.resolveRawArguments(Function.class, userClass);
+                if (typeArgs[1].equals(Record.class)) {
+                    Type type = TypeResolver.resolveGenericType(Function.class, userClass);
+                    Type recordType = ((ParameterizedType) type).getActualTypeArguments()[1];
+                    Type actualInputType = ((ParameterizedType) recordType).getActualTypeArguments()[0];
+                    typeArgs[1] = (Class<?>) actualInputType;
+                }
             } else {
                 typeArgs = TypeResolver.resolveRawArguments(java.util.function.Function.class, userClass);
+                if (typeArgs[1].equals(Record.class)) {
+                    Type type = TypeResolver.resolveGenericType(java.util.function.Function.class, userClass);
+                    Type recordType = ((ParameterizedType) type).getActualTypeArguments()[1];
+                    Type actualInputType = ((ParameterizedType) recordType).getActualTypeArguments()[0];
+                    typeArgs[1] = (Class<?>) actualInputType;
+                }
             }
         }
 
diff --git a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/FunctionCommonTest.java b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/FunctionCommonTest.java
index 0e34e2997c5..3a1f69d4bfd 100644
--- a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/FunctionCommonTest.java
+++ b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/FunctionCommonTest.java
@@ -19,9 +19,16 @@
 
 package org.apache.pulsar.functions.utils;
 
+import java.util.Collection;
 import org.apache.pulsar.client.impl.MessageIdImpl;
 import org.apache.pulsar.common.util.FutureUtil;
+import org.apache.pulsar.functions.api.Context;
+import org.apache.pulsar.functions.api.Function;
+import org.apache.pulsar.functions.api.Record;
+import org.apache.pulsar.functions.api.WindowContext;
+import org.apache.pulsar.functions.api.WindowFunction;
 import org.testng.Assert;
+import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
 
 import java.io.File;
@@ -101,4 +108,66 @@ public class FunctionCommonTest {
         assertEquals(lid, id.getLedgerId());
         assertEquals(eid, id.getEntryId());
     }
+
+    @DataProvider(name = "function")
+    public Object[][] functionProvider() {
+        return new Object[][] {
+            {
+                new Function<String, Integer>() {
+                    @Override
+                    public Integer process(String input, Context context) throws Exception {
+                        return null;
+                    }
+                }, false
+            },
+            {
+                new Function<String, Record<Integer>>() {
+                    @Override
+                    public Record<Integer> process(String input, Context context) throws Exception {
+                        return null;
+                    }
+                }, false
+            },
+            {
+                new java.util.function.Function<String, Integer>() {
+                    @Override
+                    public Integer apply(String s) {
+                        return null;
+                    }
+                }, false
+            },
+            {
+                new java.util.function.Function<String, Record<Integer>>() {
+                    @Override
+                    public Record<Integer> apply(String s) {
+                        return null;
+                    }
+                }, false
+            },
+            {
+                new WindowFunction<String, Integer>() {
+                    @Override
+                    public Integer process(Collection<Record<String>> input, WindowContext context) throws Exception {
+                        return null;
+                    }
+                }, true
+            },
+            {
+                new java.util.function.Function<Collection<String>, Integer>() {
+                    @Override
+                    public Integer apply(Collection<String> strings) {
+                        return null;
+                    }
+                }, true
+            }
+        };
+    }
+
+    @Test(dataProvider = "function")
+    public void testGetFunctionTypes(Object function, boolean isWindowConfigPresent) {
+        Class<?>[] types = FunctionCommon.getFunctionTypes(function.getClass(), isWindowConfigPresent);
+        assertEquals(types.length, 2);
+        assertEquals(types[0], String.class);
+        assertEquals(types[1], Integer.class);
+    }
 }
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
index 9a6a26be3da..d674e6d5101 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
@@ -72,6 +72,7 @@ import org.apache.pulsar.functions.api.examples.AutoSchemaFunction;
 import org.apache.pulsar.functions.api.examples.AvroSchemaTestFunction;
 import org.apache.pulsar.functions.api.examples.MergeTopicFunction;
 import org.apache.pulsar.functions.api.examples.InitializableFunction;
+import org.apache.pulsar.functions.api.examples.RecordFunction;
 import org.apache.pulsar.functions.api.examples.pojo.AvroTestObject;
 import org.apache.pulsar.functions.api.examples.pojo.Users;
 import org.apache.pulsar.functions.api.examples.serde.CustomObject;
@@ -1715,6 +1716,66 @@ public abstract class PulsarFunctionsTest extends PulsarFunctionsTestBase {
         getFunctionInfoNotFound(functionName);
     }
 
+    protected void testRecordFunction() throws Exception {
+        log.info("start RecordFunction function test ...");
+
+        String ns = "public/ns-recordfunction-" + randomName(8);
+        @Cleanup
+        PulsarAdmin pulsarAdmin = getPulsarAdmin();
+        pulsarAdmin.namespaces().createNamespace(ns);
+
+        @Cleanup
+        PulsarClient pulsarClient = getPulsarClient();
+
+        final int numMessages = 10;
+        final String inputTopic = ns + "/test-string-input-" + randomName(8);
+        final String outputTopic = ns + "/test-string-output-" + randomName(8);
+        @Cleanup
+        Consumer<String> consumer = pulsarClient
+                .newConsumer(Schema.STRING)
+                .subscriptionName("test")
+                .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+                .topic("publishtopic")
+                .subscribe();
+
+        final String functionName = "test-record-fn-" + randomName(8);
+        submitFunction(
+                Runtime.JAVA,
+                inputTopic,
+                outputTopic,
+                functionName,
+                null,
+                RecordFunction.class.getName(),
+                Schema.AUTO_CONSUME());
+        try {
+            @Cleanup
+            Producer<String> producer = pulsarClient
+                    .newProducer(Schema.STRING)
+                    .topic(inputTopic)
+                    .create();
+            for (int i = 0; i < numMessages; i++) {
+                producer.send("message" + i);
+            }
+
+            getFunctionInfoSuccess(functionName);
+
+            getFunctionStatus(functionName, numMessages, true);
+
+            for (int i = 0; i < numMessages; i++) {
+                Message<String> msg = consumer.receive(30, TimeUnit.SECONDS);
+                log.info("Received: {}", msg.getValue());
+                assertEquals(msg.getValue(), "message" + i + "!");
+                assertEquals(msg.getProperty("input_topic"), "persistent://" + inputTopic);
+            }
+        } finally {
+            pulsarCluster.dumpFunctionLogs(functionName);
+        }
+
+        deleteFunction(functionName);
+
+        getFunctionInfoNotFound(functionName);
+    }
+
     protected void testMergeFunction() throws Exception {
         log.info("start merge function test ...");
 
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/java/PulsarFunctionsJavaTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/java/PulsarFunctionsJavaTest.java
index 46cb15892a1..6e496214219 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/java/PulsarFunctionsJavaTest.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/java/PulsarFunctionsJavaTest.java
@@ -165,8 +165,8 @@ public class PulsarFunctionsJavaTest extends PulsarFunctionsTest {
 
     @Test(groups = {"java_function", "function"})
     public void testMergeFunctionTest() throws Exception {
-	    testMergeFunction();
-   }
+        testMergeFunction();
+    }
 
     @Test(groups = {"java_function", "function"})
     public void testGenericObjectFunction() throws Exception {
@@ -188,4 +188,9 @@ public class PulsarFunctionsJavaTest extends PulsarFunctionsTest {
         testGenericObjectFunction(REMOVE_AVRO_FIELD_FUNCTION_JAVA_CLASS, true, true);
     }
 
+    @Test(groups = {"java_function", "function"})
+    public void testRecordFunctionTest() throws Exception {
+        testRecordFunction();
+    }
+
 }