You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by eo...@apache.org on 2022/06/17 11:36:46 UTC
[pulsar] branch master updated: [improve][function] Support Record<?> as Function output type (#16041)
This is an automated email from the ASF dual-hosted git repository.
eolivelli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new b3c5191304a [improve][function] Support Record<?> as Function output type (#16041)
b3c5191304a is described below
commit b3c5191304ad4cc25a4a2292073a9ccc8191ea55
Author: Christophe Bornet <cb...@hotmail.com>
AuthorDate: Fri Jun 17 13:36:39 2022 +0200
[improve][function] Support Record<?> as Function output type (#16041)
---
.../org/apache/pulsar/functions/api/Context.java | 10 ++
.../pulsar/functions/api/utils/FunctionRecord.java | 117 +++++++++++++++++++++
.../{SinkRecord.java => AbstractSinkRecord.java} | 89 ++++------------
.../pulsar/functions/instance/ContextImpl.java | 6 ++
.../functions/instance/JavaInstanceRunnable.java | 8 +-
.../functions/instance/OutputRecordSinkRecord.java | 94 +++++++++++++++++
.../pulsar/functions/instance/SinkRecord.java | 100 +++---------------
.../apache/pulsar/functions/sink/PulsarSink.java | 27 ++---
.../pulsar/functions/instance/ContextImplTest.java | 68 ++++++++++++
.../functions/api/examples/RecordFunction.java | 43 ++++++++
.../pulsar/functions/utils/FunctionCommon.java | 13 +++
.../pulsar/functions/utils/FunctionCommonTest.java | 69 ++++++++++++
.../integration/functions/PulsarFunctionsTest.java | 61 +++++++++++
.../functions/java/PulsarFunctionsJavaTest.java | 9 +-
14 files changed, 547 insertions(+), 167 deletions(-)
diff --git a/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Context.java b/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Context.java
index e45c2dec63e..36bdd0ef6ce 100644
--- a/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Context.java
+++ b/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Context.java
@@ -29,6 +29,7 @@ import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.TypedMessageBuilder;
import org.apache.pulsar.common.classification.InterfaceAudience;
import org.apache.pulsar.common.classification.InterfaceStability;
+import org.apache.pulsar.functions.api.utils.FunctionRecord;
/**
* Context provides contextual information to the executing function.
@@ -161,4 +162,13 @@ public interface Context extends BaseContext {
* @throws PulsarClientException
*/
<X> ConsumerBuilder<X> newConsumerBuilder(Schema<X> schema) throws PulsarClientException;
+
+ /**
+ * Creates a FunctionRecordBuilder initialized with values from this Context.
+ * It can be used in Functions to prepare a Record to return with default values taken from the Context and the
+ * input Record.
+ *
+ * @return the record builder instance
+ */
+ <X> FunctionRecord.FunctionRecordBuilder<X> newOutputRecordBuilder();
}
diff --git a/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/utils/FunctionRecord.java b/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/utils/FunctionRecord.java
new file mode 100644
index 00000000000..be204a7bc21
--- /dev/null
+++ b/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/utils/FunctionRecord.java
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.api.utils;
+
+import java.util.Map;
+import java.util.Optional;
+import lombok.Builder;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.functions.api.Context;
+import org.apache.pulsar.functions.api.Record;
+
+@Builder(builderMethodName = "")
+public class FunctionRecord<T> implements Record<T> {
+
+ private final T value;
+ private final String topicName;
+ private final String destinationTopic;
+ private final Map<String, String> properties;
+ private final String key;
+ private final Schema<T> schema;
+ private final Long eventTime;
+ private final String partitionId;
+ private final Integer partitionIndex;
+ private final Long recordSequence;
+
+ /**
+ * Creates a builder for a Record from a Function Context.
+ * The builder is initialized with the output topic from the Context and with the topicName, key, eventTime,
+ * properties, partitionId, partitionIndex and recordSequence from the Context input Record.
+ * It doesn't initialize a Message at the moment.
+ *
+ * @param context a Function Context
+ * @param <T> type of Record to build
+ * @return a Record builder initialised with values from the Function Context
+ */
+ public static <T> FunctionRecord.FunctionRecordBuilder<T> from(Context context) {
+ Record<?> currentRecord = context.getCurrentRecord();
+ FunctionRecordBuilder<T> builder = new FunctionRecordBuilder<T>()
+ .destinationTopic(context.getOutputTopic())
+ .properties(currentRecord.getProperties());
+ currentRecord.getTopicName().ifPresent(builder::topicName);
+ currentRecord.getKey().ifPresent(builder::key);
+ currentRecord.getEventTime().ifPresent(builder::eventTime);
+ currentRecord.getPartitionId().ifPresent(builder::partitionId);
+ currentRecord.getPartitionIndex().ifPresent(builder::partitionIndex);
+ currentRecord.getRecordSequence().ifPresent(builder::recordSequence);
+
+ return builder;
+ }
+
+ @Override
+ public T getValue() {
+ return value;
+ }
+
+ @Override
+ public Optional<String> getTopicName() {
+ return Optional.ofNullable(topicName);
+ }
+
+ @Override
+ public Optional<String> getDestinationTopic() {
+ return Optional.ofNullable(destinationTopic);
+ }
+
+ @Override
+ public Map<String, String> getProperties() {
+ return properties;
+ }
+
+ @Override
+ public Optional<String> getKey() {
+ return Optional.ofNullable(key);
+ }
+
+ @Override
+ public Schema<T> getSchema() {
+ return schema;
+ }
+
+ @Override
+ public Optional<Long> getEventTime() {
+ return Optional.ofNullable(eventTime);
+ }
+
+ @Override
+ public Optional<String> getPartitionId() {
+ return Optional.ofNullable(partitionId);
+ }
+
+ @Override
+ public Optional<Integer> getPartitionIndex() {
+ return Optional.ofNullable(partitionIndex);
+ }
+
+ @Override
+ public Optional<Long> getRecordSequence() {
+ return Optional.ofNullable(recordSequence);
+ }
+
+}
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/SinkRecord.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/AbstractSinkRecord.java
similarity index 62%
copy from pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/SinkRecord.java
copy to pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/AbstractSinkRecord.java
index b922b988581..2adcbd0065f 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/SinkRecord.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/AbstractSinkRecord.java
@@ -18,12 +18,9 @@
*/
package org.apache.pulsar.functions.instance;
-import java.util.Map;
import java.util.Optional;
-import lombok.AllArgsConstructor;
-import lombok.Data;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.pulsar.client.api.Message;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.impl.schema.AutoConsumeSchema;
import org.apache.pulsar.client.impl.schema.KeyValueSchemaImpl;
@@ -31,51 +28,25 @@ import org.apache.pulsar.functions.api.KVRecord;
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.functions.source.PulsarRecord;
-@Slf4j
-@Data
-@AllArgsConstructor
-public class SinkRecord<T> implements Record<T> {
+@EqualsAndHashCode
+@ToString
+public abstract class AbstractSinkRecord<T> implements Record<T> {
- private final Record<T> sourceRecord;
- private final T value;
+ private final Record<?> sourceRecord;
- public Record<T> getSourceRecord() {
- return sourceRecord;
- }
-
- @Override
- public Optional<String> getTopicName() {
- return sourceRecord.getTopicName();
+ protected AbstractSinkRecord(Record<?> sourceRecord) {
+ this.sourceRecord = sourceRecord;
}
- @Override
- public Optional<String> getKey() {
- return sourceRecord.getKey();
- }
+ public abstract boolean shouldAlwaysSetMessageProperties();
- @Override
- public T getValue() {
- return value;
- }
-
- @Override
- public Optional<String> getPartitionId() {
- return sourceRecord.getPartitionId();
- }
-
- @Override
- public Optional<Integer> getPartitionIndex() {
- return sourceRecord.getPartitionIndex();
+ public Record<?> getSourceRecord() {
+ return sourceRecord;
}
@Override
- public Optional<Long> getRecordSequence() {
- return sourceRecord.getRecordSequence();
- }
-
- @Override
- public Map<String, String> getProperties() {
- return sourceRecord.getProperties();
+ public Optional<String> getTopicName() {
+ return sourceRecord.getTopicName();
}
@Override
@@ -112,29 +83,23 @@ public class SinkRecord<T> implements Record<T> {
sourceRecord.fail();
}
- @Override
- public Optional<String> getDestinationTopic() {
- return sourceRecord.getDestinationTopic();
- }
-
- @Override
- public Schema<T> getSchema() {
- if (sourceRecord == null) {
+ protected static <T> Schema<T> getRecordSchema(Record<T> record) {
+ if (record == null) {
return null;
}
- if (sourceRecord.getSchema() != null) {
+ if (record.getSchema() != null) {
// unwrap actual schema
- Schema<T> schema = sourceRecord.getSchema();
+ Schema<T> schema = record.getSchema();
// AutoConsumeSchema is a special schema, that comes into play
// when the Sink is going to handle any Schema
// usually you see Sink<GenericObject> or Sink<GenericRecord> in this case
if (schema instanceof AutoConsumeSchema) {
// extract the Schema from the message, this is the most accurate schema we have
// see PIP-85
- if (sourceRecord.getMessage().isPresent()
- && sourceRecord.getMessage().get().getReaderSchema().isPresent()) {
- schema = (Schema<T>) sourceRecord.getMessage().get().getReaderSchema().get();
+ if (record.getMessage().isPresent()
+ && record.getMessage().get().getReaderSchema().isPresent()) {
+ schema = (Schema<T>) record.getMessage().get().getReaderSchema().get();
} else {
schema = (Schema<T>) ((AutoConsumeSchema) schema).getInternalSchema();
}
@@ -142,22 +107,12 @@ public class SinkRecord<T> implements Record<T> {
return schema;
}
- if (sourceRecord instanceof KVRecord) {
- KVRecord kvRecord = (KVRecord) sourceRecord;
+ if (record instanceof KVRecord) {
+ KVRecord kvRecord = (KVRecord) record;
return KeyValueSchemaImpl.of(kvRecord.getKeySchema(), kvRecord.getValueSchema(),
kvRecord.getKeyValueEncodingType());
}
return null;
}
-
- @Override
- public Optional<Long> getEventTime() {
- return sourceRecord.getEventTime();
- }
-
- @Override
- public Optional<Message<T>> getMessage() {
- return sourceRecord.getMessage();
- }
}
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
index 79ff3531de1..336c2bd0be3 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
@@ -62,6 +62,7 @@ import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.functions.api.Context;
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.functions.api.StateStore;
+import org.apache.pulsar.functions.api.utils.FunctionRecord;
import org.apache.pulsar.functions.instance.state.DefaultStateStore;
import org.apache.pulsar.functions.instance.state.StateManager;
import org.apache.pulsar.functions.instance.stats.ComponentStatsManager;
@@ -481,6 +482,11 @@ class ContextImpl implements Context, SinkContext, SourceContext, AutoCloseable
return this.client.newConsumer(schema);
}
+ @Override
+ public <X> FunctionRecord.FunctionRecordBuilder<X> newOutputRecordBuilder() {
+ return FunctionRecord.from(this);
+ }
+
@Override
public SubscriptionType getSubscriptionType() {
return subscriptionType;
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
index 56bd64a7107..61f809a4cbc 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
@@ -375,8 +375,14 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {
if (componentType == org.apache.pulsar.functions.proto.Function.FunctionDetails.ComponentType.SINK) {
Thread.currentThread().setContextClassLoader(functionClassLoader);
}
+ AbstractSinkRecord<?> sinkRecord;
+ if (output instanceof Record) {
+ sinkRecord = new OutputRecordSinkRecord<>(srcRecord, (Record) output);
+ } else {
+ sinkRecord = new SinkRecord<>(srcRecord, output);
+ }
try {
- this.sink.write(new SinkRecord<>(srcRecord, output));
+ this.sink.write(sinkRecord);
} catch (Exception e) {
log.info("Encountered exception in sink write: ", e);
stats.incrSinkExceptions(e);
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/OutputRecordSinkRecord.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/OutputRecordSinkRecord.java
new file mode 100644
index 00000000000..6220517414d
--- /dev/null
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/OutputRecordSinkRecord.java
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.instance;
+
+import java.util.Map;
+import java.util.Optional;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.functions.api.Record;
+
+@EqualsAndHashCode(callSuper = true)
+@ToString
+class OutputRecordSinkRecord<T> extends AbstractSinkRecord<T> {
+
+ private final Record<T> sinkRecord;
+
+ OutputRecordSinkRecord(Record<T> sourceRecord, Record<T> sinkRecord) {
+ super(sourceRecord);
+ this.sinkRecord = sinkRecord;
+ }
+
+ @Override
+ public Optional<String> getKey() {
+ return sinkRecord.getKey();
+ }
+
+ @Override
+ public T getValue() {
+ return sinkRecord.getValue();
+ }
+
+ @Override
+ public Optional<String> getPartitionId() {
+ return sinkRecord.getPartitionId();
+ }
+
+ @Override
+ public Optional<Integer> getPartitionIndex() {
+ return sinkRecord.getPartitionIndex();
+ }
+
+ @Override
+ public Optional<Long> getRecordSequence() {
+ return sinkRecord.getRecordSequence();
+ }
+
+ @Override
+ public Map<String, String> getProperties() {
+ return sinkRecord.getProperties();
+ }
+
+ @Override
+ public Optional<String> getDestinationTopic() {
+ return sinkRecord.getDestinationTopic();
+ }
+
+ @Override
+ public Schema<T> getSchema() {
+ return getRecordSchema(sinkRecord);
+ }
+
+ @Override
+ public Optional<Long> getEventTime() {
+ return sinkRecord.getEventTime();
+ }
+
+ @Override
+ public Optional<Message<T>> getMessage() {
+ return sinkRecord.getMessage();
+ }
+
+ @Override
+ public boolean shouldAlwaysSetMessageProperties() {
+ return true;
+ }
+}
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/SinkRecord.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/SinkRecord.java
index b922b988581..8f64ed2ce09 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/SinkRecord.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/SinkRecord.java
@@ -20,32 +20,22 @@ package org.apache.pulsar.functions.instance;
import java.util.Map;
import java.util.Optional;
-import lombok.AllArgsConstructor;
-import lombok.Data;
-import lombok.extern.slf4j.Slf4j;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Schema;
-import org.apache.pulsar.client.impl.schema.AutoConsumeSchema;
-import org.apache.pulsar.client.impl.schema.KeyValueSchemaImpl;
-import org.apache.pulsar.functions.api.KVRecord;
import org.apache.pulsar.functions.api.Record;
-import org.apache.pulsar.functions.source.PulsarRecord;
-
-@Slf4j
-@Data
-@AllArgsConstructor
-public class SinkRecord<T> implements Record<T> {
+@EqualsAndHashCode(callSuper = true)
+@ToString
+public class SinkRecord<T> extends AbstractSinkRecord<T> {
private final Record<T> sourceRecord;
private final T value;
- public Record<T> getSourceRecord() {
- return sourceRecord;
- }
-
- @Override
- public Optional<String> getTopicName() {
- return sourceRecord.getTopicName();
+ public SinkRecord(Record<T> sourceRecord, T value) {
+ super(sourceRecord);
+ this.sourceRecord = sourceRecord;
+ this.value = value;
}
@Override
@@ -73,45 +63,11 @@ public class SinkRecord<T> implements Record<T> {
return sourceRecord.getRecordSequence();
}
- @Override
+ @Override
public Map<String, String> getProperties() {
return sourceRecord.getProperties();
}
- @Override
- public void ack() {
- sourceRecord.ack();
- }
-
- /**
- * Some sink sometimes wants to control the ack type.
- */
- public void cumulativeAck() {
- if (sourceRecord instanceof PulsarRecord) {
- PulsarRecord pulsarRecord = (PulsarRecord) sourceRecord;
- pulsarRecord.cumulativeAck();
- } else {
- throw new RuntimeException("SourceRecord class type must be PulsarRecord");
- }
- }
-
- /**
- * Some sink sometimes wants to control the ack type.
- */
- public void individualAck() {
- if (sourceRecord instanceof PulsarRecord) {
- PulsarRecord pulsarRecord = (PulsarRecord) sourceRecord;
- pulsarRecord.individualAck();
- } else {
- throw new RuntimeException("SourceRecord class type must be PulsarRecord");
- }
- }
-
- @Override
- public void fail() {
- sourceRecord.fail();
- }
-
@Override
public Optional<String> getDestinationTopic() {
return sourceRecord.getDestinationTopic();
@@ -119,36 +75,7 @@ public class SinkRecord<T> implements Record<T> {
@Override
public Schema<T> getSchema() {
- if (sourceRecord == null) {
- return null;
- }
-
- if (sourceRecord.getSchema() != null) {
- // unwrap actual schema
- Schema<T> schema = sourceRecord.getSchema();
- // AutoConsumeSchema is a special schema, that comes into play
- // when the Sink is going to handle any Schema
- // usually you see Sink<GenericObject> or Sink<GenericRecord> in this case
- if (schema instanceof AutoConsumeSchema) {
- // extract the Schema from the message, this is the most accurate schema we have
- // see PIP-85
- if (sourceRecord.getMessage().isPresent()
- && sourceRecord.getMessage().get().getReaderSchema().isPresent()) {
- schema = (Schema<T>) sourceRecord.getMessage().get().getReaderSchema().get();
- } else {
- schema = (Schema<T>) ((AutoConsumeSchema) schema).getInternalSchema();
- }
- }
- return schema;
- }
-
- if (sourceRecord instanceof KVRecord) {
- KVRecord kvRecord = (KVRecord) sourceRecord;
- return KeyValueSchemaImpl.of(kvRecord.getKeySchema(), kvRecord.getValueSchema(),
- kvRecord.getKeyValueEncodingType());
- }
-
- return null;
+ return getRecordSchema(sourceRecord);
}
@Override
@@ -160,4 +87,9 @@ public class SinkRecord<T> implements Record<T> {
public Optional<Message<T>> getMessage() {
return sourceRecord.getMessage();
}
+
+ @Override
+ public boolean shouldAlwaysSetMessageProperties() {
+ return false;
+ }
}
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java
index ac08370a31e..80aa49e05ae 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java
@@ -59,8 +59,8 @@ import org.apache.pulsar.common.schema.KeyValueEncodingType;
import org.apache.pulsar.common.schema.SchemaType;
import org.apache.pulsar.common.util.Reflections;
import org.apache.pulsar.functions.api.Record;
+import org.apache.pulsar.functions.instance.AbstractSinkRecord;
import org.apache.pulsar.functions.instance.FunctionResultRouter;
-import org.apache.pulsar.functions.instance.SinkRecord;
import org.apache.pulsar.functions.instance.stats.ComponentStatsManager;
import org.apache.pulsar.functions.source.PulsarRecord;
import org.apache.pulsar.functions.source.TopicSchema;
@@ -85,9 +85,9 @@ public class PulsarSink<T> implements Sink<T> {
private interface PulsarSinkProcessor<T> {
- TypedMessageBuilder<T> newMessage(SinkRecord<T> record);
+ TypedMessageBuilder<T> newMessage(AbstractSinkRecord<T> record);
- void sendOutputMessage(TypedMessageBuilder<T> msg, SinkRecord<T> record);
+ void sendOutputMessage(TypedMessageBuilder<T> msg, AbstractSinkRecord<T> record);
void close() throws Exception;
}
@@ -183,17 +183,17 @@ public class PulsarSink<T> implements Sink<T> {
}
}
- public Function<Throwable, Void> getPublishErrorHandler(SinkRecord<T> record, boolean failSource) {
+ public Function<Throwable, Void> getPublishErrorHandler(AbstractSinkRecord<T> record, boolean failSource) {
return throwable -> {
- Record<T> srcRecord = record.getSourceRecord();
+ Record<?> srcRecord = record.getSourceRecord();
if (failSource) {
srcRecord.fail();
}
String topic = record.getDestinationTopic().orElse(pulsarSinkConfig.getTopic());
- String errorMsg = null;
+ String errorMsg;
if (srcRecord instanceof PulsarRecord) {
errorMsg = String.format("Failed to publish to topic [%s] with error [%s] with src message id [%s]",
topic, throwable.getMessage(), ((PulsarRecord) srcRecord).getMessageId());
@@ -234,7 +234,7 @@ public class PulsarSink<T> implements Sink<T> {
}
@Override
- public TypedMessageBuilder<T> newMessage(SinkRecord<T> record) {
+ public TypedMessageBuilder<T> newMessage(AbstractSinkRecord<T> record) {
Schema<T> schemaToWrite = record.getSchema();
if (record.getSourceRecord() instanceof PulsarRecord) {
// we are receiving data directly from another Pulsar topic
@@ -256,7 +256,7 @@ public class PulsarSink<T> implements Sink<T> {
}
@Override
- public void sendOutputMessage(TypedMessageBuilder<T> msg, SinkRecord<T> record) {
+ public void sendOutputMessage(TypedMessageBuilder<T> msg, AbstractSinkRecord<T> record) {
msg.sendAsync().thenAccept(messageId -> {
//no op
}).exceptionally(getPublishErrorHandler(record, false));
@@ -270,7 +270,7 @@ public class PulsarSink<T> implements Sink<T> {
}
@Override
- public void sendOutputMessage(TypedMessageBuilder<T> msg, SinkRecord<T> record) {
+ public void sendOutputMessage(TypedMessageBuilder<T> msg, AbstractSinkRecord<T> record) {
msg.sendAsync()
.thenAccept(messageId -> record.ack())
.exceptionally(getPublishErrorHandler(record, true));
@@ -285,7 +285,7 @@ public class PulsarSink<T> implements Sink<T> {
}
@Override
- public TypedMessageBuilder<T> newMessage(SinkRecord<T> record) {
+ public TypedMessageBuilder<T> newMessage(AbstractSinkRecord<T> record) {
if (!record.getPartitionId().isPresent()) {
throw new RuntimeException(
"PartitionId needs to be specified for every record while in Effectively-once mode");
@@ -311,7 +311,7 @@ public class PulsarSink<T> implements Sink<T> {
}
@Override
- public void sendOutputMessage(TypedMessageBuilder<T> msg, SinkRecord<T> record) {
+ public void sendOutputMessage(TypedMessageBuilder<T> msg, AbstractSinkRecord<T> record) {
if (!record.getRecordSequence().isPresent()) {
throw new RuntimeException(
@@ -367,7 +367,7 @@ public class PulsarSink<T> implements Sink<T> {
@Override
public void write(Record<T> record) {
- SinkRecord<T> sinkRecord = (SinkRecord<T>) record;
+ AbstractSinkRecord<T> sinkRecord = (AbstractSinkRecord<T>) record;
TypedMessageBuilder<T> msg = pulsarSinkProcessor.newMessage(sinkRecord);
if (record.getKey().isPresent() && !(record.getSchema() instanceof KeyValueSchema
@@ -377,7 +377,8 @@ public class PulsarSink<T> implements Sink<T> {
msg.value(record.getValue());
- if (!record.getProperties().isEmpty() && pulsarSinkConfig.isForwardSourceMessageProperty()) {
+ if (!record.getProperties().isEmpty()
+ && (sinkRecord.shouldAlwaysSetMessageProperties() || pulsarSinkConfig.isForwardSourceMessageProperty())) {
msg.properties(record.getProperties());
}
diff --git a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java
index 41cb550250a..e2e34f4ff91 100644
--- a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java
+++ b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java
@@ -30,8 +30,12 @@ import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNull;
+import static org.testng.Assert.assertTrue;
import java.nio.ByteBuffer;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import org.apache.pulsar.client.admin.PulsarAdmin;
@@ -351,4 +355,68 @@ public class ContextImplTest {
}
}
+
+ @Test
+ public void testNewOutputRecordBuilder() {
+ Map<String, String> properties = new HashMap<>();
+ properties.put("prop-key", "prop-value");
+ long now = System.currentTimeMillis();
+ context.setCurrentMessageContext(new Record<String>() {
+ @Override
+ public Optional<String> getTopicName() {
+ return Optional.of("input-topic");
+ }
+
+ @Override
+ public Optional<String> getKey() {
+ return Optional.of("input-key");
+ }
+
+ @Override
+ public Schema<String> getSchema() {
+ return Schema.STRING;
+ }
+
+ @Override
+ public String getValue() {
+ return "input-value";
+ }
+
+ @Override
+ public Optional<Long> getEventTime() {
+ return Optional.of(now);
+ }
+
+ @Override
+ public Optional<String> getPartitionId() {
+ return Optional.of("input-partition-id");
+ }
+
+ @Override
+ public Optional<Integer> getPartitionIndex() {
+ return Optional.of(42);
+ }
+
+ @Override
+ public Optional<Long> getRecordSequence() {
+ return Optional.of(43L);
+ }
+
+ @Override
+ public Map<String, String> getProperties() {
+ return properties;
+ }
+ });
+ Record<Integer> record = context.<Integer>newOutputRecordBuilder().build();
+ assertEquals(record.getTopicName().get(), "input-topic");
+ assertEquals(record.getKey().get(), "input-key");
+ assertEquals(record.getEventTime(), Optional.of(now));
+ assertEquals(record.getPartitionId().get(), "input-partition-id");
+ assertEquals(record.getPartitionIndex(), Optional.of(42));
+ assertEquals(record.getRecordSequence(), Optional.of(43L));
+ assertTrue(record.getProperties().containsKey("prop-key"));
+ assertEquals(record.getProperties().get("prop-key"), "prop-value");
+ assertNull(record.getValue());
+ assertNull(record.getSchema());
+ }
}
diff --git a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/RecordFunction.java b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/RecordFunction.java
new file mode 100644
index 00000000000..028bccae5fc
--- /dev/null
+++ b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/RecordFunction.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.api.examples;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.pulsar.functions.api.Context;
+import org.apache.pulsar.functions.api.Function;
+import org.apache.pulsar.functions.api.Record;
+
+public class RecordFunction implements Function<String, Record<String>> {
+
+ @Override
+ public Record<String> process(String input, Context context) throws Exception {
+ String publishTopic = (String) context.getUserConfigValueOrDefault("publish-topic", "publishtopic");
+ String output = String.format("%s!", input);
+
+ Map<String, String> properties = new HashMap<>(context.getCurrentRecord().getProperties());
+ context.getCurrentRecord().getTopicName().ifPresent(topic -> properties.put("input_topic", topic));
+
+ return context.<String>newOutputRecordBuilder()
+ .destinationTopic(publishTopic)
+ .value(output)
+ .properties(properties)
+ .build();
+ }
+}
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionCommon.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionCommon.java
index 02202ae7978..e4233acd4c4 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionCommon.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionCommon.java
@@ -54,6 +54,7 @@ import org.apache.pulsar.common.nar.NarClassLoader;
import org.apache.pulsar.common.nar.NarClassLoaderBuilder;
import org.apache.pulsar.common.util.ClassLoaderUtils;
import org.apache.pulsar.functions.api.Function;
+import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.functions.api.WindowFunction;
import org.apache.pulsar.functions.proto.Function.FunctionDetails.Runtime;
import org.apache.pulsar.functions.utils.io.ConnectorUtils;
@@ -120,8 +121,20 @@ public class FunctionCommon {
} else {
if (Function.class.isAssignableFrom(userClass)) {
typeArgs = TypeResolver.resolveRawArguments(Function.class, userClass);
+ if (typeArgs[1].equals(Record.class)) {
+ Type type = TypeResolver.resolveGenericType(Function.class, userClass);
+ Type recordType = ((ParameterizedType) type).getActualTypeArguments()[1];
+ Type actualInputType = ((ParameterizedType) recordType).getActualTypeArguments()[0];
+ typeArgs[1] = (Class<?>) actualInputType;
+ }
} else {
typeArgs = TypeResolver.resolveRawArguments(java.util.function.Function.class, userClass);
+ if (typeArgs[1].equals(Record.class)) {
+ Type type = TypeResolver.resolveGenericType(java.util.function.Function.class, userClass);
+ Type recordType = ((ParameterizedType) type).getActualTypeArguments()[1];
+ Type actualInputType = ((ParameterizedType) recordType).getActualTypeArguments()[0];
+ typeArgs[1] = (Class<?>) actualInputType;
+ }
}
}
diff --git a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/FunctionCommonTest.java b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/FunctionCommonTest.java
index 0e34e2997c5..3a1f69d4bfd 100644
--- a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/FunctionCommonTest.java
+++ b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/FunctionCommonTest.java
@@ -19,9 +19,16 @@
package org.apache.pulsar.functions.utils;
+import java.util.Collection;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.common.util.FutureUtil;
+import org.apache.pulsar.functions.api.Context;
+import org.apache.pulsar.functions.api.Function;
+import org.apache.pulsar.functions.api.Record;
+import org.apache.pulsar.functions.api.WindowContext;
+import org.apache.pulsar.functions.api.WindowFunction;
import org.testng.Assert;
+import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
import java.io.File;
@@ -101,4 +108,66 @@ public class FunctionCommonTest {
assertEquals(lid, id.getLedgerId());
assertEquals(eid, id.getEntryId());
}
+
+    /**
+     * Supplies {@code {function instance, isWindowConfigPresent}} rows for the {@code "function"} data
+     * provider.
+     *
+     * <p>The rows cover Pulsar and {@code java.util.function} functions returning either a plain
+     * {@code Integer} or a {@code Record<Integer>}, plus two window variants. Each instance is an
+     * anonymous subclass because the consuming test passes {@code function.getClass()} on for inspection.
+     */
+    @DataProvider(name = "function")
+    public Object[][] functionProvider() {
+        Function<String, Integer> pulsarPlain = new Function<String, Integer>() {
+            @Override
+            public Integer process(String input, Context context) throws Exception {
+                return null;
+            }
+        };
+
+        Function<String, Record<Integer>> pulsarRecord = new Function<String, Record<Integer>>() {
+            @Override
+            public Record<Integer> process(String input, Context context) throws Exception {
+                return null;
+            }
+        };
+
+        java.util.function.Function<String, Integer> javaPlain =
+                new java.util.function.Function<String, Integer>() {
+                    @Override
+                    public Integer apply(String s) {
+                        return null;
+                    }
+                };
+
+        java.util.function.Function<String, Record<Integer>> javaRecord =
+                new java.util.function.Function<String, Record<Integer>>() {
+                    @Override
+                    public Record<Integer> apply(String s) {
+                        return null;
+                    }
+                };
+
+        WindowFunction<String, Integer> pulsarWindow = new WindowFunction<String, Integer>() {
+            @Override
+            public Integer process(Collection<Record<String>> input, WindowContext context) throws Exception {
+                return null;
+            }
+        };
+
+        java.util.function.Function<Collection<String>, Integer> javaWindow =
+                new java.util.function.Function<Collection<String>, Integer>() {
+                    @Override
+                    public Integer apply(Collection<String> strings) {
+                        return null;
+                    }
+                };
+
+        // Second column is the isWindowConfigPresent flag handed to the test method.
+        return new Object[][] {
+                {pulsarPlain, false},
+                {pulsarRecord, false},
+                {javaPlain, false},
+                {javaRecord, false},
+                {pulsarWindow, true},
+                {javaWindow, true},
+        };
+    }
+
+    /**
+     * Checks that {@code FunctionCommon.getFunctionTypes} reports exactly two types for each row of the
+     * {@code "function"} data provider: {@code String} first and {@code Integer} second.
+     */
+    @Test(dataProvider = "function")
+    public void testGetFunctionTypes(Object function, boolean isWindowConfigPresent) {
+        Class<?> functionClass = function.getClass();
+        Class<?>[] resolvedTypes = FunctionCommon.getFunctionTypes(functionClass, isWindowConfigPresent);
+
+        // Assert the array size before indexing into it.
+        assertEquals(resolvedTypes.length, 2);
+
+        Class<?> firstType = resolvedTypes[0];
+        Class<?> secondType = resolvedTypes[1];
+        assertEquals(firstType, String.class);
+        assertEquals(secondType, Integer.class);
+    }
}
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
index 9a6a26be3da..d674e6d5101 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
@@ -72,6 +72,7 @@ import org.apache.pulsar.functions.api.examples.AutoSchemaFunction;
import org.apache.pulsar.functions.api.examples.AvroSchemaTestFunction;
import org.apache.pulsar.functions.api.examples.MergeTopicFunction;
import org.apache.pulsar.functions.api.examples.InitializableFunction;
+import org.apache.pulsar.functions.api.examples.RecordFunction;
import org.apache.pulsar.functions.api.examples.pojo.AvroTestObject;
import org.apache.pulsar.functions.api.examples.pojo.Users;
import org.apache.pulsar.functions.api.examples.serde.CustomObject;
@@ -1715,6 +1716,66 @@ public abstract class PulsarFunctionsTest extends PulsarFunctionsTestBase {
getFunctionInfoNotFound(functionName);
}
+    /**
+     * End-to-end scenario for a function whose output type is a {@code Record}.
+     *
+     * <p>Creates a fresh namespace, submits {@link RecordFunction} on the Java runtime, produces
+     * {@code numMessages} strings to the input topic and then expects, in order, each message back with
+     * {@code "!"} appended and an {@code "input_topic"} property equal to the fully qualified input topic.
+     * Function logs are dumped whether or not the assertions pass; afterwards the function is deleted and
+     * its absence is verified.
+     *
+     * @throws Exception if any admin/client call fails or an expected message does not arrive in time
+     */
+    protected void testRecordFunction() throws Exception {
+        log.info("start RecordFunction function test ...");
+
+        String ns = "public/ns-recordfunction-" + randomName(8);
+        @Cleanup
+        PulsarAdmin pulsarAdmin = getPulsarAdmin();
+        pulsarAdmin.namespaces().createNamespace(ns);
+
+        @Cleanup
+        PulsarClient pulsarClient = getPulsarClient();
+
+        final int numMessages = 10;
+        final String inputTopic = ns + "/test-string-input-" + randomName(8);
+        final String outputTopic = ns + "/test-string-output-" + randomName(8);
+        // The consumer listens on "publishtopic", the destination RecordFunction falls back to when no
+        // "publish-topic" user config is set (presumably the case here -- confirm against submitFunction),
+        // rather than on outputTopic.
+        @Cleanup
+        Consumer<String> consumer = pulsarClient
+                .newConsumer(Schema.STRING)
+                .subscriptionName("test")
+                .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+                .topic("publishtopic")
+                .subscribe();
+
+        final String functionName = "test-record-fn-" + randomName(8);
+        submitFunction(
+                Runtime.JAVA,
+                inputTopic,
+                outputTopic,
+                functionName,
+                null,
+                RecordFunction.class.getName(),
+                Schema.AUTO_CONSUME());
+        try {
+            @Cleanup
+            Producer<String> producer = pulsarClient
+                    .newProducer(Schema.STRING)
+                    .topic(inputTopic)
+                    .create();
+            for (int i = 0; i < numMessages; i++) {
+                producer.send("message" + i);
+            }
+
+            getFunctionInfoSuccess(functionName);
+
+            getFunctionStatus(functionName, numMessages, true);
+
+            for (int i = 0; i < numMessages; i++) {
+                Message<String> msg = consumer.receive(30, TimeUnit.SECONDS);
+                // receive(timeout) yields null when nothing arrives in time; fail with a readable message
+                // instead of a NullPointerException on the dereference below.
+                if (msg == null) {
+                    throw new AssertionError(
+                            "Timed out waiting for output message " + i + " of " + numMessages);
+                }
+                log.info("Received: {}", msg.getValue());
+                assertEquals(msg.getValue(), "message" + i + "!");
+                assertEquals(msg.getProperty("input_topic"), "persistent://" + inputTopic);
+            }
+        } finally {
+            // Always capture the function logs, including when an assertion above failed.
+            pulsarCluster.dumpFunctionLogs(functionName);
+        }
+
+        deleteFunction(functionName);
+
+        getFunctionInfoNotFound(functionName);
+    }
+
protected void testMergeFunction() throws Exception {
log.info("start merge function test ...");
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/java/PulsarFunctionsJavaTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/java/PulsarFunctionsJavaTest.java
index 46cb15892a1..6e496214219 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/java/PulsarFunctionsJavaTest.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/java/PulsarFunctionsJavaTest.java
@@ -165,8 +165,8 @@ public class PulsarFunctionsJavaTest extends PulsarFunctionsTest {
@Test(groups = {"java_function", "function"})
public void testMergeFunctionTest() throws Exception {
- testMergeFunction();
- }
+ testMergeFunction();
+ }
@Test(groups = {"java_function", "function"})
public void testGenericObjectFunction() throws Exception {
@@ -188,4 +188,9 @@ public class PulsarFunctionsJavaTest extends PulsarFunctionsTest {
testGenericObjectFunction(REMOVE_AVRO_FIELD_FUNCTION_JAVA_CLASS, true, true);
}
+ /**
+ * Runs the inherited {@code testRecordFunction()} scenario as part of the
+ * {@code java_function} and {@code function} test groups.
+ */
+ @Test(groups = {"java_function", "function"})
+ public void testRecordFunctionTest() throws Exception {
+ testRecordFunction();
+ }
+
}