You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by te...@apache.org on 2022/08/19 01:36:36 UTC
[pulsar] branch master updated: [fix][functions] Make mandatory to provide a schema in Context::newOutputRecordBuilder (#17118)
This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new e3c2dbfb662 [fix][functions] Make mandatory to provide a schema in Context::newOutputRecordBuilder (#17118)
e3c2dbfb662 is described below
commit e3c2dbfb66277cbcd8177dcf26dc225bc3b302d2
Author: Christophe Bornet <cb...@hotmail.com>
AuthorDate: Fri Aug 19 03:36:29 2022 +0200
[fix][functions] Make mandatory to provide a schema in Context::newOutputRecordBuilder (#17118)
---
.../src/main/java/org/apache/pulsar/functions/api/Context.java | 6 ++++--
.../org/apache/pulsar/functions/api/utils/FunctionRecord.java | 10 +++++++---
.../java/org/apache/pulsar/functions/instance/ContextImpl.java | 4 ++--
.../org/apache/pulsar/functions/instance/ContextImplTest.java | 4 ++--
.../apache/pulsar/functions/api/examples/RecordFunction.java | 3 ++-
5 files changed, 17 insertions(+), 10 deletions(-)
diff --git a/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Context.java b/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Context.java
index eeaf6f4e20f..2a89fecbc65 100644
--- a/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Context.java
+++ b/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Context.java
@@ -169,8 +169,10 @@ public interface Context extends BaseContext {
* Creates a FunctionRecordBuilder initialized with values from this Context.
* It can be used in Functions to prepare a Record to return with default values taken from the Context and the
* input Record.
- *
+
+ * @param schema provide a way to convert between serialized data and domain objects
+ * @param <X> the message type of record builder
* @return the record builder instance
*/
- <X> FunctionRecord.FunctionRecordBuilder<X> newOutputRecordBuilder();
+ <X> FunctionRecord.FunctionRecordBuilder<X> newOutputRecordBuilder(Schema<X> schema);
}
diff --git a/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/utils/FunctionRecord.java b/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/utils/FunctionRecord.java
index be204a7bc21..c7cb1a6480a 100644
--- a/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/utils/FunctionRecord.java
+++ b/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/utils/FunctionRecord.java
@@ -49,11 +49,15 @@ public class FunctionRecord<T> implements Record<T> {
* @param <T> type of Record to build
* @return a Record builder initialised with values from the Function Context
*/
- public static <T> FunctionRecord.FunctionRecordBuilder<T> from(Context context) {
+ public static <T> FunctionRecord.FunctionRecordBuilder<T> from(Context context, Schema<T> schema) {
+ if (schema == null) {
+ throw new IllegalArgumentException("Schema should not be null.");
+ }
Record<?> currentRecord = context.getCurrentRecord();
FunctionRecordBuilder<T> builder = new FunctionRecordBuilder<T>()
- .destinationTopic(context.getOutputTopic())
- .properties(currentRecord.getProperties());
+ .schema(schema)
+ .destinationTopic(context.getOutputTopic())
+ .properties(currentRecord.getProperties());
currentRecord.getTopicName().ifPresent(builder::topicName);
currentRecord.getKey().ifPresent(builder::key);
currentRecord.getEventTime().ifPresent(builder::eventTime);
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
index 31d8385c1fc..914275eb163 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
@@ -491,8 +491,8 @@ class ContextImpl implements Context, SinkContext, SourceContext, AutoCloseable
}
@Override
- public <X> FunctionRecord.FunctionRecordBuilder<X> newOutputRecordBuilder() {
- return FunctionRecord.from(this);
+ public <X> FunctionRecord.FunctionRecordBuilder<X> newOutputRecordBuilder(Schema<X> schema) {
+ return FunctionRecord.from(this, schema);
}
@Override
diff --git a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java
index a5fe51d5403..8f28ce40105 100644
--- a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java
+++ b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java
@@ -423,7 +423,8 @@ public class ContextImplTest {
return properties;
}
});
- Record<Integer> record = context.<Integer>newOutputRecordBuilder().build();
+ Record<Integer> record = context.newOutputRecordBuilder(Schema.INT32).build();
+ assertEquals(record.getSchema(), Schema.INT32);
assertEquals(record.getTopicName().get(), "input-topic");
assertEquals(record.getKey().get(), "input-key");
assertEquals(record.getEventTime(), Optional.of(now));
@@ -433,6 +434,5 @@ public class ContextImplTest {
assertTrue(record.getProperties().containsKey("prop-key"));
assertEquals(record.getProperties().get("prop-key"), "prop-value");
assertNull(record.getValue());
- assertNull(record.getSchema());
}
}
diff --git a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/RecordFunction.java b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/RecordFunction.java
index 028bccae5fc..55adf848da5 100644
--- a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/RecordFunction.java
+++ b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/RecordFunction.java
@@ -20,6 +20,7 @@ package org.apache.pulsar.functions.api.examples;
import java.util.HashMap;
import java.util.Map;
+import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.functions.api.Context;
import org.apache.pulsar.functions.api.Function;
import org.apache.pulsar.functions.api.Record;
@@ -34,7 +35,7 @@ public class RecordFunction implements Function<String, Record<String>> {
Map<String, String> properties = new HashMap<>(context.getCurrentRecord().getProperties());
context.getCurrentRecord().getTopicName().ifPresent(topic -> properties.put("input_topic", topic));
- return context.<String>newOutputRecordBuilder()
+ return context.newOutputRecordBuilder(Schema.STRING)
.destinationTopic(publishTopic)
.value(output)
.properties(properties)