Posted to commits@hudi.apache.org by "TengHuo (via GitHub)" <gi...@apache.org> on 2023/02/02 04:00:04 UTC

[GitHub] [hudi] TengHuo commented on issue #7691: [SUPPORT] Flink's schema conflicts with spark's schema.

TengHuo commented on issue #7691:
URL: https://github.com/apache/hudi/issues/7691#issuecomment-1413118913

   > > @TengHuo @danny0405 What's the followup here? If it's a bug, is it going to be fixed in 0.13.0?
   > > I fixed it in this pr #7694
   
   @LinMingQiang The PR should fix the compatibility check error only. When resolving Avro schemas, the generator compares the full names of the writer and reader schemas directly for `FIXED` and `ENUM`, and recurses into the nested schemas of `ARRAY`, `MAP`, `RECORD` and `UNION`. If the full names do not match, the generator may return a `Symbol.error`, which is then thrown as an `AvroTypeException` in `ResolvingDecoder#doAction`.
   
   Code reference: `org.apache.avro.io.parsing.ResolvingGrammarGenerator#generate(Schema writer, Schema reader, Map<LitS, Symbol> seen)`
   
   ```java
     public Symbol generate(Schema writer, Schema reader,
                                   Map<LitS, Symbol> seen) throws IOException
     {
       final Schema.Type writerType = writer.getType();
       final Schema.Type readerType = reader.getType();
   
       if (writerType == readerType) {
         switch (writerType) {
         case NULL:
           return Symbol.NULL;
         case BOOLEAN:
           return Symbol.BOOLEAN;
         case INT:
           return Symbol.INT;
         case LONG:
           return Symbol.LONG;
         case FLOAT:
           return Symbol.FLOAT;
         case DOUBLE:
           return Symbol.DOUBLE;
         case STRING:
           return Symbol.STRING;
         case BYTES:
           return Symbol.BYTES;
         case FIXED:
           if (writer.getFullName().equals(reader.getFullName())
               && writer.getFixedSize() == reader.getFixedSize()) {
             return Symbol.seq(Symbol.intCheckAction(writer.getFixedSize()),
                 Symbol.FIXED);
           }
           break;
   
         case ENUM:
           if (writer.getFullName() == null
                   || writer.getFullName().equals(reader.getFullName())) {
             return Symbol.seq(mkEnumAdjust(writer.getEnumSymbols(),
                     reader.getEnumSymbols()), Symbol.ENUM);
           }
           break;
   
         case ARRAY:
           return Symbol.seq(Symbol.repeat(Symbol.ARRAY_END,
                   generate(writer.getElementType(),
                   reader.getElementType(), seen)),
               Symbol.ARRAY_START);
   
         case MAP:
           return Symbol.seq(Symbol.repeat(Symbol.MAP_END,
                   generate(writer.getValueType(),
                   reader.getValueType(), seen), Symbol.STRING),
               Symbol.MAP_START);
         case RECORD:
           return resolveRecords(writer, reader, seen);
         case UNION:
           return resolveUnion(writer, reader, seen);
         default:
           throw new AvroTypeException("Unkown type for schema: " + writerType);
         }
       } else {  // writer and reader are of different types
         if (writerType == Schema.Type.UNION) {
           return resolveUnion(writer, reader, seen);
         }
   
         switch (readerType) {
         case LONG:
           switch (writerType) {
           case INT:
             return Symbol.resolve(super.generate(writer, seen), Symbol.LONG);
           }
           break;
   
         case FLOAT:
           switch (writerType) {
           case INT:
           case LONG:
             return Symbol.resolve(super.generate(writer, seen), Symbol.FLOAT);
           }
           break;
   
         case DOUBLE:
           switch (writerType) {
           case INT:
           case LONG:
           case FLOAT:
             return Symbol.resolve(super.generate(writer, seen), Symbol.DOUBLE);
           }
           break;
   
         case BYTES:
           switch (writerType) {
           case STRING:
             return Symbol.resolve(super.generate(writer, seen), Symbol.BYTES);
           }
           break;
   
         case STRING:
           switch (writerType) {
           case BYTES:
             return Symbol.resolve(super.generate(writer, seen), Symbol.STRING);
           }
           break;
   
         case UNION:
           int j = bestBranch(reader, writer, seen);
           if (j >= 0) {
             Symbol s = generate(writer, reader.getTypes().get(j), seen);
             return Symbol.seq(Symbol.unionAdjustAction(j, s), Symbol.UNION);
           }
           break;
         case NULL:
         case BOOLEAN:
         case INT:
         case ENUM:
         case ARRAY:
         case MAP:
         case RECORD:
         case FIXED:
           break;
         default:
           throw new RuntimeException("Unexpected schema type: " + readerType);
         }
       }
       return Symbol.error("Found " + writer.getFullName()
                           + ", expecting " + reader.getFullName());
     }
   ```
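
   To make the name check concrete, here is a minimal, dependency-free sketch of just the `FIXED` and `ENUM` branches quoted above. It is not Avro code: `NameCheckSketch`, `NamedSchema`, and the names `ns_a.fixed` / `ns_b.fixed` are hypothetical stand-ins, and the method returns a plain string where the real generator would return a `Symbol`.

   ```java
   public class NameCheckSketch {

       /** Hypothetical stand-in for a named schema; this is NOT Avro's Schema class. */
       static final class NamedSchema {
           final String type;      // only "FIXED" and "ENUM" are modelled here
           final String fullName;  // namespace + "." + name
           final int fixedSize;    // only meaningful for FIXED

           NamedSchema(String type, String fullName, int fixedSize) {
               this.type = type;
               this.fullName = fullName;
               this.fixedSize = fixedSize;
           }
       }

       /**
        * Returns "OK" when the pair resolves. Otherwise returns the same
        * "Found X, expecting Y" text that the quoted method passes to Symbol.error.
        */
       static String resolve(NamedSchema writer, NamedSchema reader) {
           if (writer.type.equals(reader.type)) {
               switch (writer.type) {
               case "FIXED":
                   // Same full name AND same size are both required.
                   if (writer.fullName.equals(reader.fullName)
                           && writer.fixedSize == reader.fixedSize) {
                       return "OK";
                   }
                   break;
               case "ENUM":
                   // A writer without a name is accepted; otherwise names must match.
                   if (writer.fullName == null
                           || writer.fullName.equals(reader.fullName)) {
                       return "OK";
                   }
                   break;
               default:
                   throw new IllegalArgumentException(
                       "Type not modelled in this sketch: " + writer.type);
               }
           }
           return "Found " + writer.fullName + ", expecting " + reader.fullName;
       }

       public static void main(String[] args) {
           NamedSchema writer = new NamedSchema("FIXED", "ns_a.fixed", 16);
           NamedSchema sameName = new NamedSchema("FIXED", "ns_a.fixed", 16);
           NamedSchema otherNamespace = new NamedSchema("FIXED", "ns_b.fixed", 16);

           System.out.println(resolve(writer, sameName));        // OK
           System.out.println(resolve(writer, otherNamespace));  // Found ns_a.fixed, expecting ns_b.fixed
       }
   }
   ```

   The second call shows the case discussed here: the two schemas are structurally identical, but the differing namespace alone is enough to reach the error path.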

