Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2021/01/27 13:21:05 UTC

[GitHub] [pulsar] eolivelli opened a new pull request #9343: Issue 9004: Pulsar Schema API: provide Type information for Fields

eolivelli opened a new pull request #9343:
URL: https://github.com/apache/pulsar/pull/9343


   Fixes #9004
   
   ### Motivation
   The GenericRecord API exposes the list of `Fields` but not the data type associated with each field.
   This patch introduces support for retrieving the `Schema` of each Field.
   
   This information is available only for the limited subset of data types supported by Pulsar. For instance, Avro MAPS and ARRAYS are not implemented, and there is no way to represent such data types using SchemaType.
   When support for new data types is added in the future, this system will support them as well.
   
   The implementation covers Avro and JSON schema types.
   
   With this implementation, Pulsar IO support for metadata will be closer to that of Kafka Connect, even though we are still far from it because we do not have full support for all AVRO data types.
   
   ### Modifications
   - add new Field#getSchema method
   - add new version of RecordSchemaBuilder#field that accepts a Schema instead of a GenericSchema (` FieldSchemaBuilder field(String fieldName, org.apache.pulsar.client.api.Schema<?> genericSchema);`)
   - add relevant test cases
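
To illustrate the first modification, here is a minimal sketch of how a sink might read per-field type information. `SketchField`, `SketchType`, and `FieldTypeSketch.describe` are hypothetical stand-ins so the snippet compiles without Pulsar on the classpath; only the `getSchema` accessor name comes from this PR. In the PR the accessor returns a `Schema<?>`, while the sketch returns a simple enum, and `null` stands for a type that cannot be represented.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the subset of types the PR can describe.
enum SketchType { STRING, BOOLEAN, INT32, INT64, FLOAT, DOUBLE, BYTES, STRUCT }

// Hypothetical stand-in for Field: name and position, plus the proposed type accessor.
final class SketchField {
    private final String name;
    private final int index;
    private final SketchType schema; // null when the type cannot be represented

    SketchField(String name, int index, SketchType schema) {
        this.name = name;
        this.index = index;
        this.schema = schema;
    }

    String getName() { return name; }
    int getIndex() { return index; }
    SketchType getSchema() { return schema; }
}

final class FieldTypeSketch {
    // Renders "name:TYPE" for each field, falling back to UNKNOWN for unmapped types.
    static List<String> describe(List<SketchField> fields) {
        List<String> out = new ArrayList<>();
        for (SketchField f : fields) {
            SketchType t = f.getSchema();
            out.add(f.getName() + ":" + (t == null ? "UNKNOWN" : t.name()));
        }
        return out;
    }
}
```

A sink written this way would not need to know whether the record was encoded as Avro or JSON, which is the goal stated in the motivation.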
   
   ### Verifying this change
   This change added tests.
   I manually verified that the information is available on Sinks, so there is no need to add more integration tests.
   
   ### Documentation
     - Does this pull request introduce a new feature? yes
     - If yes, how is the feature documented? JavaDocs


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] eolivelli commented on pull request #9343: Issue 9004: Pulsar Schema API: provide Type information for Fields

Posted by GitBox <gi...@apache.org>.
eolivelli commented on pull request #9343:
URL: https://github.com/apache/pulsar/pull/9343#issuecomment-810424070


   @vroyer 
   We went down the path of adding `GenericObject#getNativeObject` (formerly `GenericRecord#getNativeRecord`) in order to let the user deal with AVRO directly.
   
   The problem, raised by @sijie, is that we would otherwise have to maintain the mapping between our "general purpose" Schema API and Avro, Protobuf, and all of the future technologies that we want to support.
   
   Avro and Protobuf are very different, so any API that we create would be only an approximation and would not be able to represent all of the internal features of those libraries. In other words, advanced users would not be able to use Pulsar.
   
   





[GitHub] [pulsar] eolivelli closed pull request #9343: Issue 9004: Pulsar Schema API: provide Type information for Fields

Posted by GitBox <gi...@apache.org>.
eolivelli closed pull request #9343:
URL: https://github.com/apache/pulsar/pull/9343


   





[GitHub] [pulsar] eolivelli commented on pull request #9343: Issue 9004: Pulsar Schema API: provide Type information for Fields

Posted by GitBox <gi...@apache.org>.
eolivelli commented on pull request #9343:
URL: https://github.com/apache/pulsar/pull/9343#issuecomment-781883953


   closing in favour of #9614 





[GitHub] [pulsar] eolivelli commented on pull request #9343: Issue 9004: Pulsar Schema API: provide Type information for Fields

Posted by GitBox <gi...@apache.org>.
eolivelli commented on pull request #9343:
URL: https://github.com/apache/pulsar/pull/9343#issuecomment-781220628


   @sijie I sent a new PR that allows you to access the underlying Schema information
   
   https://github.com/apache/pulsar/pull/9614
   
   I am going to close this PR if you are fine with that new approach





[GitHub] [pulsar] vroyer commented on pull request #9343: Issue 9004: Pulsar Schema API: provide Type information for Fields

Posted by GitBox <gi...@apache.org>.
vroyer commented on pull request #9343:
URL: https://github.com/apache/pulsar/pull/9343#issuecomment-810391780


   With this decision (no generic schema on top of multiple encoding types), the Pulsar IO ecosystem won't be able to do data transformation without dealing with the data serialisation/encoding. Don't you think this is a limitation compared to the Kafka Connect ecosystem?





[GitHub] [pulsar] eolivelli commented on pull request #9343: Issue 9004: Pulsar Schema API: provide Type information for Fields

Posted by GitBox <gi...@apache.org>.
eolivelli commented on pull request #9343:
URL: https://github.com/apache/pulsar/pull/9343#issuecomment-770768654


   Sorry, I didn't mean that Pulsar IO is a competitor to Kafka Connect, but I would like to share my experience in porting projects that run on Kafka Connect to Pulsar IO Sinks.
   
   I recently got stuck on this problem: when you deal with GenericRecord you cannot get metadata about the Schema. You can only access the raw schema bytes, or you have to use Java reflection to determine the type of each field (and this is also a problem because for null values you do not know the original data type).
   
   GenericRecord is a wonderful abstraction over generic data structures, and it looks to me like the biggest missing piece is this kind of metadata about the structure.
   
   GenericRecord already provides many APIs to create records and schemas (RecordSchemaBuilder and GenericRecordBuilder); the only missing part is the ability to "read" that metadata.
   
   Once we have this API, working with structured data in Pulsar IO (and probably in Pulsar Functions in general) will be much easier.
   
   Dealing directly with Avro (and Protobuf...) is not a good way to go for my use cases, because I would have to explicitly write code for every schema type and also get into the details of every technology.
   
   For simple cases (but GenericRecord is not that simple, it is already very powerful, as it is already able to deal with nested structs) it is a great feature to be able to code your Pulsar Sink independently from the physical encoding of the records, the same way we do with Compression.
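
The null-value problem mentioned above can be sketched with plain Java collections. This is an illustration only: `NullFieldTypeSketch`, `typeFromValue`, and `typeFromSchema` are hypothetical names, and the maps stand in for a record and its schema metadata rather than for any Pulsar class.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class NullFieldTypeSketch {
    // Reflection-style inference: only works when a value is present.
    static String typeFromValue(Object value) {
        return value == null ? "unknown" : value.getClass().getSimpleName();
    }

    // Metadata-style lookup: the declared type survives even when the value is null.
    static String typeFromSchema(Map<String, Class<?>> declaredTypes, String field) {
        Class<?> declared = declaredTypes.get(field);
        return declared == null ? "unknown" : declared.getSimpleName();
    }

    public static void main(String[] args) {
        // Stand-in for schema metadata: field name to declared type.
        Map<String, Class<?>> declaredTypes = new LinkedHashMap<>();
        declaredTypes.put("theLong", Long.class);

        // Stand-in for a record whose nullable field is currently unset.
        Map<String, Object> record = new LinkedHashMap<>();
        record.put("theLong", null);

        System.out.println(typeFromValue(record.get("theLong")));     // prints "unknown"
        System.out.println(typeFromSchema(declaredTypes, "theLong")); // prints "Long"
    }
}
```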
   
   
   cc @shiv4289 @aahmed-se 



[GitHub] [pulsar] dlg99 commented on a change in pull request #9343: Issue 9004: Pulsar Schema API: provide Type information for Fields

Posted by GitBox <gi...@apache.org>.
dlg99 commented on a change in pull request #9343:
URL: https://github.com/apache/pulsar/pull/9343#discussion_r565513072



##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericSchemaImpl.java
##########
@@ -41,10 +43,126 @@ protected GenericSchemaImpl(SchemaInfo schemaInfo) {
 
         this.fields = schema.getFields()
                 .stream()
-                .map(f -> new Field(f.name(), f.pos()))
+                .map(f -> new Field(f.name(), f.pos(), convertFieldSchema(f, schemaInfo.getType())))
                 .collect(Collectors.toList());
     }
 
+
+    public static org.apache.pulsar.client.api.Schema<?> convertFieldSchema(Schema.Field f, SchemaType mainType) {
+        return convertFieldSchema(f.schema(), mainType);
+    }
+
+    private static org.apache.pulsar.client.api.Schema<?> convertFieldSchema(Schema schema, SchemaType mainType) {
+        switch (schema.getType()) {
+            case RECORD:
+                return buildStructSchema(schema, mainType);
+            case BYTES:
+                return GenericSchema.BYTES;
+            case LONG:
+                return GenericSchema.INT64;
+            case FLOAT:
+                return GenericSchema.FLOAT;
+            case DOUBLE:
+                return GenericSchema.DOUBLE;
+            case BOOLEAN:
+                return GenericSchema.BOOL;
+            case STRING:
+                return GenericSchema.STRING;
+            case INT:
+                return GenericSchema.INT32;
+            case UNION:
+                // this is very common while representing NULLABLE types
+                // the first entry is "null", the second is the effective type
+                List<Schema> types = schema.getTypes();
+                if (types.size() == 2) {
+                    if (types.stream().anyMatch(s -> s.getType() == Schema.Type.NULL)) {
+                        Schema nonNull = types.stream().filter(s -> s.getType() != Schema.Type.NULL).findFirst().orElse(null);
+                        if (nonNull != null) {
+                            return convertFieldSchema(nonNull, mainType);
+                        }
+                    }
+                }
+                return null;
+            case NULL:

Review comment:
       add tests for these special cases

##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericJsonSchema.java
##########
@@ -43,4 +51,88 @@ public GenericJsonSchema(SchemaInfo schemaInfo) {
     public GenericRecordBuilder newRecordBuilder() {
         throw new UnsupportedOperationException("Json Schema doesn't support record builder yet");
     }
+
+    static org.apache.pulsar.client.api.Schema<?> convertFieldSchema(JsonNode fn) {
+        if (fn.isContainerNode()) {
+            return buildStructSchema(fn, new AtomicInteger());
+        }
+        switch (fn.getNodeType()) {
+            case STRING:
+                return GenericSchema.STRING;
+            case BOOLEAN:
+                return GenericSchema.BOOL;
+            case BINARY:
+                return GenericSchema.BYTES;
+            case NUMBER:
+                if (fn.isInt()) {
+                    return GenericSchema.INT32;
+                } else if (fn.isLong()) {
+                    return GenericSchema.INT64;
+                } else if (fn.isFloat()) {
+                    return GenericSchema.FLOAT;
+                } else if (fn.isDouble()) {
+                    return GenericSchema.DOUBLE;
+                } else {
+                    return null;

Review comment:
       (here and in other places) Consider using `Optional<>`. 
   Are you using nulls for performance reasons? How frequently does this code run?
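
One possible shape for the `Optional` suggestion, as a hedged sketch rather than the PR's code: `OptionalMappingSketch`, `NumericType`, and `convertNumber` are hypothetical, and `java.lang.Number` subclasses stand in for the Jackson `isInt()`/`isLong()`/`isFloat()`/`isDouble()` checks so the snippet needs no external library.

```java
import java.util.Optional;

final class OptionalMappingSketch {
    // Hypothetical stand-in for the primitive schema constants used in the diff.
    enum NumericType { INT32, INT64, FLOAT, DOUBLE }

    // Same branch order as the NUMBER case above, but an unmapped value
    // yields Optional.empty() instead of null.
    static Optional<NumericType> convertNumber(Number n) {
        if (n instanceof Integer) {
            return Optional.of(NumericType.INT32);
        } else if (n instanceof Long) {
            return Optional.of(NumericType.INT64);
        } else if (n instanceof Float) {
            return Optional.of(NumericType.FLOAT);
        } else if (n instanceof Double) {
            return Optional.of(NumericType.DOUBLE);
        }
        return Optional.empty();
    }
}
```

Callers then have to handle the unmapped case explicitly, for example with `convertNumber(value).orElseThrow(...)` or `ifPresent(...)`, instead of risking a `NullPointerException` later.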

##########
File path: pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/generic/SchemaFileTypeConversionTest.java
##########
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl.schema.generic;
+
+import lombok.Data;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.schema.Field;
+
+import org.apache.pulsar.client.api.schema.GenericSchema;
+import org.apache.pulsar.common.schema.SchemaType;
+import org.testng.annotations.Test;
+
+import java.util.Collection;
+
+import static org.testng.Assert.*;
+
+/**
+ * Unit testing generic schemas.
+ */
+@Slf4j
+public class SchemaFileTypeConversionTest {
+
+    @Data
+    private static final class Nested {
+        private int value;
+    }
+
+    @Data
+    private static final class MyStruct {
+        private String theString;
+        private boolean theBool;
+        private int theInt;
+        private long theLong;
+        private double theDouble;
+        private float theFloat;
+        private byte[] theByteArray;
+        private Nested nested;
+    }
+
+    @Test
+    public void testGenericAvroSchema() {
+        Schema<MyStruct> encodeSchema = Schema.AVRO(MyStruct.class);
+        GenericSchema decodeSchema = GenericAvroSchema.of(encodeSchema.getSchemaInfo());
+        testSchema(decodeSchema, SchemaType.AVRO);
+    }
+
+    @Test
+    public void testGenericJsonSchema() {
+        Schema<MyStruct> encodeSchema = Schema.JSON(MyStruct.class);
+        GenericSchema decodeSchema = GenericJsonSchema.of(encodeSchema.getSchemaInfo());
+        testSchema(decodeSchema, SchemaType.JSON);
+    }
+
+    private void testSchema(GenericSchema decodeSchema, SchemaType schemaTypeForStructs) {
+        findField(decodeSchema.getFields(), "theString", SchemaType.STRING);

Review comment:
       add tests for the field that cannot be matched to anything in the schema (`"theUnknown"`)

##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericSchemaImpl.java
##########
@@ -41,10 +43,126 @@ protected GenericSchemaImpl(SchemaInfo schemaInfo) {
 
         this.fields = schema.getFields()
                 .stream()
-                .map(f -> new Field(f.name(), f.pos()))
+                .map(f -> new Field(f.name(), f.pos(), convertFieldSchema(f, schemaInfo.getType())))
                 .collect(Collectors.toList());
     }
 
+
+    public static org.apache.pulsar.client.api.Schema<?> convertFieldSchema(Schema.Field f, SchemaType mainType) {
+        return convertFieldSchema(f.schema(), mainType);
+    }
+
+    private static org.apache.pulsar.client.api.Schema<?> convertFieldSchema(Schema schema, SchemaType mainType) {
+        switch (schema.getType()) {
+            case RECORD:
+                return buildStructSchema(schema, mainType);
+            case BYTES:
+                return GenericSchema.BYTES;
+            case LONG:
+                return GenericSchema.INT64;
+            case FLOAT:
+                return GenericSchema.FLOAT;
+            case DOUBLE:
+                return GenericSchema.DOUBLE;
+            case BOOLEAN:
+                return GenericSchema.BOOL;
+            case STRING:
+                return GenericSchema.STRING;
+            case INT:
+                return GenericSchema.INT32;
+            case UNION:
+                // this is very common while representing NULLABLE types
+                // the first entry is "null", the second is the effective type
+                List<Schema> types = schema.getTypes();
+                if (types.size() == 2) {
+                    if (types.stream().anyMatch(s -> s.getType() == Schema.Type.NULL)) {

Review comment:
       `if (types.size() == 2 && types.stream().filter(s -> s.getType() != Schema.Type.NULL).count() == 1)` ?
   This removes the need for `.orElse` and the null check later.
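
The nullable-union check discussed in this comment can be sketched in isolation. The names below (`NullableUnionSketch`, `Type`, `unwrapNullable`) are hypothetical, and the small enum stands in for Avro's `Schema.Type` so the snippet runs without Avro; it is one way to write the check, not the code that was merged.

```java
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;

final class NullableUnionSketch {
    // Hypothetical stand-in for Avro's Schema.Type; only the members needed here.
    enum Type { NULL, STRING, INT, LONG }

    // Returns the single non-null branch of a two-branch nullable union,
    // or Optional.empty() for any other union shape.
    static Optional<Type> unwrapNullable(List<Type> branches) {
        if (branches.size() != 2) {
            return Optional.empty();
        }
        List<Type> nonNull = branches.stream()
                .filter(t -> t != Type.NULL)
                .collect(Collectors.toList());
        // With exactly two branches, one non-null branch implies the other is NULL.
        return nonNull.size() == 1 ? Optional.of(nonNull.get(0)) : Optional.empty();
    }
}
```

The check does not depend on the order of the branches, so both `["null", T]` and `[T, "null"]` unwrap to `T`.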







[GitHub] [pulsar] eolivelli commented on pull request #9343: Issue 9004: Pulsar Schema API: provide Type information for Fields

Posted by GitBox <gi...@apache.org>.
eolivelli commented on pull request #9343:
URL: https://github.com/apache/pulsar/pull/9343#issuecomment-768871076


   > When we design Pulsar schema, we don't want to invent yet another type system.
   
   @sijie
   In Kafka Connect we see this great abstraction of [Schema](https://kafka.apache.org/11/javadoc/org/apache/kafka/connect/data/Schema.html)
   and this makes it very easy to implement Connectors (especially Sinks) in a way that is not dependent on the serialization mechanism.
   
   Therefore in Pulsar IO we do not allow the function implementation to access the underlying Avro libraries, I like this fact and I would like not to change it.
   
   So my thinking is that we need our own way to describe a Schema with a Pulsar API.
   I would also like to follow up by adding to the Pulsar Schema API all of the data types that are missing, like Arrays and Maps.
   
   In my opinion without these features Pulsar IO won't be able to compete with Kafka Connect, and also to "port" existing Kafka Connect workflows to the Pulsar Ecosystem   





[GitHub] [pulsar] sijie commented on a change in pull request #9343: Issue 9004: Pulsar Schema API: provide Type information for Fields

Posted by GitBox <gi...@apache.org>.
sijie commented on a change in pull request #9343:
URL: https://github.com/apache/pulsar/pull/9343#discussion_r565784657



##########
File path: pulsar-client-api/src/main/java/org/apache/pulsar/client/api/schema/RecordSchemaBuilder.java
##########
@@ -54,7 +54,17 @@
      * @param genericSchema schema of the field
      * @return field schema builder to build the field.
      */
-    FieldSchemaBuilder field(String fieldName, GenericSchema genericSchema);
+    FieldSchemaBuilder field(String fieldName, org.apache.pulsar.client.api.Schema<?> genericSchema);
+
+    /**
+     * Add a field with the given name and genericSchema to the record.
+     *
+     * @param fieldName name of the field
+     * @param genericSchema schema of the field
+     * @return field schema builder to build the field.
+     */
+    @Deprecated
+    FieldSchemaBuilder field(String fieldName, GenericSchema<?> genericSchema);

Review comment:
       Any reason why you deprecated it? 







[GitHub] [pulsar] eolivelli commented on a change in pull request #9343: Issue 9004: Pulsar Schema API: provide Type information for Fields

Posted by GitBox <gi...@apache.org>.
eolivelli commented on a change in pull request #9343:
URL: https://github.com/apache/pulsar/pull/9343#discussion_r565878943



##########
File path: pulsar-client-api/src/main/java/org/apache/pulsar/client/api/schema/RecordSchemaBuilder.java
##########
@@ -54,7 +54,17 @@
      * @param genericSchema schema of the field
      * @return field schema builder to build the field.
      */
-    FieldSchemaBuilder field(String fieldName, GenericSchema genericSchema);
+    FieldSchemaBuilder field(String fieldName, org.apache.pulsar.client.api.Schema<?> genericSchema);
+
+    /**
+     * Add a field with the given name and genericSchema to the record.
+     *
+     * @param fieldName name of the field
+     * @param genericSchema schema of the field
+     * @return field schema builder to build the field.
+     */
+    @Deprecated
+    FieldSchemaBuilder field(String fieldName, GenericSchema<?> genericSchema);

Review comment:
       I have deprecated it in favour of the new method that accepts a `Schema<?>`.
   The new method is more general and should be preferred.
   I cannot remove the one with `GenericSchema<?>`, in order to keep binary compatibility with compiled code.
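
The compatibility argument above can be sketched with stand-in types. `SketchSchema`, `SketchGenericSchema`, `SketchRecordBuilder`, and `OverloadSketch` are hypothetical; the sketch assumes, as in the Pulsar client API, that `GenericSchema` is a subtype of `Schema`.

```java
// Hypothetical stand-ins mirroring the assumed Schema / GenericSchema relationship.
interface SketchSchema { }

interface SketchGenericSchema extends SketchSchema { }

interface SketchRecordBuilder {
    // New, more general overload.
    String field(String fieldName, SketchSchema schema);

    // Old overload kept so that already-compiled callers, which reference this
    // exact method descriptor, still link without recompilation.
    @Deprecated
    String field(String fieldName, SketchGenericSchema schema);
}

final class OverloadSketch implements SketchRecordBuilder {
    @Override
    public String field(String fieldName, SketchSchema schema) {
        return "general";
    }

    @Override
    @Deprecated
    public String field(String fieldName, SketchGenericSchema schema) {
        return "deprecated";
    }
}
```

One consequence of keeping both overloads: a caller whose argument is statically typed as the narrower interface still resolves to the deprecated method at compile time, because Java picks the most specific applicable overload. Casting the argument to the general type selects the new method.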







[GitHub] [pulsar] sijie commented on pull request #9343: Issue 9004: Pulsar Schema API: provide Type information for Fields

Posted by GitBox <gi...@apache.org>.
sijie commented on pull request #9343:
URL: https://github.com/apache/pulsar/pull/9343#issuecomment-768990538


   @eolivelli 
   
   There are two questions here. Let's not couple them together.
   
   1) Do we want to introduce a type system or not? If we do, what would the approach be?
   2) How can a connector access the schema information?
   
   For the first one, I strongly disagree with introducing a type system. I have seen problems with converting types between different systems. AVRO is already an open standard that all the existing computing engines support. If we introduce another type system, we introduce a lot of trouble when integrating with other computing engines. That's something I would avoid. If you want to do that, let's think carefully. It is not just about connectors. Pulsar has a broader set of integrations that rely on schema. We can't afford to maintain different type systems and different approaches for different integrations. You end up needing to maintain a lot of converters between Pulsar schemas and many other systems. It is going to be a nightmare. 
   
   For the second one, we should just let users access the underlying libraries. AVRO, JSON, and PROTOBUF have different type systems. Let's use their existing tools and not reinvent one.
   
   > without these features Pulsar IO won't be able to compete with Kafka Connect, and also to "port" existing Kafka Connect workflows to the Pulsar Ecosystem
   
   First of all, Pulsar I/O doesn't compete with Kafka Connect. Pulsar I/O is an extension of the Pulsar ecosystem. Kafka Connect is the extension to Kafka. They might share similar goals. But they don't compete with each other. 
   
   Secondly, we embrace the Kafka ecosystem by providing a fully compatible wrapper to run existing Kafka connectors without introducing a type system. The key for this to work is using an open standard like AVRO. 
   
   ```
   Kafka Schema -> [ Open Standards: AVRO ] -> Pulsar Schema 
   ```
   
   

