You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2021/01/27 17:53:00 UTC

[GitHub] [pulsar] dlg99 commented on a change in pull request #9343: Issue 9004: Pulsar Schema API: provide Type information for Fields

dlg99 commented on a change in pull request #9343:
URL: https://github.com/apache/pulsar/pull/9343#discussion_r565513072



##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericSchemaImpl.java
##########
@@ -41,10 +43,126 @@ protected GenericSchemaImpl(SchemaInfo schemaInfo) {
 
         this.fields = schema.getFields()
                 .stream()
-                .map(f -> new Field(f.name(), f.pos()))
+                .map(f -> new Field(f.name(), f.pos(), convertFieldSchema(f, schemaInfo.getType())))
                 .collect(Collectors.toList());
     }
 
+
+    public static org.apache.pulsar.client.api.Schema<?> convertFieldSchema(Schema.Field f, SchemaType mainType) {
+        return convertFieldSchema(f.schema(), mainType);
+    }
+
+    private static org.apache.pulsar.client.api.Schema<?> convertFieldSchema(Schema schema, SchemaType mainType) {
+        switch (schema.getType()) {
+            case RECORD:
+                return buildStructSchema(schema, mainType);
+            case BYTES:
+                return GenericSchema.BYTES;
+            case LONG:
+                return GenericSchema.INT64;
+            case FLOAT:
+                return GenericSchema.FLOAT;
+            case DOUBLE:
+                return GenericSchema.DOUBLE;
+            case BOOLEAN:
+                return GenericSchema.BOOL;
+            case STRING:
+                return GenericSchema.STRING;
+            case INT:
+                return GenericSchema.INT32;
+            case UNION:
+                // this is very common while representing NULLABLE types
+                // the first entry is "null", the second is the effective type
+                List<Schema> types = schema.getTypes();
+                if (types.size() == 2) {
+                    if (types.stream().anyMatch(s -> s.getType() == Schema.Type.NULL)) {
+                        Schema nonNull = types.stream().filter(s -> s.getType() != Schema.Type.NULL).findFirst().orElse(null);
+                        if (nonNull != null) {
+                            return convertFieldSchema(nonNull, mainType);
+                        }
+                    }
+                }
+                return null;
+            case NULL:

Review comment:
       Add tests for these special cases.

##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericJsonSchema.java
##########
@@ -43,4 +51,88 @@ public GenericJsonSchema(SchemaInfo schemaInfo) {
     public GenericRecordBuilder newRecordBuilder() {
         throw new UnsupportedOperationException("Json Schema doesn't support record builder yet");
     }
+
+    static org.apache.pulsar.client.api.Schema<?> convertFieldSchema(JsonNode fn) {
+        if (fn.isContainerNode()) {
+            return buildStructSchema(fn, new AtomicInteger());
+        }
+        switch (fn.getNodeType()) {
+            case STRING:
+                return GenericSchema.STRING;
+            case BOOLEAN:
+                return GenericSchema.BOOL;
+            case BINARY:
+                return GenericSchema.BYTES;
+            case NUMBER:
+                if (fn.isInt()) {
+                    return GenericSchema.INT32;
+                } else if (fn.isLong()) {
+                    return GenericSchema.INT64;
+                } else if (fn.isFloat()) {
+                    return GenericSchema.FLOAT;
+                } else if (fn.isDouble()) {
+                    return GenericSchema.DOUBLE;
+                } else {
+                    return null;

Review comment:
       (Here and in other places) Consider using `Optional<>` instead of returning `null`.
   Are you using nulls for performance reasons? How frequently does this code run?

##########
File path: pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/generic/SchemaFileTypeConversionTest.java
##########
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl.schema.generic;
+
+import lombok.Data;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.schema.Field;
+
+import org.apache.pulsar.client.api.schema.GenericSchema;
+import org.apache.pulsar.common.schema.SchemaType;
+import org.testng.annotations.Test;
+
+import java.util.Collection;
+
+import static org.testng.Assert.*;
+
+/**
+ * Unit testing generic schemas.
+ */
+@Slf4j
+public class SchemaFileTypeConversionTest {
+
+    @Data
+    private static final class Nested {
+        private int value;
+    }
+
+    @Data
+    private static final class MyStruct {
+        private String theString;
+        private boolean theBool;
+        private int theInt;
+        private long theLong;
+        private double theDouble;
+        private float theFloat;
+        private byte[] theByteArray;
+        private Nested nested;
+    }
+
+    @Test
+    public void testGenericAvroSchema() {
+        Schema<MyStruct> encodeSchema = Schema.AVRO(MyStruct.class);
+        GenericSchema decodeSchema = GenericAvroSchema.of(encodeSchema.getSchemaInfo());
+        testSchema(decodeSchema, SchemaType.AVRO);
+    }
+
+    @Test
+    public void testGenericJsonSchema() {
+        Schema<MyStruct> encodeSchema = Schema.JSON(MyStruct.class);
+        GenericSchema decodeSchema = GenericJsonSchema.of(encodeSchema.getSchemaInfo());
+        testSchema(decodeSchema, SchemaType.JSON);
+    }
+
+    private void testSchema(GenericSchema decodeSchema, SchemaType schemaTypeForStructs) {
+        findField(decodeSchema.getFields(), "theString", SchemaType.STRING);

Review comment:
       Add a test for a field that cannot be matched to anything in the schema (`"theUnknown"`).

##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericSchemaImpl.java
##########
@@ -41,10 +43,126 @@ protected GenericSchemaImpl(SchemaInfo schemaInfo) {
 
         this.fields = schema.getFields()
                 .stream()
-                .map(f -> new Field(f.name(), f.pos()))
+                .map(f -> new Field(f.name(), f.pos(), convertFieldSchema(f, schemaInfo.getType())))
                 .collect(Collectors.toList());
     }
 
+
+    public static org.apache.pulsar.client.api.Schema<?> convertFieldSchema(Schema.Field f, SchemaType mainType) {
+        return convertFieldSchema(f.schema(), mainType);
+    }
+
+    private static org.apache.pulsar.client.api.Schema<?> convertFieldSchema(Schema schema, SchemaType mainType) {
+        switch (schema.getType()) {
+            case RECORD:
+                return buildStructSchema(schema, mainType);
+            case BYTES:
+                return GenericSchema.BYTES;
+            case LONG:
+                return GenericSchema.INT64;
+            case FLOAT:
+                return GenericSchema.FLOAT;
+            case DOUBLE:
+                return GenericSchema.DOUBLE;
+            case BOOLEAN:
+                return GenericSchema.BOOL;
+            case STRING:
+                return GenericSchema.STRING;
+            case INT:
+                return GenericSchema.INT32;
+            case UNION:
+                // this is very common while representing NULLABLE types
+                // the first entry is "null", the second is the effective type
+                List<Schema> types = schema.getTypes();
+                if (types.size() == 2) {
+                    if (types.stream().anyMatch(s -> s.getType() == Schema.Type.NULL)) {

Review comment:
       `if (types.size() == 2 && types.stream().filter(s -> s.getType() != Schema.Type.NULL).count() == 1)` ?
   This removes the need for `.orElse` and the null check later.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org