You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/09/09 08:01:00 UTC

[pulsar] 08/09: [function] enable protobuf-native schema support for function (#11868)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit c0bada6198055c13b8004e16b5d436349dd9fe19
Author: Neng Lu <nl...@streamnative.io>
AuthorDate: Wed Sep 1 21:48:02 2021 -0700

    [function] enable protobuf-native schema support for function (#11868)
    
    Fixes #11721
    
    ### Motivation
    
    Enable functions to process topics with the protobuf_native schema
    
    ### Modifications
    
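    Update `TopicSchema` to return a `ProtobufNativeSchema` for the `PROTOBUF_NATIVE` schema type, and add `TopicSchemaTest`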
    
    (cherry picked from commit d0e5d96185336f56a7599e97474a6074cf6b76a7)
---
 .../pulsar/functions/source/TopicSchema.java       |  4 ++
 .../pulsar/functions/source/TopicSchemaTest.java   | 58 ++++++++++++++++++++++
 2 files changed, 62 insertions(+)

diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/TopicSchema.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/TopicSchema.java
index dcd424e..067a793 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/TopicSchema.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/TopicSchema.java
@@ -32,6 +32,7 @@ import org.apache.pulsar.client.api.schema.SchemaDefinition;
 import org.apache.pulsar.client.impl.PulsarClientImpl;
 import org.apache.pulsar.client.impl.schema.AvroSchema;
 import org.apache.pulsar.client.impl.schema.JSONSchema;
+import org.apache.pulsar.client.impl.schema.ProtobufNativeSchema;
 import org.apache.pulsar.client.impl.schema.ProtobufSchema;
 import org.apache.pulsar.common.functions.ConsumerConfig;
 import org.apache.pulsar.common.schema.KeyValue;
@@ -172,6 +173,9 @@ public class TopicSchema {
         case PROTOBUF:
             return ProtobufSchema.ofGenericClass(clazz, new HashMap<>());
 
+        case PROTOBUF_NATIVE:
+            return ProtobufNativeSchema.ofGenericClass(clazz, new HashMap<>());
+
         case AUTO_PUBLISH:
             return (Schema<T>) Schema.AUTO_PRODUCE_BYTES();
 
diff --git a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/source/TopicSchemaTest.java b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/source/TopicSchemaTest.java
new file mode 100644
index 0000000..c746093
--- /dev/null
+++ b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/source/TopicSchemaTest.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.source;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.impl.schema.AvroSchema;
+import org.apache.pulsar.client.impl.schema.JSONSchema;
+import org.apache.pulsar.client.impl.schema.ProtobufNativeSchema;
+import org.apache.pulsar.client.impl.schema.ProtobufSchema;
+import org.apache.pulsar.common.schema.SchemaType;
+import org.apache.pulsar.functions.proto.Request;
+import org.junit.Test;
+
+import java.util.Optional;
+
+import static org.testng.Assert.assertEquals;
+
+/** Checks that {@link TopicSchema#getSchema} returns the schema implementation matching each requested type. */
+@Slf4j
+public class TopicSchemaTest {
+    // Fully qualified: the simple name Test is imported from JUnit, yet the assertions here are TestNG's.
+    @org.testng.annotations.Test
+    public void testGetSchema() {
+        TopicSchema topicSchema = new TopicSchema(null);
+        String topicPrefix = "public/default/test";
+
+        // a plain placeholder class is used for the JSON and AVRO schema types
+        Schema<?> schema = topicSchema.getSchema(topicPrefix + "1", DummyClass.class, Optional.of(SchemaType.JSON));
+        assertEquals(schema.getClass(), JSONSchema.class);
+        schema = topicSchema.getSchema(topicPrefix + "2", DummyClass.class, Optional.of(SchemaType.AVRO));
+        assertEquals(schema.getClass(), AvroSchema.class);
+
+        // use an arbitrary protobuf class for testing purpose
+        schema = topicSchema.getSchema(topicPrefix + "3", Request.ServiceRequest.class, Optional.of(SchemaType.PROTOBUF));
+        assertEquals(schema.getClass(), ProtobufSchema.class);
+        schema = topicSchema.getSchema(topicPrefix + "4", Request.ServiceRequest.class, Optional.of(SchemaType.PROTOBUF_NATIVE));
+        assertEquals(schema.getClass(), ProtobufNativeSchema.class);
+    }
+
+    private static class DummyClass {}
+}