You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by te...@apache.org on 2022/08/19 01:36:36 UTC

[pulsar] branch master updated: [fix][functions] Make mandatory to provide a schema in Context::newOutputRecordBuilder (#17118)

This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new e3c2dbfb662 [fix][functions] Make mandatory to provide a schema in Context::newOutputRecordBuilder (#17118)
e3c2dbfb662 is described below

commit e3c2dbfb66277cbcd8177dcf26dc225bc3b302d2
Author: Christophe Bornet <cb...@hotmail.com>
AuthorDate: Fri Aug 19 03:36:29 2022 +0200

    [fix][functions] Make mandatory to provide a schema in Context::newOutputRecordBuilder (#17118)
---
 .../src/main/java/org/apache/pulsar/functions/api/Context.java |  6 ++++--
 .../org/apache/pulsar/functions/api/utils/FunctionRecord.java  | 10 +++++++---
 .../java/org/apache/pulsar/functions/instance/ContextImpl.java |  4 ++--
 .../org/apache/pulsar/functions/instance/ContextImplTest.java  |  4 ++--
 .../apache/pulsar/functions/api/examples/RecordFunction.java   |  3 ++-
 5 files changed, 17 insertions(+), 10 deletions(-)

diff --git a/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Context.java b/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Context.java
index eeaf6f4e20f..2a89fecbc65 100644
--- a/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Context.java
+++ b/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Context.java
@@ -169,8 +169,10 @@ public interface Context extends BaseContext {
      * Creates a FunctionRecordBuilder initialized with values from this Context.
      * It can be used in Functions to prepare a Record to return with default values taken from the Context and the
      * input Record.
-     *
+     *
+     * @param schema the schema used to convert between serialized data and domain objects
+     * @param <X> the message type of record builder
      * @return the record builder instance
      */
-    <X> FunctionRecord.FunctionRecordBuilder<X> newOutputRecordBuilder();
+    <X> FunctionRecord.FunctionRecordBuilder<X> newOutputRecordBuilder(Schema<X> schema);
 }
diff --git a/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/utils/FunctionRecord.java b/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/utils/FunctionRecord.java
index be204a7bc21..c7cb1a6480a 100644
--- a/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/utils/FunctionRecord.java
+++ b/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/utils/FunctionRecord.java
@@ -49,11 +49,15 @@ public class FunctionRecord<T> implements Record<T> {
      * @param <T> type of Record to build
      * @return a Record builder initialised with values from the Function Context
      */
-     public static <T> FunctionRecord.FunctionRecordBuilder<T> from(Context context) {
+    public static <T> FunctionRecord.FunctionRecordBuilder<T> from(Context context, Schema<T> schema) {
+        if (schema == null) {
+            throw new IllegalArgumentException("Schema should not be null.");
+        }
         Record<?> currentRecord = context.getCurrentRecord();
         FunctionRecordBuilder<T> builder = new FunctionRecordBuilder<T>()
-                .destinationTopic(context.getOutputTopic())
-                .properties(currentRecord.getProperties());
+            .schema(schema)
+            .destinationTopic(context.getOutputTopic())
+            .properties(currentRecord.getProperties());
         currentRecord.getTopicName().ifPresent(builder::topicName);
         currentRecord.getKey().ifPresent(builder::key);
         currentRecord.getEventTime().ifPresent(builder::eventTime);
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
index 31d8385c1fc..914275eb163 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
@@ -491,8 +491,8 @@ class ContextImpl implements Context, SinkContext, SourceContext, AutoCloseable
     }
 
     @Override
-    public <X> FunctionRecord.FunctionRecordBuilder<X> newOutputRecordBuilder() {
-        return FunctionRecord.from(this);
+    public <X> FunctionRecord.FunctionRecordBuilder<X> newOutputRecordBuilder(Schema<X> schema) {
+        return FunctionRecord.from(this, schema);
     }
 
     @Override
diff --git a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java
index a5fe51d5403..8f28ce40105 100644
--- a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java
+++ b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java
@@ -423,7 +423,8 @@ public class ContextImplTest {
                 return properties;
             }
         });
-        Record<Integer> record = context.<Integer>newOutputRecordBuilder().build();
+        Record<Integer> record = context.newOutputRecordBuilder(Schema.INT32).build();
+        assertEquals(record.getSchema(), Schema.INT32);
         assertEquals(record.getTopicName().get(), "input-topic");
         assertEquals(record.getKey().get(), "input-key");
         assertEquals(record.getEventTime(), Optional.of(now));
@@ -433,6 +434,5 @@ public class ContextImplTest {
         assertTrue(record.getProperties().containsKey("prop-key"));
         assertEquals(record.getProperties().get("prop-key"), "prop-value");
         assertNull(record.getValue());
-        assertNull(record.getSchema());
     }
 }
diff --git a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/RecordFunction.java b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/RecordFunction.java
index 028bccae5fc..55adf848da5 100644
--- a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/RecordFunction.java
+++ b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/RecordFunction.java
@@ -20,6 +20,7 @@ package org.apache.pulsar.functions.api.examples;
 
 import java.util.HashMap;
 import java.util.Map;
+import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.functions.api.Context;
 import org.apache.pulsar.functions.api.Function;
 import org.apache.pulsar.functions.api.Record;
@@ -34,7 +35,7 @@ public class RecordFunction implements Function<String, Record<String>> {
         Map<String, String> properties = new HashMap<>(context.getCurrentRecord().getProperties());
         context.getCurrentRecord().getTopicName().ifPresent(topic -> properties.put("input_topic", topic));
 
-        return context.<String>newOutputRecordBuilder()
+        return context.newOutputRecordBuilder(Schema.STRING)
                 .destinationTopic(publishTopic)
                 .value(output)
                 .properties(properties)