You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/05/29 02:46:26 UTC
[pulsar] branch master updated: Support using
AutoProduceBytesSchema as the function output schema (#10716)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 10ed501 Support using AutoProduceBytesSchema as the function output schema (#10716)
10ed501 is described below
commit 10ed501d8a3bdb5f70d2dce831fa28cbab9aa623
Author: ran <ga...@126.com>
AuthorDate: Sat May 29 10:45:36 2021 +0800
Support using AutoProduceBytesSchema as the function output schema (#10716)
### Motivation
Currently, a function cannot use `AutoProduceBytesSchema` as its output schema.
### Modifications
Allow users to set `AUTO_PUBLISH` as the value of the `schemaType` configuration of a function, for example `--schema-type AUTO_PUBLISH`.
---
.../java/org/apache/pulsar/schema/SchemaTest.java | 23 +-
.../pulsar/functions/instance/ContextImpl.java | 8 +-
.../pulsar/functions/source/TopicSchema.java | 3 +
.../functions/api/examples/MergeTopicFunction.java | 74 ++++++
.../pulsar/functions/api/examples/pojo/Users.java | 47 ++++
.../integration/functions/PulsarFunctionsTest.java | 253 +++++++++++++++++++--
.../functions/java/PulsarFunctionsJavaTest.java | 5 +
.../functions/utils/CommandGenerator.java | 38 +++-
8 files changed, 420 insertions(+), 31 deletions(-)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java
index a50ab61..fe720a1 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java
@@ -57,6 +57,7 @@ import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
+import org.apache.pulsar.client.api.TypedMessageBuilder;
import org.apache.pulsar.client.api.schema.GenericRecord;
import org.apache.pulsar.client.api.schema.SchemaDefinition;
import org.apache.pulsar.client.impl.schema.KeyValueSchema;
@@ -877,21 +878,15 @@ public class SchemaTest extends MockedPulsarServiceBaseTest {
}
message.getValue();
- Schema<?> readerSchema = message.getReaderSchema().get();
- if (readerSchema instanceof KeyValueSchema
- && ((KeyValueSchema<?, ?>) readerSchema)
- .getKeyValueEncodingType().equals(KeyValueEncodingType.SEPARATED)) {
- autoProducer.newMessage(
- Schema.AUTO_PRODUCE_BYTES(message.getReaderSchema().get())).keyBytes(message.getKeyBytes())
- .value(message.getData())
- .properties(message.getProperties())
- .send();
- } else {
- autoProducer.newMessage(Schema.AUTO_PRODUCE_BYTES(message.getReaderSchema().get()))
- .properties(message.getProperties())
- .value(message.getData())
- .send();
+ TypedMessageBuilder messageBuilder = autoProducer
+ .newMessage(Schema.AUTO_PRODUCE_BYTES(message.getReaderSchema().get()))
+ .value(message.getData())
+ .properties(message.getProperties());
+ if (message.getKeyBytes() != null) {
+ messageBuilder.keyBytes(message.getKeyBytes());
}
+ messageBuilder.send();
+
producer.close();
consumer.close();
}
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
index 97aa538..eb9f541 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
@@ -456,7 +456,13 @@ class ContextImpl implements Context, SinkContext, SourceContext, AutoCloseable
@Override
public <O> TypedMessageBuilder<O> newOutputMessage(String pulsarName, String topicName, Schema<O> schema) throws PulsarClientException {
MessageBuilderImpl<O> messageBuilder = new MessageBuilderImpl<>();
- TypedMessageBuilder<O> typedMessageBuilder = getProducer(pulsarName, topicName, schema).newMessage();
+ TypedMessageBuilder<O> typedMessageBuilder;
+ Producer<O> producer = getProducer(pulsarName, topicName, schema);
+ if (schema != null) {
+ typedMessageBuilder = producer.newMessage(schema);
+ } else {
+ typedMessageBuilder = producer.newMessage();
+ }
messageBuilder.setUnderlyingBuilder(typedMessageBuilder);
return messageBuilder;
}
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/TopicSchema.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/TopicSchema.java
index bc29029..7ed6b6c 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/TopicSchema.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/TopicSchema.java
@@ -169,6 +169,9 @@ public class TopicSchema {
case PROTOBUF:
return ProtobufSchema.ofGenericClass(clazz, new HashMap<>());
+ case AUTO_PUBLISH:
+ return (Schema<T>) Schema.AUTO_PRODUCE_BYTES();
+
default:
throw new RuntimeException("Unsupported schema type" + type);
}
diff --git a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/MergeTopicFunction.java b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/MergeTopicFunction.java
new file mode 100644
index 0000000..ea17846
--- /dev/null
+++ b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/MergeTopicFunction.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.api.examples;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.TypedMessageBuilder;
+import org.apache.pulsar.client.api.schema.GenericRecord;
+import org.apache.pulsar.functions.api.Context;
+import org.apache.pulsar.functions.api.Function;
+
+/**
+ * Forwards each input message to the output topic with its own reader schema, merging data of various schemas into one topic.
+ */
+@Slf4j
+public class MergeTopicFunction implements Function<GenericRecord, byte[]> {
+
+ @Override
+ public byte[] process(GenericRecord genericRecord, Context context) throws Exception {
+ if (context.getCurrentRecord().getMessage().isPresent()) {
+ Message<?> msg = context.getCurrentRecord().getMessage().get();
+ if (!msg.getReaderSchema().isPresent()) {
+ log.warn("The reader schema is null.");
+ return null;
+ }
+ log.info("process message with reader schema {}", msg.getReaderSchema().get());
+ TypedMessageBuilder<byte[]> messageBuilder =
+ context.newOutputMessage(context.getOutputTopic(),
+ Schema.AUTO_PRODUCE_BYTES(msg.getReaderSchema().get()));
+
+ messageBuilder
+ .value(msg.getData())
+ .property("__original_topic", msg.getTopicName())
+ .property("__publish_time", String.valueOf(msg.getPublishTime()))
+ .property("__sequence_id", String.valueOf(msg.getSequenceId()))
+ .property("__producer_name", msg.getProducerName());
+
+ if (msg.getKeyBytes() != null) {
+ messageBuilder.keyBytes(msg.getKeyBytes());
+ }
+
+ if (msg.getEventTime() > 0) {
+ messageBuilder.eventTime(msg.getEventTime());
+ }
+
+ if (!msg.getProperties().isEmpty()) {
+ messageBuilder.properties(msg.getProperties());
+ }
+
+ messageBuilder.send();
+ log.info("send message successfully");
+ } else {
+ log.warn("context current record message is not present.");
+ }
+ return null;
+ }
+}
diff --git a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/pojo/Users.java b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/pojo/Users.java
new file mode 100644
index 0000000..1522513
--- /dev/null
+++ b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/pojo/Users.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.api.examples.pojo;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+/**
+ * POJOs for testing multi-version schemas.
+ */
+public class Users {
+
+ @NoArgsConstructor
+ @AllArgsConstructor
+ @Data
+ public static class UserV1 {
+ private String name;
+ private Integer age;
+ }
+
+ @NoArgsConstructor
+ @AllArgsConstructor
+ @Data
+ public static class UserV2 {
+ private String name;
+ private Integer age;
+ private String phone;
+ }
+
+}
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
index 1d51c2f..5b8116e 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
@@ -23,6 +23,10 @@ import static org.assertj.core.api.Assertions.assertThat;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.gson.Gson;
import java.time.Duration;
import java.util.Collections;
@@ -32,9 +36,12 @@ import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.BatcherBuilder;
@@ -43,17 +50,28 @@ import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.api.schema.GenericRecord;
import org.apache.pulsar.client.impl.PulsarClientImpl;
+import org.apache.pulsar.client.impl.schema.generic.GenericJsonRecord;
import org.apache.pulsar.common.policies.data.FunctionStats;
import org.apache.pulsar.common.policies.data.FunctionStatus;
+import org.apache.pulsar.common.policies.data.RetentionPolicies;
+import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
import org.apache.pulsar.common.policies.data.TopicStats;
+import org.apache.pulsar.common.schema.KeyValue;
+import org.apache.pulsar.common.schema.KeyValueEncodingType;
import org.apache.pulsar.common.schema.SchemaInfo;
+import org.apache.pulsar.common.schema.SchemaType;
import org.apache.pulsar.functions.api.examples.AutoSchemaFunction;
import org.apache.pulsar.functions.api.examples.AvroSchemaTestFunction;
+import org.apache.pulsar.functions.api.examples.MergeTopicFunction;
import org.apache.pulsar.functions.api.examples.pojo.AvroTestObject;
+import org.apache.pulsar.functions.api.examples.pojo.Users;
import org.apache.pulsar.functions.api.examples.serde.CustomObject;
import org.apache.pulsar.tests.integration.docker.ContainerExecException;
import org.apache.pulsar.tests.integration.docker.ContainerExecResult;
@@ -71,6 +89,8 @@ import org.testng.annotations.Test;
@Slf4j
public abstract class PulsarFunctionsTest extends PulsarFunctionsTestBase {
+ private final ObjectMapper objectMapper = new ObjectMapper();
+
public PulsarFunctionsTest(FunctionRuntimeType functionRuntimeType) {
super(functionRuntimeType);
}
@@ -534,7 +554,8 @@ public abstract class PulsarFunctionsTest extends PulsarFunctionsTestBase {
null,
PUBLISH_JAVA_CLASS,
schema,
- Collections.singletonMap("publish-topic", outputTopicName));
+ Collections.singletonMap("publish-topic", outputTopicName),
+ null, null, null);
break;
case PYTHON:
submitFunction(
@@ -545,7 +566,8 @@ public abstract class PulsarFunctionsTest extends PulsarFunctionsTestBase {
PUBLISH_FUNCTION_PYTHON_FILE,
PUBLISH_PYTHON_CLASS,
schema,
- Collections.singletonMap("publish-topic", outputTopicName));
+ Collections.singletonMap("publish-topic", outputTopicName),
+ null, null, null);
break;
case GO:
submitFunction(
@@ -556,7 +578,8 @@ public abstract class PulsarFunctionsTest extends PulsarFunctionsTestBase {
PUBLISH_FUNCTION_GO_FILE,
null,
schema,
- Collections.singletonMap("publish-topic", outputTopicName));
+ Collections.singletonMap("publish-topic", outputTopicName),
+ null, null, null);
}
// get function info
@@ -765,20 +788,24 @@ public abstract class PulsarFunctionsTest extends PulsarFunctionsTestBase {
String functionFile,
String functionClass,
Schema<T> inputTopicSchema) throws Exception {
- submitFunction(runtime, inputTopicName, outputTopicName, functionName, functionFile, functionClass, inputTopicSchema, null);
+ submitFunction(runtime, inputTopicName, outputTopicName, functionName, functionFile, functionClass,
+ inputTopicSchema, null, null, null, null);
}
private <T> void submitFunction(Runtime runtime,
- String inputTopicName,
- String outputTopicName,
- String functionName,
- String functionFile,
- String functionClass,
- Schema<T> inputTopicSchema,
- Map<String, String> userConfigs) throws Exception {
+ String inputTopicName,
+ String outputTopicName,
+ String functionName,
+ String functionFile,
+ String functionClass,
+ Schema<T> inputTopicSchema,
+ Map<String, String> userConfigs,
+ String customSchemaInputs,
+ String outputSchemaType,
+ SubscriptionInitialPosition subscriptionInitialPosition) throws Exception {
CommandGenerator generator;
- log.info("------- INPUT TOPIC: '{}'", inputTopicName);
+ log.info("------- INPUT TOPIC: '{}', customSchemaInputs: {}", inputTopicName, customSchemaInputs);
if (inputTopicName.endsWith(".*")) {
log.info("----- CREATING TOPIC PATTERN FUNCTION --- ");
generator = CommandGenerator.createTopicPatternGenerator(inputTopicName, functionClass);
@@ -791,6 +818,15 @@ public abstract class PulsarFunctionsTest extends PulsarFunctionsTestBase {
if (userConfigs != null) {
generator.setUserConfig(userConfigs);
}
+ if (customSchemaInputs != null) {
+ generator.setCustomSchemaInputs(customSchemaInputs);
+ }
+ if (outputSchemaType != null) {
+ generator.setSchemaType(outputSchemaType);
+ }
+ if (subscriptionInitialPosition != null) {
+ generator.setSubscriptionInitialPosition(subscriptionInitialPosition);
+ }
String command = "";
switch (runtime){
@@ -814,7 +850,10 @@ public abstract class PulsarFunctionsTest extends PulsarFunctionsTestBase {
commands);
assertTrue(result.getStdout().contains("\"Created successfully\""));
- ensureSubscriptionCreated(inputTopicName, String.format("public/default/%s", functionName), inputTopicSchema);
+ if (StringUtils.isNotEmpty(inputTopicName)) {
+ ensureSubscriptionCreated(
+ inputTopicName, String.format("public/default/%s", functionName), inputTopicSchema);
+ }
}
private void updateFunctionParallelism(String functionName, int parallelism) throws Exception {
@@ -1520,4 +1559,192 @@ public abstract class PulsarFunctionsTest extends PulsarFunctionsTestBase {
client.close();
}
+ protected void testMergeFunction() throws Exception {
+ log.info("start merge function test ...");
+
+ String ns = "public/ns-merge-" + randomName(8);
+ @Cleanup
+ PulsarAdmin pulsarAdmin = getPulsarAdmin();
+ pulsarAdmin.namespaces().createNamespace(ns);
+ pulsarAdmin.namespaces().setSchemaCompatibilityStrategy(ns, SchemaCompatibilityStrategy.ALWAYS_COMPATIBLE);
+ SchemaCompatibilityStrategy strategy = pulsarAdmin.namespaces().getSchemaCompatibilityStrategy(ns);
+ log.info("namespace {} SchemaCompatibilityStrategy is {}", ns, strategy);
+
+ @Cleanup
+ PulsarClient pulsarClient = getPulsarClient();
+
+ ObjectNode inputSpecNode = objectMapper.createObjectNode();
+ Map<String, AtomicInteger> topicMsgCntMap = new ConcurrentHashMap<>();
+ int messagePerTopic = 10;
+ prepareDataForMergeFunction(ns, pulsarClient, inputSpecNode, messagePerTopic, topicMsgCntMap);
+
+ final String outputTopic = ns + "/test-merge-output";
+ @Cleanup
+ Consumer<GenericRecord> consumer = pulsarClient
+ .newConsumer(Schema.AUTO_CONSUME())
+ .subscriptionName("test-merge-fn")
+ .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+ .topic(outputTopic)
+ .subscribe();
+
+ final String functionName = "test-merge-fn-" + randomName(8);
+ submitFunction(
+ Runtime.JAVA,
+ "",
+ outputTopic,
+ functionName,
+ null,
+ MergeTopicFunction.class.getName(),
+ null,
+ null,
+ inputSpecNode.toString(),
+ SchemaType.AUTO_PUBLISH.name().toUpperCase(),
+ SubscriptionInitialPosition.Earliest);
+
+ getFunctionInfoSuccess(functionName);
+
+ getFunctionStatus(functionName, topicMsgCntMap.keySet().size() * messagePerTopic, true);
+
+ Message<GenericRecord> message;
+ do {
+ message = consumer.receive(30, TimeUnit.SECONDS);
+ if (message != null) {
+ String baseTopic = message.getProperty("baseTopic");
+ GenericRecord genericRecord = message.getValue();
+ log.info("receive msg baseTopic: {}, schemaType: {}, nativeClass: {}, nativeObject: {}",
+ baseTopic,
+ genericRecord.getSchemaType(),
+ genericRecord.getNativeObject().getClass(),
+ genericRecord.getNativeObject());
+ checkSchemaForAutoSchema(message, baseTopic);
+ topicMsgCntMap.get(baseTopic).decrementAndGet();
+ consumer.acknowledge(message);
+ }
+ } while (message != null);
+
+ for (Map.Entry<String, AtomicInteger> entry : topicMsgCntMap.entrySet()) {
+ assertEquals(entry.getValue().get(), 0,
+ "topic " + entry.getKey() + " left message cnt is not 0.");
+ }
+
+ deleteFunction(functionName);
+
+ getFunctionInfoNotFound(functionName);
+ log.info("finish merge function test.");
+ }
+
+ private void prepareDataForMergeFunction(String ns,
+ PulsarClient pulsarClient,
+ ObjectNode inputSpecNode,
+ int messagePerTopic,
+ Map<String, AtomicInteger> topicMsgCntMap) throws PulsarClientException {
+ generateDataByDifferentSchema(ns, "merge-schema-bytes", pulsarClient,
+ Schema.BYTES, "bytes schema test".getBytes(), messagePerTopic, inputSpecNode, topicMsgCntMap);
+ generateDataByDifferentSchema(ns, "merge-schema-string", pulsarClient,
+ Schema.STRING, "string schema test", messagePerTopic, inputSpecNode, topicMsgCntMap);
+ generateDataByDifferentSchema(ns, "merge-schema-json-userv1", pulsarClient,
+ Schema.JSON(Users.UserV1.class), new Users.UserV1("ran", 33),
+ messagePerTopic, inputSpecNode, topicMsgCntMap);
+ generateDataByDifferentSchema(ns, "merge-schema-json-userv2", pulsarClient,
+ Schema.JSON(Users.UserV2.class), new Users.UserV2("tang", 18, "123123123"),
+ messagePerTopic, inputSpecNode, topicMsgCntMap);
+ generateDataByDifferentSchema(ns, "merge-schema-avro-userv2", pulsarClient,
+ Schema.AVRO(Users.UserV2.class), new Users.UserV2("tang", 20, "456456456"),
+ messagePerTopic, inputSpecNode, topicMsgCntMap);
+ generateDataByDifferentSchema(ns, "merge-schema-k-int-v-json-userv1-separate", pulsarClient,
+ Schema.KeyValue(Schema.INT32, Schema.JSON(Users.UserV1.class), KeyValueEncodingType.SEPARATED),
+ new KeyValue<>(100, new Users.UserV1("ran", 40)),
+ messagePerTopic, inputSpecNode, topicMsgCntMap);
+ generateDataByDifferentSchema(ns, "merge-schema-k-json-userv2-v-json-userv1-inline", pulsarClient,
+ Schema.KeyValue(Schema.JSON(Users.UserV2.class), Schema.JSON(Users.UserV1.class),
+ KeyValueEncodingType.INLINE),
+ new KeyValue<>(new Users.UserV2("tang", 20, "789789789"),
+ new Users.UserV1("ran", 40)),
+ messagePerTopic, inputSpecNode, topicMsgCntMap);
+ }
+
+ private void generateDataByDifferentSchema(String ns,
+ String baseTopic,
+ PulsarClient pulsarClient,
+ Schema schema,
+ Object data,
+ int messageCnt,
+ ObjectNode inputSpecNode,
+ Map<String, AtomicInteger> topicMsgCntMap) throws PulsarClientException {
+ String topic = ns + "/" + baseTopic;
+ Producer producer = pulsarClient.newProducer(schema)
+ .topic(topic)
+ .create();
+ for (int i = 0; i < messageCnt; i++) {
+ producer.newMessage().value(data).property("baseTopic", baseTopic).send();
+ }
+ ObjectNode confNode = objectMapper.createObjectNode();
+ confNode.put("schemaType", SchemaType.AUTO_CONSUME.name().toUpperCase());
+ inputSpecNode.put(topic, confNode.toString());
+ topicMsgCntMap.put(baseTopic, new AtomicInteger(messageCnt));
+ producer.close();
+ log.info("[merge-fn] generate {} messages for schema {}", messageCnt, schema.getSchemaInfo());
+ }
+
+ private void checkSchemaForAutoSchema(Message<GenericRecord> message, String baseTopic) {
+ if (!message.getReaderSchema().isPresent()) {
+ fail("Failed to get reader schema for auto consume multiple schema topic.");
+ }
+ Object nativeObject = message.getValue().getNativeObject();
+ JsonNode jsonNode;
+ KeyValue<?, ?> kv;
+ switch (baseTopic) {
+ case "merge-schema-bytes":
+ assertEquals(new String((byte[]) nativeObject), "bytes schema test");
+ break;
+ case "merge-schema-string":
+ assertEquals((String) nativeObject, "string schema test");
+ break;
+ case "merge-schema-json-userv1":
+ jsonNode = (JsonNode) nativeObject;
+ assertEquals(jsonNode.get("name").textValue(), "ran");
+ assertEquals(jsonNode.get("age").intValue(), 33);
+ break;
+ case "merge-schema-json-userv2":
+ jsonNode = (JsonNode) nativeObject;
+ assertEquals(jsonNode.get("name").textValue(), "tang");
+ assertEquals(jsonNode.get("age").intValue(), 18);
+ assertEquals(jsonNode.get("phone").textValue(), "123123123");
+ break;
+ case "merge-schema-avro-userv2":
+ org.apache.avro.generic.GenericRecord genericRecord =
+ (org.apache.avro.generic.GenericRecord) nativeObject;
+ assertEquals(genericRecord.get("name").toString(), "tang");
+ assertEquals(genericRecord.get("age"), 20);
+ assertEquals(genericRecord.get("phone").toString(), "456456456");
+ break;
+ case "merge-schema-k-int-v-json-userv1-separate":
+ kv = (KeyValue<Integer, GenericRecord>) nativeObject;
+ assertEquals(kv.getKey(), 100);
+ jsonNode = ((GenericJsonRecord) kv.getValue()).getJsonNode();
+ assertEquals(jsonNode.get("name").textValue(), "ran");
+ assertEquals(jsonNode.get("age").intValue(), 40);
+ break;
+ case "merge-schema-k-json-userv2-v-json-userv1-inline":
+ kv = (KeyValue<GenericRecord, GenericRecord>) nativeObject;
+ jsonNode = ((GenericJsonRecord) kv.getKey()).getJsonNode();
+ assertEquals(jsonNode.get("name").textValue(), "tang");
+ assertEquals(jsonNode.get("age").intValue(), 20);
+ assertEquals(jsonNode.get("phone").textValue(), "789789789");
+ jsonNode = ((GenericJsonRecord) kv.getValue()).getJsonNode();
+ assertEquals(jsonNode.get("name").textValue(), "ran");
+ assertEquals(jsonNode.get("age").intValue(), 40);
+ break;
+ default:
+ // nothing to do
+ }
+ }
+
+ private PulsarClient getPulsarClient() throws PulsarClientException {
+ return PulsarClient.builder().serviceUrl(pulsarCluster.getPlainTextServiceUrl()).build();
+ }
+
+ private PulsarAdmin getPulsarAdmin() throws PulsarClientException {
+ return PulsarAdmin.builder().serviceHttpUrl(pulsarCluster.getHttpServiceUrl()).build();
+ }
}
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/java/PulsarFunctionsJavaTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/java/PulsarFunctionsJavaTest.java
index 13134fa..466942d 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/java/PulsarFunctionsJavaTest.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/java/PulsarFunctionsJavaTest.java
@@ -156,4 +156,9 @@ public class PulsarFunctionsJavaTest extends PulsarFunctionsTest {
testWindowFunction("sliding", EXPECTED_RESULTS);
}
+ @Test(groups = {"java_function", "function"})
+ public void testMergeFunctionTest() throws Exception {
+ testMergeFunction();
+ }
+
}
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/utils/CommandGenerator.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/utils/CommandGenerator.java
index 65b54ee..4edf462 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/utils/CommandGenerator.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/utils/CommandGenerator.java
@@ -24,6 +24,8 @@ import java.util.Map;
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.tests.integration.topologies.PulsarCluster;
@Getter
@@ -57,6 +59,9 @@ public class CommandGenerator {
private Long windowLengthDurationMs;
private Integer slidingIntervalCount;
private Long slidingIntervalDurationMs;
+ private String customSchemaInputs;
+ private String schemaType;
+ private SubscriptionInitialPosition subscriptionInitialPosition;
private Map<String, String> userConfig = new HashMap<>();
public static final String JAVAJAR = "/pulsar/examples/java-test-functions.jar";
@@ -97,12 +102,21 @@ public class CommandGenerator {
if(runtime != Runtime.GO){
commandBuilder.append(" --className " + functionClassName);
}
- if (sourceTopic != null) {
+ if (StringUtils.isNotEmpty(sourceTopic)) {
commandBuilder.append(" --inputs " + sourceTopic);
}
if (sinkTopic != null) {
commandBuilder.append(" --output " + sinkTopic);
}
+ if (customSchemaInputs != null) {
+ commandBuilder.append(" --custom-schema-inputs \'" + customSchemaInputs + "\'");
+ }
+ if (schemaType != null) {
+ commandBuilder.append(" --schema-type " + schemaType);
+ }
+ if (subscriptionInitialPosition != null) {
+ commandBuilder.append(" --subs-position " + subscriptionInitialPosition.name());
+ }
switch (runtime){
case JAVA:
commandBuilder.append(" --jar " + JAVAJAR);
@@ -148,7 +162,7 @@ public class CommandGenerator {
if (runtime != Runtime.GO){
commandBuilder.append(" --className " + functionClassName);
}
- if (sourceTopic != null) {
+ if (StringUtils.isNotEmpty(sourceTopic)) {
commandBuilder.append(" --inputs " + sourceTopic);
}
if (sourceTopicPattern != null) {
@@ -190,6 +204,15 @@ public class CommandGenerator {
if (slidingIntervalDurationMs != null) {
commandBuilder.append(" --slidingIntervalDurationMs " + slidingIntervalDurationMs);
}
+ if (customSchemaInputs != null) {
+ commandBuilder.append(" --custom-schema-inputs \'" + customSchemaInputs + "\'");
+ }
+ if (schemaType != null) {
+ commandBuilder.append(" --schema-type " + schemaType);
+ }
+ if (subscriptionInitialPosition != null) {
+ commandBuilder.append(" --subs-position " + subscriptionInitialPosition.name());
+ }
switch (runtime){
case JAVA:
@@ -240,7 +263,7 @@ public class CommandGenerator {
if (functionClassName != null) {
commandBuilder.append(" --className " + functionClassName);
}
- if (sourceTopic != null) {
+ if (StringUtils.isNotEmpty(sourceTopic)) {
commandBuilder.append(" --inputs " + sourceTopic);
}
if (customSereSourceTopics != null && !customSereSourceTopics.isEmpty()) {
@@ -279,6 +302,15 @@ public class CommandGenerator {
if (slidingIntervalDurationMs != null) {
commandBuilder.append(" --slidingIntervalDurationMs " + slidingIntervalDurationMs);
}
+ if (customSchemaInputs != null) {
+ commandBuilder.append(" --custom-schema-inputs \'" + customSchemaInputs + "\'");
+ }
+ if (schemaType != null) {
+ commandBuilder.append(" --schema-type " + schemaType);
+ }
+ if (subscriptionInitialPosition != null) {
+ commandBuilder.append(" --subs-position " + subscriptionInitialPosition.name());
+ }
if (codeFile != null) {
switch (runtime) {