You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/09/09 08:01:00 UTC
[pulsar] 08/09: [function] enable protobuf-native schema support
for function (#11868)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit c0bada6198055c13b8004e16b5d436349dd9fe19
Author: Neng Lu <nl...@streamnative.io>
AuthorDate: Wed Sep 1 21:48:02 2021 -0700
[function] enable protobuf-native schema support for function (#11868)
Fixes #11721
### Motivation
Enable functions to process topics that use the protobuf_native schema.
### Modifications
Update `TopicSchema` to handle the `PROTOBUF_NATIVE` schema type, and add `TopicSchemaTest` to cover it.
(cherry picked from commit d0e5d96185336f56a7599e97474a6074cf6b76a7)
---
.../pulsar/functions/source/TopicSchema.java | 4 ++
.../pulsar/functions/source/TopicSchemaTest.java | 58 ++++++++++++++++++++++
2 files changed, 62 insertions(+)
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/TopicSchema.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/TopicSchema.java
index dcd424e..067a793 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/TopicSchema.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/TopicSchema.java
@@ -32,6 +32,7 @@ import org.apache.pulsar.client.api.schema.SchemaDefinition;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.schema.AvroSchema;
import org.apache.pulsar.client.impl.schema.JSONSchema;
+import org.apache.pulsar.client.impl.schema.ProtobufNativeSchema;
import org.apache.pulsar.client.impl.schema.ProtobufSchema;
import org.apache.pulsar.common.functions.ConsumerConfig;
import org.apache.pulsar.common.schema.KeyValue;
@@ -172,6 +173,9 @@ public class TopicSchema {
case PROTOBUF:
return ProtobufSchema.ofGenericClass(clazz, new HashMap<>());
+ case PROTOBUF_NATIVE:
+ return ProtobufNativeSchema.ofGenericClass(clazz, new HashMap<>());
+
case AUTO_PUBLISH:
return (Schema<T>) Schema.AUTO_PRODUCE_BYTES();
diff --git a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/source/TopicSchemaTest.java b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/source/TopicSchemaTest.java
new file mode 100644
index 0000000..c746093
--- /dev/null
+++ b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/source/TopicSchemaTest.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.source;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.impl.schema.AvroSchema;
+import org.apache.pulsar.client.impl.schema.JSONSchema;
+import org.apache.pulsar.client.impl.schema.ProtobufNativeSchema;
+import org.apache.pulsar.client.impl.schema.ProtobufSchema;
+import org.apache.pulsar.common.schema.SchemaType;
+import org.apache.pulsar.functions.proto.Request;
+import org.junit.Test;
+
+import java.util.Optional;
+
+import static org.testng.Assert.assertEquals;
+
+@Slf4j
+public class TopicSchemaTest {
+
+ @Test
+ public void testGetSchema() {
+ TopicSchema topicSchema = new TopicSchema(null);
+
+ String TOPIC = "public/default/test";
+ Schema<?> schema = topicSchema.getSchema(TOPIC + "1", DummyClass.class, Optional.of(SchemaType.JSON));
+ assertEquals(schema.getClass(), JSONSchema.class);
+
+ schema = topicSchema.getSchema(TOPIC + "2", DummyClass.class, Optional.of(SchemaType.AVRO));
+ assertEquals(schema.getClass(), AvroSchema.class);
+
+ // use an arbitrary protobuf class for testing purpose
+ schema = topicSchema.getSchema(TOPIC + "3", Request.ServiceRequest.class, Optional.of(SchemaType.PROTOBUF));
+ assertEquals(schema.getClass(), ProtobufSchema.class);
+
+ schema = topicSchema.getSchema(TOPIC + "4", Request.ServiceRequest.class, Optional.of(SchemaType.PROTOBUF_NATIVE));
+ assertEquals(schema.getClass(), ProtobufNativeSchema.class);
+ }
+
+ private static class DummyClass {}
+}