Posted to commits@hudi.apache.org by "voonhous (via GitHub)" <gi...@apache.org> on 2023/03/28 06:59:30 UTC

[GitHub] [hudi] voonhous opened a new pull request, #8307: [HUDI-5992] Fix (de)serialization for avro versions > 1.10.0

voonhous opened a new pull request, #8307:
URL: https://github.com/apache/hudi/pull/8307

   ### Change Logs
   Added a custom (de)serializer to handle `GenericData$Fixed` types.
   
   Please refer to [HUDI-5992](https://issues.apache.org/jira/browse/HUDI-5992) for more details.
   
   ### Impact
   
   `GenericData$Fixed` types will be serialized and deserialized with Hudi's **GenericAvroSerializer**, which is heavily adapted from:
   
   https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/serializer/GenericAvroSerializer.scala
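   
   A minimal, illustrative sketch of how such a serializer could be wired up with Kryo for `GenericData$Fixed`; the class and registration point below are assumptions for illustration only, not necessarily the exact wiring in this PR:
   
   ```java
   // Illustrative sketch only: register the Avro-aware serializer so that
   // GenericData$Fixed values round-trip through Avro binary encoding instead of
   // Kryo's default field-based serialization, which fails on avro > 1.10.0 (HUDI-5992).
   import com.esotericsoftware.kryo.Kryo;
   import org.apache.avro.generic.GenericData;
   import org.apache.hudi.avro.GenericAvroSerializer;
   
   public class KryoAvroRegistrationSketch {
     public static Kryo newKryo() {
       Kryo kryo = new Kryo();
       kryo.register(GenericData.Fixed.class, new GenericAvroSerializer<GenericData.Fixed>());
       return kryo;
     }
   }
   ```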
   
   ### Risk level (write none, low medium or high below)
   
   None
   
   ### Documentation Update
   
   _Describe any necessary documentation update if there is any new feature, config, or user-facing change_
   
   - _The config description must be updated if new configs are added or the default value of the configs are changed_
   - _Any new feature or user-facing change requires updating the Hudi website. Please create a Jira ticket, attach the
     ticket number here and follow the [instruction](https://hudi.apache.org/contribute/developer-setup#website) to make
     changes to the website._
   
   ### Contributor's checklist
   
   - [ ] Read through [contributor's guide](https://hudi.apache.org/contribute/how-to-contribute)
   - [ ] Change Logs and Impact were stated clearly
   - [ ] Adequate tests were added if applicable
   - [ ] CI passed
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] hudi-bot commented on pull request #8307: [HUDI-5992] Fix (de)serialization for avro versions > 1.10.0

Posted by "hudi-bot (via GitHub)" <gi...@apache.org>.
hudi-bot commented on PR #8307:
URL: https://github.com/apache/hudi/pull/8307#issuecomment-1486993617

   ## CI report:
   
   * 8aef9327c27d3f716a7f4c40f9a6d0ea6f370d3e Azure: [FAILURE](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15957) Azure: [PENDING](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15961) 
   
   <details>
   <summary>Bot commands</summary>
     @hudi-bot supports the following commands:
   
    - `@hudi-bot run azure` re-run the last Azure build
   </details>




[GitHub] [hudi] voonhous commented on a diff in pull request #8307: [HUDI-5992] Fix (de)serialization for avro versions > 1.10.0

Posted by "voonhous (via GitHub)" <gi...@apache.org>.
voonhous commented on code in PR #8307:
URL: https://github.com/apache/hudi/pull/8307#discussion_r1151467972


##########
hudi-common/src/main/java/org/apache/hudi/avro/GenericAvroSerializer.java:
##########
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.avro;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.Serializer;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import org.apache.avro.Schema;
+import org.apache.avro.SchemaNormalization;
+import org.apache.avro.generic.GenericContainer;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.io.DatumWriter;
+import org.apache.avro.io.Decoder;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.io.Encoder;
+import org.apache.avro.io.EncoderFactory;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+
+
+/**
+ * Custom serializer used for generic Avro containers.
+ * <p>
+ * Heavily adapted from:
+ * <p>
+ * <a href="https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/serializer/GenericAvroSerializer.scala">GenericAvroSerializer.scala</a>
+ * <p>
+ * As {@link org.apache.hudi.common.util.SerializationUtils} is not shared between threads and does not concern any
+ * shuffling operations, compression and decompression cache is omitted as network IO is not a concern.
+ * <p>
+ * Unlike Spark's implementation, the class and constructor is not initialized with a predefined map of avro schemas.
+ * This is the case as schemas to read and write are not known beforehand.
+ *
+ * @param <D> the subtype of [[GenericContainer]] handled by this serializer
+ */
+public class GenericAvroSerializer<D extends GenericContainer> extends Serializer<D> {
+
+  // reuses the same datum reader/writer since the same schema will be used many times
+  private final HashMap<Schema, DatumWriter<D>> writerCache = new HashMap<>();
+  private final HashMap<Schema, DatumReader<D>> readerCache = new HashMap<>();
+
+  // fingerprinting is very expensive so this alleviates most of the work
+  private final HashMap<Schema, Long> fingerprintCache = new HashMap<>();
+  private final HashMap<Long, Schema> schemaCache = new HashMap<>();
+
+  private Long getFingerprint(Schema schema) {
+    if (fingerprintCache.containsKey(schema)) {
+      return fingerprintCache.get(schema);
+    } else {
+      Long fingerprint = SchemaNormalization.parsingFingerprint64(schema);
+      fingerprintCache.put(schema, fingerprint);
+      return fingerprint;
+    }
+  }
+
+  private Schema getSchema(Long fingerprint, byte[] schemaBytes) {
+    if (schemaCache.containsKey(fingerprint)) {
+      return schemaCache.get(fingerprint);
+    } else {
+      String schema = new String(schemaBytes, StandardCharsets.UTF_8);
+      Schema parsedSchema = new Schema.Parser().parse(schema);
+      schemaCache.put(fingerprint, parsedSchema);
+      return parsedSchema;
+    }
+  }
+
+  private DatumWriter<D> getDatumWriter(Schema schema) {
+    DatumWriter<D> writer;
+    if (writerCache.containsKey(schema)) {
+      writer = writerCache.get(schema);
+    } else {
+      writer = new GenericDatumWriter<>(schema);
+      writerCache.put(schema, writer);
+    }
+    return writer;
+  }
+
+  private DatumReader<D> getDatumReader(Schema schema) {
+    DatumReader<D> reader;
+    if (readerCache.containsKey(schema)) {
+      reader = readerCache.get(schema);
+    } else {
+      reader = new GenericDatumReader<>(schema);
+      readerCache.put(schema, reader);
+    }
+    return reader;
+  }
+
+  private void serializeDatum(D datum, Output output) throws IOException {
+    Encoder encoder = EncoderFactory.get().directBinaryEncoder(output, null);
+    Schema schema = datum.getSchema();
+    Long fingerprint = this.getFingerprint(schema);
+    byte[] schemaBytes = schema.toString().getBytes(StandardCharsets.UTF_8);

Review Comment:
   In that case, I'll change the cache as such?
   
   ```java
   private final HashMap<Schema, byte[]> fingerprintCache = new HashMap<>();
   private final HashMap<byte[], Schema> schemaCache = new HashMap<>();
   ```





[GitHub] [hudi] voonhous commented on pull request #8307: [HUDI-5992] Fix (de)serialization for avro versions > 1.10.0

Posted by "voonhous (via GitHub)" <gi...@apache.org>.
voonhous commented on PR #8307:
URL: https://github.com/apache/hudi/pull/8307#issuecomment-1488030287

   @hudi-bot run azure




[GitHub] [hudi] hudi-bot commented on pull request #8307: [HUDI-5992] Fix (de)serialization for avro versions > 1.10.0

Posted by "hudi-bot (via GitHub)" <gi...@apache.org>.
hudi-bot commented on PR #8307:
URL: https://github.com/apache/hudi/pull/8307#issuecomment-1488016726

   ## CI report:
   
   * d8521565c1a8a4e215f779c525b7d123b44b94b3 Azure: [FAILURE](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15965) 
   
   <details>
   <summary>Bot commands</summary>
     @hudi-bot supports the following commands:
   
    - `@hudi-bot run azure` re-run the last Azure build
   </details>




[GitHub] [hudi] danny0405 merged pull request #8307: [HUDI-5992] Fix (de)serialization for avro versions > 1.10.0

Posted by "danny0405 (via GitHub)" <gi...@apache.org>.
danny0405 merged PR #8307:
URL: https://github.com/apache/hudi/pull/8307




[GitHub] [hudi] voonhous commented on a diff in pull request #8307: [HUDI-5992] Fix (de)serialization for avro versions > 1.10.0

Posted by "voonhous (via GitHub)" <gi...@apache.org>.
voonhous commented on code in PR #8307:
URL: https://github.com/apache/hudi/pull/8307#discussion_r1153030426


##########
hudi-common/src/main/java/org/apache/hudi/avro/GenericAvroSerializer.java:
##########

Review Comment:
   Yeah, given that we are writing a long as a fingerprint, the storage penalty seems like a reasonable tradeoff for speed... 





[GitHub] [hudi] danny0405 commented on a diff in pull request #8307: [HUDI-5992] Fix (de)serialization for avro versions > 1.10.0

Posted by "danny0405 (via GitHub)" <gi...@apache.org>.
danny0405 commented on code in PR #8307:
URL: https://github.com/apache/hudi/pull/8307#discussion_r1151364009


##########
hudi-common/src/main/java/org/apache/hudi/avro/GenericAvroSerializer.java:
##########

Review Comment:
   Then at least there is no need to serialize the fingerprint, right? Just deserializing the schema from the schema bytes directly should be fine.
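   
   A rough sketch of what that could look like (hypothetical, only to illustrate the idea: the schema bytes alone are written, the fingerprint is dropped, and the helper methods are the ones from the class above):
   
   ```java
   // Hypothetical sketch: skip the fingerprint and carry only the schema bytes.
   private void serializeDatum(D datum, Output output) throws IOException {
     Encoder encoder = EncoderFactory.get().directBinaryEncoder(output, null);
     Schema schema = datum.getSchema();
     byte[] schemaBytes = schema.toString().getBytes(StandardCharsets.UTF_8);
     output.writeInt(schemaBytes.length);
     output.writeBytes(schemaBytes);
     getDatumWriter(schema).write(datum, encoder);
     encoder.flush();
   }
   
   private D deserializeDatum(Input input) throws IOException {
     int schemaBytesLen = input.readInt();
     byte[] schemaBytes = input.readBytes(schemaBytesLen);
     // Parse the schema straight from its bytes (a cache keyed on the bytes could sit here).
     Schema schema = new Schema.Parser().parse(new String(schemaBytes, StandardCharsets.UTF_8));
     Decoder decoder = DecoderFactory.get().directBinaryDecoder(input, null);
     return getDatumReader(schema).read(null, decoder);
   }
   ```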





[GitHub] [hudi] voonhous commented on pull request #8307: [HUDI-5992] Fix (de)serialization for avro versions > 1.10.0

Posted by "voonhous (via GitHub)" <gi...@apache.org>.
voonhous commented on PR #8307:
URL: https://github.com/apache/hudi/pull/8307#issuecomment-1486324094

   @danny0405 Can you please help to review this PR? Thank you!




[GitHub] [hudi] danny0405 commented on a diff in pull request #8307: [HUDI-5992] Fix (de)serialization for avro versions > 1.10.0

Posted by "danny0405 (via GitHub)" <gi...@apache.org>.
danny0405 commented on code in PR #8307:
URL: https://github.com/apache/hudi/pull/8307#discussion_r1150166449


##########
hudi-common/src/main/java/org/apache/hudi/avro/GenericAvroSerializer.java:
##########
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.avro;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.Serializer;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import org.apache.avro.Schema;
+import org.apache.avro.SchemaNormalization;
+import org.apache.avro.generic.GenericContainer;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.io.DatumWriter;
+import org.apache.avro.io.Decoder;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.io.Encoder;
+import org.apache.avro.io.EncoderFactory;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+
+
+/**
+ * Custom serializer used for generic Avro containers.
+ * <p>
+ * Heavily adapted from:
+ * <p>
+ * https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/serializer/GenericAvroSerializer.scala
+ * <p>
+ * As {@link org.apache.hudi.common.util.SerializationUtils} is not shared between threads and does not concern any
+ * shuffling operations, compression and decompression cache is omitted.
+ *

Review Comment:
   Thanks for the fix. Can we add more details about the differences compared with the Spark copy?





[GitHub] [hudi] hudi-bot commented on pull request #8307: [HUDI-5992] Fix (de)serialization for avro versions > 1.10.0

Posted by "hudi-bot (via GitHub)" <gi...@apache.org>.
hudi-bot commented on PR #8307:
URL: https://github.com/apache/hudi/pull/8307#issuecomment-1488097349

   ## CI report:
   
   * d8521565c1a8a4e215f779c525b7d123b44b94b3 Azure: [FAILURE](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15965) Azure: [CANCELED](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15966) 
   * 14984a0d4f5d2e7c05a55c0db380f68819e4a519 Azure: [PENDING](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15969) 
   
   <details>
   <summary>Bot commands</summary>
     @hudi-bot supports the following commands:
   
    - `@hudi-bot run azure` re-run the last Azure build
   </details>




[GitHub] [hudi] danny0405 commented on a diff in pull request #8307: [HUDI-5992] Fix (de)serialization for avro versions > 1.10.0

Posted by "danny0405 (via GitHub)" <gi...@apache.org>.
danny0405 commented on code in PR #8307:
URL: https://github.com/apache/hudi/pull/8307#discussion_r1151367039


##########
hudi-common/src/main/java/org/apache/hudi/avro/GenericAvroSerializer.java:
##########

Review Comment:
   The schema cache could be useless now; see the code in Spark:
   
   ```scala
       def deserializeDatum(input: KryoInput): D = {
       val schema = {
         if (input.readBoolean()) {
           val fingerprint = input.readLong()
           schemaCache.getOrElseUpdate(fingerprint, {
             schemas.get(fingerprint) match {
               case Some(s) => new Schema.Parser().setValidateDefaults(false).parse(s)
               case None =>
                 throw new SparkException(
                   "Error reading attempting to read avro data -- encountered an unknown " +
                     s"fingerprint: $fingerprint, not sure what schema to use.  This could happen " +
                     "if you registered additional schemas after starting your spark context.")
             }
           })
         } else {
           val length = input.readInt()
           decompress(ByteBuffer.wrap(input.readBytes(length)))
         }
       }
       val decoder = DecoderFactory.get.directBinaryDecoder(input, null)
       readerCache.getOrElseUpdate(schema, GenericData.get.createDatumReader(schema))
         .asInstanceOf[DatumReader[D]]
         .read(null.asInstanceOf[D], decoder)
     }
   ```
   
   When the fingerprint is disabled, the schema is decoded directly from the compressed bytes.






[GitHub] [hudi] hudi-bot commented on pull request #8307: [HUDI-5992] Fix (de)serialization for avro versions > 1.10.0

Posted by "hudi-bot (via GitHub)" <gi...@apache.org>.
hudi-bot commented on PR #8307:
URL: https://github.com/apache/hudi/pull/8307#issuecomment-1488152426

   ## CI report:
   
   * d8521565c1a8a4e215f779c525b7d123b44b94b3 Azure: [FAILURE](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15965) Azure: [CANCELED](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15966) 
   * 14984a0d4f5d2e7c05a55c0db380f68819e4a519 Azure: [PENDING](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15969) 
   * b657f62b1c05863b4b9fa41ecd444d787309f0f9 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     @hudi-bot supports the following commands:
   
    - `@hudi-bot run azure` re-run the last Azure build
   </details>




[GitHub] [hudi] danny0405 commented on a diff in pull request #8307: [HUDI-5992] Fix (de)serialization for avro versions > 1.10.0

Posted by "danny0405 (via GitHub)" <gi...@apache.org>.
danny0405 commented on code in PR #8307:
URL: https://github.com/apache/hudi/pull/8307#discussion_r1152939264


##########
hudi-common/src/main/java/org/apache/hudi/avro/GenericAvroSerializer.java:
##########

Review Comment:
   Yeah, a classic CPU and storage trade off.





[GitHub] [hudi] hudi-bot commented on pull request #8307: [HUDI-5992] Fix (de)serialization for avro versions > 1.10.0

Posted by "hudi-bot (via GitHub)" <gi...@apache.org>.
hudi-bot commented on PR #8307:
URL: https://github.com/apache/hudi/pull/8307#issuecomment-1486514994

   ## CI report:
   
   * bc6d6dc5798e71d00da8e48f641307ae1789b285 Azure: [PENDING](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15954) 
   * 8aef9327c27d3f716a7f4c40f9a6d0ea6f370d3e UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     @hudi-bot supports the following commands:
   
    - `@hudi-bot run azure` re-run the last Azure build
   </details>




[GitHub] [hudi] voonhous commented on a diff in pull request #8307: [HUDI-5992] Fix (de)serialization for avro versions > 1.10.0

Posted by "voonhous (via GitHub)" <gi...@apache.org>.
voonhous commented on code in PR #8307:
URL: https://github.com/apache/hudi/pull/8307#discussion_r1151405975


##########
hudi-common/src/main/java/org/apache/hudi/avro/GenericAvroSerializer.java:
##########

Review Comment:
   > When fingerprint is disabled, the schema is decoded directly from the compressed bytes.
   
   Yeap. In this PR, our `schemaCache` serves the purpose of the `decompressCache`, which is the cache used in the `decompress` function.
   
   If you look at Spark's implementation, the schema is not ALWAYS decoded directly from compressed bytes.
   
   If the compressed bytes have been decoded before, the decoding is skipped, like so:
   
   ```scala
   decompressCache.getOrElseUpdate... 
   ```
   
   Please refer to the full snippet that I have pasted below.
   
   # Our PR
   ```java
    private Schema getSchema(Long fingerprint, byte[] schemaBytes) {
       if (schemaCache.containsKey(fingerprint)) {
         return schemaCache.get(fingerprint);
       } else {
         String schema = new String(schemaBytes, StandardCharsets.UTF_8);
         Schema parsedSchema = new Schema.Parser().parse(schema);
         schemaCache.put(fingerprint, parsedSchema);
         return parsedSchema;
       }
     }
   
     private D deserializeDatum(Input input) throws IOException {
       Long fingerprint = input.readLong();
       int schemaBytesLen = input.readInt();
       byte[] schemaBytes = input.readBytes(schemaBytesLen);
       Schema schema = getSchema(fingerprint, schemaBytes);
       Decoder decoder = DecoderFactory.get().directBinaryDecoder(input, null);
       return getDatumReader(schema).read(null, decoder);
     }
   ```
   
   
   # Spark's implementation
   
   ```scala
     def decompress(schemaBytes: ByteBuffer): Schema = decompressCache.getOrElseUpdate(schemaBytes, {
       val bis = new ByteArrayInputStream(
         schemaBytes.array(),
         schemaBytes.arrayOffset() + schemaBytes.position(),
         schemaBytes.remaining())
       val in = codec.compressedInputStream(bis)
       val bytes = Utils.tryWithSafeFinally {
         IOUtils.toByteArray(in)
       } {
         in.close()
       }
       new Schema.Parser().parse(new String(bytes, StandardCharsets.UTF_8))
     })
   ```
   
   Note: our `schemaCache` != Spark's `schemaCache`
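   
   For reference, a minimal sketch of the matching write side, assuming the truncated `serializeDatum` in the diff continues with this layout (fingerprint, schema-byte length, schema bytes, then the datum). This is only an illustration of the layout implied by `deserializeDatum` above, not the literal PR code:
   
   ```java
   private void serializeDatum(D datum, Output output) throws IOException {
     Encoder encoder = EncoderFactory.get().directBinaryEncoder(output, null);
     Schema schema = datum.getSchema();
     Long fingerprint = getFingerprint(schema);        // cached per Schema object
     byte[] schemaBytes = schema.toString().getBytes(StandardCharsets.UTF_8);
     output.writeLong(fingerprint);                    // read back by input.readLong()
     output.writeInt(schemaBytes.length);              // read back by input.readInt()
     output.writeBytes(schemaBytes);                   // read back by input.readBytes(len)
     getDatumWriter(schema).write(datum, encoder);     // writer is also cached per schema
     encoder.flush();
   }
   ```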



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] hudi-bot commented on pull request #8307: [HUDI-5992] Fix (de)serialization for avro versions > 1.10.0

Posted by "hudi-bot (via GitHub)" <gi...@apache.org>.
hudi-bot commented on PR #8307:
URL: https://github.com/apache/hudi/pull/8307#issuecomment-1488420480

   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "bc6d6dc5798e71d00da8e48f641307ae1789b285",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15954",
       "triggerID" : "bc6d6dc5798e71d00da8e48f641307ae1789b285",
       "triggerType" : "PUSH"
     }, {
       "hash" : "8aef9327c27d3f716a7f4c40f9a6d0ea6f370d3e",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15957",
       "triggerID" : "8aef9327c27d3f716a7f4c40f9a6d0ea6f370d3e",
       "triggerType" : "PUSH"
     }, {
       "hash" : "8aef9327c27d3f716a7f4c40f9a6d0ea6f370d3e",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15961",
       "triggerID" : "1486933377",
       "triggerType" : "MANUAL"
     }, {
       "hash" : "d8521565c1a8a4e215f779c525b7d123b44b94b3",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15965",
       "triggerID" : "d8521565c1a8a4e215f779c525b7d123b44b94b3",
       "triggerType" : "PUSH"
     }, {
       "hash" : "d8521565c1a8a4e215f779c525b7d123b44b94b3",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15966",
       "triggerID" : "1488030287",
       "triggerType" : "MANUAL"
     }, {
       "hash" : "14984a0d4f5d2e7c05a55c0db380f68819e4a519",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15969",
       "triggerID" : "14984a0d4f5d2e7c05a55c0db380f68819e4a519",
       "triggerType" : "PUSH"
     }, {
       "hash" : "b657f62b1c05863b4b9fa41ecd444d787309f0f9",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15971",
       "triggerID" : "b657f62b1c05863b4b9fa41ecd444d787309f0f9",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * b657f62b1c05863b4b9fa41ecd444d787309f0f9 Azure: [SUCCESS](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15971) 
   
   <details>
   <summary>Bot commands</summary>
     @hudi-bot supports the following commands:
   
    - `@hudi-bot run azure` re-run the last Azure build
   </details>


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] danny0405 commented on a diff in pull request #8307: [HUDI-5992] Fix (de)serialization for avro versions > 1.10.0

Posted by "danny0405 (via GitHub)" <gi...@apache.org>.
danny0405 commented on code in PR #8307:
URL: https://github.com/apache/hudi/pull/8307#discussion_r1151359438


##########
hudi-common/src/main/java/org/apache/hudi/avro/GenericAvroSerializer.java:
##########
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.avro;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.Serializer;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import org.apache.avro.Schema;
+import org.apache.avro.SchemaNormalization;
+import org.apache.avro.generic.GenericContainer;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.io.DatumWriter;
+import org.apache.avro.io.Decoder;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.io.Encoder;
+import org.apache.avro.io.EncoderFactory;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+
+
+/**
+ * Custom serializer used for generic Avro containers.
+ * <p>
+ * Heavily adapted from:
+ * <p>
+ * <a href="https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/serializer/GenericAvroSerializer.scala">GenericAvroSerializer.scala</a>
+ * <p>
+ * As {@link org.apache.hudi.common.util.SerializationUtils} is not shared between threads and does not concern any
+ * shuffling operations, compression and decompression cache is omitted as network IO is not a concern.
+ * <p>
+ * Unlike Spark's implementation, the class and constructor is not initialized with a predefined map of avro schemas.
+ * This is the case as schemas to read and write are not known beforehand.
+ *
+ * @param <D> the subtype of [[GenericContainer]] handled by this serializer
+ */
+public class GenericAvroSerializer<D extends GenericContainer> extends Serializer<D> {
+
+  // reuses the same datum reader/writer since the same schema will be used many times
+  private final HashMap<Schema, DatumWriter<D>> writerCache = new HashMap<>();
+  private final HashMap<Schema, DatumReader<D>> readerCache = new HashMap<>();
+
+  // fingerprinting is very expensive so this alleviates most of the work
+  private final HashMap<Schema, Long> fingerprintCache = new HashMap<>();
+  private final HashMap<Long, Schema> schemaCache = new HashMap<>();
+
+  private Long getFingerprint(Schema schema) {
+    if (fingerprintCache.containsKey(schema)) {
+      return fingerprintCache.get(schema);
+    } else {
+      Long fingerprint = SchemaNormalization.parsingFingerprint64(schema);
+      fingerprintCache.put(schema, fingerprint);
+      return fingerprint;
+    }
+  }
+
+  private Schema getSchema(Long fingerprint, byte[] schemaBytes) {
+    if (schemaCache.containsKey(fingerprint)) {
+      return schemaCache.get(fingerprint);
+    } else {
+      String schema = new String(schemaBytes, StandardCharsets.UTF_8);
+      Schema parsedSchema = new Schema.Parser().parse(schema);
+      schemaCache.put(fingerprint, parsedSchema);
+      return parsedSchema;
+    }
+  }
+
+  private DatumWriter<D> getDatumWriter(Schema schema) {
+    DatumWriter<D> writer;
+    if (writerCache.containsKey(schema)) {
+      writer = writerCache.get(schema);
+    } else {
+      writer = new GenericDatumWriter<>(schema);
+      writerCache.put(schema, writer);
+    }
+    return writer;
+  }
+
+  private DatumReader<D> getDatumReader(Schema schema) {
+    DatumReader<D> reader;
+    if (readerCache.containsKey(schema)) {
+      reader = readerCache.get(schema);
+    } else {
+      reader = new GenericDatumReader<>(schema);
+      readerCache.put(schema, reader);
+    }
+    return reader;
+  }
+
+  private void serializeDatum(D datum, Output output) throws IOException {
+    Encoder encoder = EncoderFactory.get().directBinaryEncoder(output, null);
+    Schema schema = datum.getSchema();
+    Long fingerprint = this.getFingerprint(schema);
+    byte[] schemaBytes = schema.toString().getBytes(StandardCharsets.UTF_8);

Review Comment:
   This is Spark impl for serializing:
   
   ```scala
     def serializeDatum(datum: D, output: KryoOutput): Unit = {
       val encoder = EncoderFactory.get.binaryEncoder(output, null)
       val schema = datum.getSchema
       val fingerprint = fingerprintCache.getOrElseUpdate(schema, {
         SchemaNormalization.parsingFingerprint64(schema)
       })
       schemas.get(fingerprint) match {
         case Some(_) =>
           output.writeBoolean(true)
           output.writeLong(fingerprint)
         case None =>
           output.writeBoolean(false)
           val compressedSchema = compress(schema)
           output.writeInt(compressedSchema.length)
           output.writeBytes(compressedSchema)
       }
   
       writerCache.getOrElseUpdate(schema, GenericData.get.createDatumWriter(schema))
         .asInstanceOf[DatumWriter[D]]
         .write(datum, encoder)
       encoder.flush()
     }
   ```
   
   So what's the point of the fingerprint here if we always serialize the schema bytes?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] voonhous commented on a diff in pull request #8307: [HUDI-5992] Fix (de)serialization for avro versions > 1.10.0

Posted by "voonhous (via GitHub)" <gi...@apache.org>.
voonhous commented on code in PR #8307:
URL: https://github.com/apache/hudi/pull/8307#discussion_r1151360834


##########
hudi-common/src/main/java/org/apache/hudi/avro/GenericAvroSerializer.java:
##########
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.avro;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.Serializer;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import org.apache.avro.Schema;
+import org.apache.avro.SchemaNormalization;
+import org.apache.avro.generic.GenericContainer;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.io.DatumWriter;
+import org.apache.avro.io.Decoder;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.io.Encoder;
+import org.apache.avro.io.EncoderFactory;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+
+
+/**
+ * Custom serializer used for generic Avro containers.
+ * <p>
+ * Heavily adapted from:
+ * <p>
+ * <a href="https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/serializer/GenericAvroSerializer.scala">GenericAvroSerializer.scala</a>
+ * <p>
+ * As {@link org.apache.hudi.common.util.SerializationUtils} is not shared between threads and does not concern any
+ * shuffling operations, compression and decompression cache is omitted as network IO is not a concern.
+ * <p>
+ * Unlike Spark's implementation, the class and constructor is not initialized with a predefined map of avro schemas.
+ * This is the case as schemas to read and write are not known beforehand.
+ *
+ * @param <D> the subtype of [[GenericContainer]] handled by this serializer
+ */
+public class GenericAvroSerializer<D extends GenericContainer> extends Serializer<D> {
+
+  // reuses the same datum reader/writer since the same schema will be used many times
+  private final HashMap<Schema, DatumWriter<D>> writerCache = new HashMap<>();
+  private final HashMap<Schema, DatumReader<D>> readerCache = new HashMap<>();
+
+  // fingerprinting is very expensive so this alleviates most of the work
+  private final HashMap<Schema, Long> fingerprintCache = new HashMap<>();
+  private final HashMap<Long, Schema> schemaCache = new HashMap<>();
+
+  private Long getFingerprint(Schema schema) {
+    if (fingerprintCache.containsKey(schema)) {
+      return fingerprintCache.get(schema);
+    } else {
+      Long fingerprint = SchemaNormalization.parsingFingerprint64(schema);
+      fingerprintCache.put(schema, fingerprint);
+      return fingerprint;
+    }
+  }
+
+  private Schema getSchema(Long fingerprint, byte[] schemaBytes) {
+    if (schemaCache.containsKey(fingerprint)) {
+      return schemaCache.get(fingerprint);
+    } else {
+      String schema = new String(schemaBytes, StandardCharsets.UTF_8);
+      Schema parsedSchema = new Schema.Parser().parse(schema);
+      schemaCache.put(fingerprint, parsedSchema);
+      return parsedSchema;
+    }
+  }
+
+  private DatumWriter<D> getDatumWriter(Schema schema) {
+    DatumWriter<D> writer;
+    if (writerCache.containsKey(schema)) {
+      writer = writerCache.get(schema);
+    } else {
+      writer = new GenericDatumWriter<>(schema);
+      writerCache.put(schema, writer);
+    }
+    return writer;
+  }
+
+  private DatumReader<D> getDatumReader(Schema schema) {
+    DatumReader<D> reader;
+    if (readerCache.containsKey(schema)) {
+      reader = readerCache.get(schema);
+    } else {
+      reader = new GenericDatumReader<>(schema);
+      readerCache.put(schema, reader);
+    }
+    return reader;
+  }
+
+  private void serializeDatum(D datum, Output output) throws IOException {
+    Encoder encoder = EncoderFactory.get().directBinaryEncoder(output, null);
+    Schema schema = datum.getSchema();
+    Long fingerprint = this.getFingerprint(schema);
+    byte[] schemaBytes = schema.toString().getBytes(StandardCharsets.UTF_8);

Review Comment:
   Although the (schema) bytes are ALWAYS serialized, they may or may not be parsed by:
   
   ```java
   new Schema.Parser().parse(schema)
   ```
   
   The fingerprint here is used as a key to check whether parsing of the bytes -> Schema is required; if it is not, this step is skipped.
   
   Caching and cache updates are done in these methods:
   1. getFingerprint
   2. getSchema
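   
   In other words, the caching is equivalent to this minimal sketch (simplified with `computeIfAbsent` for brevity, not the literal PR code):
   
   ```java
   // Fingerprint is computed once per Schema object; bytes are parsed once per fingerprint.
   private Long getFingerprint(Schema schema) {
     return fingerprintCache.computeIfAbsent(schema, SchemaNormalization::parsingFingerprint64);
   }
   
   private Schema getSchema(Long fingerprint, byte[] schemaBytes) {
     return schemaCache.computeIfAbsent(fingerprint,
         fp -> new Schema.Parser().parse(new String(schemaBytes, StandardCharsets.UTF_8)));
   }
   ```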
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] hudi-bot commented on pull request #8307: [HUDI-5992] Fix (de)serialization for avro versions > 1.10.0

Posted by "hudi-bot (via GitHub)" <gi...@apache.org>.
hudi-bot commented on PR #8307:
URL: https://github.com/apache/hudi/pull/8307#issuecomment-1487931181

   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "bc6d6dc5798e71d00da8e48f641307ae1789b285",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15954",
       "triggerID" : "bc6d6dc5798e71d00da8e48f641307ae1789b285",
       "triggerType" : "PUSH"
     }, {
       "hash" : "8aef9327c27d3f716a7f4c40f9a6d0ea6f370d3e",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15957",
       "triggerID" : "8aef9327c27d3f716a7f4c40f9a6d0ea6f370d3e",
       "triggerType" : "PUSH"
     }, {
       "hash" : "8aef9327c27d3f716a7f4c40f9a6d0ea6f370d3e",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15961",
       "triggerID" : "1486933377",
       "triggerType" : "MANUAL"
     }, {
       "hash" : "d8521565c1a8a4e215f779c525b7d123b44b94b3",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15965",
       "triggerID" : "d8521565c1a8a4e215f779c525b7d123b44b94b3",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 8aef9327c27d3f716a7f4c40f9a6d0ea6f370d3e Azure: [FAILURE](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15957) Azure: [SUCCESS](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15961) 
   * d8521565c1a8a4e215f779c525b7d123b44b94b3 Azure: [PENDING](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15965) 
   
   <details>
   <summary>Bot commands</summary>
     @hudi-bot supports the following commands:
   
    - `@hudi-bot run azure` re-run the last Azure build
   </details>


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] voonhous commented on pull request #8307: [HUDI-5992] Fix (de)serialization for avro versions > 1.10.0

Posted by "voonhous (via GitHub)" <gi...@apache.org>.
voonhous commented on PR #8307:
URL: https://github.com/apache/hudi/pull/8307#issuecomment-1486933377

   @hudi-bot run azure


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] voonhous commented on a diff in pull request #8307: [HUDI-5992] Fix (de)serialization for avro versions > 1.10.0

Posted by "voonhous (via GitHub)" <gi...@apache.org>.
voonhous commented on code in PR #8307:
URL: https://github.com/apache/hudi/pull/8307#discussion_r1151363115


##########
hudi-common/src/main/java/org/apache/hudi/avro/GenericAvroSerializer.java:
##########
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.avro;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.Serializer;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import org.apache.avro.Schema;
+import org.apache.avro.SchemaNormalization;
+import org.apache.avro.generic.GenericContainer;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.io.DatumWriter;
+import org.apache.avro.io.Decoder;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.io.Encoder;
+import org.apache.avro.io.EncoderFactory;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+
+
+/**
+ * Custom serializer used for generic Avro containers.
+ * <p>
+ * Heavily adapted from:
+ * <p>
+ * <a href="https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/serializer/GenericAvroSerializer.scala">GenericAvroSerializer.scala</a>
+ * <p>
+ * As {@link org.apache.hudi.common.util.SerializationUtils} is not shared between threads and does not concern any
+ * shuffling operations, compression and decompression cache is omitted as network IO is not a concern.
+ * <p>
+ * Unlike Spark's implementation, the class and constructor is not initialized with a predefined map of avro schemas.
+ * This is the case as schemas to read and write are not known beforehand.
+ *
+ * @param <D> the subtype of [[GenericContainer]] handled by this serializer
+ */
+public class GenericAvroSerializer<D extends GenericContainer> extends Serializer<D> {
+
+  // reuses the same datum reader/writer since the same schema will be used many times
+  private final HashMap<Schema, DatumWriter<D>> writerCache = new HashMap<>();
+  private final HashMap<Schema, DatumReader<D>> readerCache = new HashMap<>();
+
+  // fingerprinting is very expensive so this alleviates most of the work
+  private final HashMap<Schema, Long> fingerprintCache = new HashMap<>();
+  private final HashMap<Long, Schema> schemaCache = new HashMap<>();
+
+  private Long getFingerprint(Schema schema) {
+    if (fingerprintCache.containsKey(schema)) {
+      return fingerprintCache.get(schema);
+    } else {
+      Long fingerprint = SchemaNormalization.parsingFingerprint64(schema);
+      fingerprintCache.put(schema, fingerprint);
+      return fingerprint;
+    }
+  }
+
+  private Schema getSchema(Long fingerprint, byte[] schemaBytes) {
+    if (schemaCache.containsKey(fingerprint)) {
+      return schemaCache.get(fingerprint);
+    } else {
+      String schema = new String(schemaBytes, StandardCharsets.UTF_8);
+      Schema parsedSchema = new Schema.Parser().parse(schema);
+      schemaCache.put(fingerprint, parsedSchema);
+      return parsedSchema;
+    }
+  }
+
+  private DatumWriter<D> getDatumWriter(Schema schema) {
+    DatumWriter<D> writer;
+    if (writerCache.containsKey(schema)) {
+      writer = writerCache.get(schema);
+    } else {
+      writer = new GenericDatumWriter<>(schema);
+      writerCache.put(schema, writer);
+    }
+    return writer;
+  }
+
+  private DatumReader<D> getDatumReader(Schema schema) {
+    DatumReader<D> reader;
+    if (readerCache.containsKey(schema)) {
+      reader = readerCache.get(schema);
+    } else {
+      reader = new GenericDatumReader<>(schema);
+      readerCache.put(schema, reader);
+    }
+    return reader;
+  }
+
+  private void serializeDatum(D datum, Output output) throws IOException {
+    Encoder encoder = EncoderFactory.get().directBinaryEncoder(output, null);
+    Schema schema = datum.getSchema();
+    Long fingerprint = this.getFingerprint(schema);
+    byte[] schemaBytes = schema.toString().getBytes(StandardCharsets.UTF_8);

Review Comment:
   If you refer to Spark's implementation, `schemaCache` is always a subset of `schemas` (the Avro schemas registered with the session and passed in when the GenericAvroSerializer is constructed).
   
   As such, if Spark's `GenericAvroSerializer` is given a schema that is not in `schemas`, it will always serialize/write the compressed bytes.
   
   For this PR, we are skipping the compress/decompress steps; hence, we do not need the compress/decompress caches.
   
   `schemaCache` in our PR serves the same purpose as Spark's `decompressCache`, which is to skip the repeated work of decompressing and parsing bytes back into a schema object.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] voonhous commented on a diff in pull request #8307: [HUDI-5992] Fix (de)serialization for avro versions > 1.10.0

Posted by "voonhous (via GitHub)" <gi...@apache.org>.
voonhous commented on code in PR #8307:
URL: https://github.com/apache/hudi/pull/8307#discussion_r1151365031


##########
hudi-common/src/main/java/org/apache/hudi/avro/GenericAvroSerializer.java:
##########
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.avro;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.Serializer;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import org.apache.avro.Schema;
+import org.apache.avro.SchemaNormalization;
+import org.apache.avro.generic.GenericContainer;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.io.DatumWriter;
+import org.apache.avro.io.Decoder;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.io.Encoder;
+import org.apache.avro.io.EncoderFactory;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+
+
+/**
+ * Custom serializer used for generic Avro containers.
+ * <p>
+ * Heavily adapted from:
+ * <p>
+ * <a href="https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/serializer/GenericAvroSerializer.scala">GenericAvroSerializer.scala</a>
+ * <p>
+ * As {@link org.apache.hudi.common.util.SerializationUtils} is not shared between threads and does not concern any
+ * shuffling operations, compression and decompression cache is omitted as network IO is not a concern.
+ * <p>
+ * Unlike Spark's implementation, the class and constructor is not initialized with a predefined map of avro schemas.
+ * This is the case as schemas to read and write are not known beforehand.
+ *
+ * @param <D> the subtype of [[GenericContainer]] handled by this serializer
+ */
+public class GenericAvroSerializer<D extends GenericContainer> extends Serializer<D> {
+
+  // reuses the same datum reader/writer since the same schema will be used many times
+  private final HashMap<Schema, DatumWriter<D>> writerCache = new HashMap<>();
+  private final HashMap<Schema, DatumReader<D>> readerCache = new HashMap<>();
+
+  // fingerprinting is very expensive so this alleviates most of the work
+  private final HashMap<Schema, Long> fingerprintCache = new HashMap<>();
+  private final HashMap<Long, Schema> schemaCache = new HashMap<>();
+
+  private Long getFingerprint(Schema schema) {
+    if (fingerprintCache.containsKey(schema)) {
+      return fingerprintCache.get(schema);
+    } else {
+      Long fingerprint = SchemaNormalization.parsingFingerprint64(schema);
+      fingerprintCache.put(schema, fingerprint);
+      return fingerprint;
+    }
+  }
+
+  private Schema getSchema(Long fingerprint, byte[] schemaBytes) {
+    if (schemaCache.containsKey(fingerprint)) {
+      return schemaCache.get(fingerprint);
+    } else {
+      String schema = new String(schemaBytes, StandardCharsets.UTF_8);
+      Schema parsedSchema = new Schema.Parser().parse(schema);
+      schemaCache.put(fingerprint, parsedSchema);
+      return parsedSchema;
+    }
+  }
+
+  private DatumWriter<D> getDatumWriter(Schema schema) {
+    DatumWriter<D> writer;
+    if (writerCache.containsKey(schema)) {
+      writer = writerCache.get(schema);
+    } else {
+      writer = new GenericDatumWriter<>(schema);
+      writerCache.put(schema, writer);
+    }
+    return writer;
+  }
+
+  private DatumReader<D> getDatumReader(Schema schema) {
+    DatumReader<D> reader;
+    if (readerCache.containsKey(schema)) {
+      reader = readerCache.get(schema);
+    } else {
+      reader = new GenericDatumReader<>(schema);
+      readerCache.put(schema, reader);
+    }
+    return reader;
+  }
+
+  private void serializeDatum(D datum, Output output) throws IOException {
+    Encoder encoder = EncoderFactory.get().directBinaryEncoder(output, null);
+    Schema schema = datum.getSchema();
+    Long fingerprint = this.getFingerprint(schema);
+    byte[] schemaBytes = schema.toString().getBytes(StandardCharsets.UTF_8);

Review Comment:
   > Then at least there is no need to serialize the fingerprint, right? Just deserializing from the schema bytes directly should be fine.
   
   So, `schemaCache` will be keyed by byte[]?
   
   ```java
   private final HashMap<byte[], Schema> schemaCache = new HashMap<>();
   ```
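   
   (Worth noting for this question: a raw `byte[]` key is compared by identity in a `HashMap`, so equal schema bytes coming from different messages would never hit the cache. A tiny illustrative fragment, using a hypothetical string schema:)
   
   ```java
   HashMap<byte[], Schema> cache = new HashMap<>();
   byte[] a = "\"string\"".getBytes(StandardCharsets.UTF_8);
   byte[] b = "\"string\"".getBytes(StandardCharsets.UTF_8);
   cache.put(a, Schema.create(Schema.Type.STRING));
   cache.containsKey(b);                           // false: arrays use identity equals/hashCode
   ByteBuffer.wrap(a).equals(ByteBuffer.wrap(b));  // true: ByteBuffer compares contents
   ```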



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] voonhous commented on a diff in pull request #8307: [HUDI-5992] Fix (de)serialization for avro versions > 1.10.0

Posted by "voonhous (via GitHub)" <gi...@apache.org>.
voonhous commented on code in PR #8307:
URL: https://github.com/apache/hudi/pull/8307#discussion_r1151405975


##########
hudi-common/src/main/java/org/apache/hudi/avro/GenericAvroSerializer.java:
##########
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.avro;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.Serializer;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import org.apache.avro.Schema;
+import org.apache.avro.SchemaNormalization;
+import org.apache.avro.generic.GenericContainer;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.io.DatumWriter;
+import org.apache.avro.io.Decoder;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.io.Encoder;
+import org.apache.avro.io.EncoderFactory;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+
+
+/**
+ * Custom serializer used for generic Avro containers.
+ * <p>
+ * Heavily adapted from:
+ * <p>
+ * <a href="https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/serializer/GenericAvroSerializer.scala">GenericAvroSerializer.scala</a>
+ * <p>
+ * As {@link org.apache.hudi.common.util.SerializationUtils} is not shared between threads and does not concern any
+ * shuffling operations, compression and decompression cache is omitted as network IO is not a concern.
+ * <p>
+ * Unlike Spark's implementation, the class and constructor is not initialized with a predefined map of avro schemas.
+ * This is the case as schemas to read and write are not known beforehand.
+ *
+ * @param <D> the subtype of [[GenericContainer]] handled by this serializer
+ */
+public class GenericAvroSerializer<D extends GenericContainer> extends Serializer<D> {
+
+  // reuses the same datum reader/writer since the same schema will be used many times
+  private final HashMap<Schema, DatumWriter<D>> writerCache = new HashMap<>();
+  private final HashMap<Schema, DatumReader<D>> readerCache = new HashMap<>();
+
+  // fingerprinting is very expensive so this alleviates most of the work
+  private final HashMap<Schema, Long> fingerprintCache = new HashMap<>();
+  private final HashMap<Long, Schema> schemaCache = new HashMap<>();
+
+  private Long getFingerprint(Schema schema) {
+    if (fingerprintCache.containsKey(schema)) {
+      return fingerprintCache.get(schema);
+    } else {
+      Long fingerprint = SchemaNormalization.parsingFingerprint64(schema);
+      fingerprintCache.put(schema, fingerprint);
+      return fingerprint;
+    }
+  }
+
+  private Schema getSchema(Long fingerprint, byte[] schemaBytes) {
+    if (schemaCache.containsKey(fingerprint)) {
+      return schemaCache.get(fingerprint);
+    } else {
+      String schema = new String(schemaBytes, StandardCharsets.UTF_8);
+      Schema parsedSchema = new Schema.Parser().parse(schema);
+      schemaCache.put(fingerprint, parsedSchema);
+      return parsedSchema;
+    }
+  }
+
+  private DatumWriter<D> getDatumWriter(Schema schema) {
+    DatumWriter<D> writer;
+    if (writerCache.containsKey(schema)) {
+      writer = writerCache.get(schema);
+    } else {
+      writer = new GenericDatumWriter<>(schema);
+      writerCache.put(schema, writer);
+    }
+    return writer;
+  }
+
+  private DatumReader<D> getDatumReader(Schema schema) {
+    DatumReader<D> reader;
+    if (readerCache.containsKey(schema)) {
+      reader = readerCache.get(schema);
+    } else {
+      reader = new GenericDatumReader<>(schema);
+      readerCache.put(schema, reader);
+    }
+    return reader;
+  }
+
+  private void serializeDatum(D datum, Output output) throws IOException {
+    Encoder encoder = EncoderFactory.get().directBinaryEncoder(output, null);
+    Schema schema = datum.getSchema();
+    Long fingerprint = this.getFingerprint(schema);
+    byte[] schemaBytes = schema.toString().getBytes(StandardCharsets.UTF_8);

Review Comment:
   > When fingerprint is disabled, the schema is decoded directly from the compressed bytes.
   
   Yeap. In this PR, our `schemaCache` serves the purpose of the `decompressCache`, which is the cache used in the `decompress` function.
   
   If you look at Spark's implementation, the schema is not ALWAYS decoded directly from compressed bytes.
   
   If the compressed bytes have been decoded before, the decoding is skipped, like so:
   
   ```scala
   decompressCache.getOrElseUpdate... 
   ```
   
   Please refer to the full snippet that I have pasted below.
   
   # Our PR
   ```java
    private Schema getSchema(Long fingerprint, byte[] schemaBytes) {
       if (schemaCache.containsKey(fingerprint)) {
         return schemaCache.get(fingerprint);
       } else {
         String schema = new String(schemaBytes, StandardCharsets.UTF_8);
         Schema parsedSchema = new Schema.Parser().parse(schema);
         schemaCache.put(fingerprint, parsedSchema);
         return parsedSchema;
       }
     }
   
     private D deserializeDatum(Input input) throws IOException {
       Long fingerprint = input.readLong();
       int schemaBytesLen = input.readInt();
       byte[] schemaBytes = input.readBytes(schemaBytesLen);
       Schema schema = getSchema(fingerprint, schemaBytes);
       Decoder decoder = DecoderFactory.get().directBinaryDecoder(input, null);
       return getDatumReader(schema).read(null, decoder);
     }
   ```
   
   
   # Spark's implementation
   
   ```scala
     def decompress(schemaBytes: ByteBuffer): Schema = decompressCache.getOrElseUpdate(schemaBytes, {
       val bis = new ByteArrayInputStream(
         schemaBytes.array(),
         schemaBytes.arrayOffset() + schemaBytes.position(),
         schemaBytes.remaining())
       val in = codec.compressedInputStream(bis)
       val bytes = Utils.tryWithSafeFinally {
         IOUtils.toByteArray(in)
       } {
         in.close()
       }
       new Schema.Parser().parse(new String(bytes, StandardCharsets.UTF_8))
     })
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] voonhous commented on a diff in pull request #8307: [HUDI-5992] Fix (de)serialization for avro versions > 1.10.0

Posted by "voonhous (via GitHub)" <gi...@apache.org>.
voonhous commented on code in PR #8307:
URL: https://github.com/apache/hudi/pull/8307#discussion_r1151467972


##########
hudi-common/src/main/java/org/apache/hudi/avro/GenericAvroSerializer.java:
##########
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.avro;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.Serializer;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import org.apache.avro.Schema;
+import org.apache.avro.SchemaNormalization;
+import org.apache.avro.generic.GenericContainer;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.io.DatumWriter;
+import org.apache.avro.io.Decoder;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.io.Encoder;
+import org.apache.avro.io.EncoderFactory;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+
+
+/**
+ * Custom serializer used for generic Avro containers.
+ * <p>
+ * Heavily adapted from:
+ * <p>
+ * <a href="https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/serializer/GenericAvroSerializer.scala">GenericAvroSerializer.scala</a>
+ * <p>
+ * As {@link org.apache.hudi.common.util.SerializationUtils} is not shared between threads and does not concern any
+ * shuffling operations, compression and decompression cache is omitted as network IO is not a concern.
+ * <p>
+ * Unlike Spark's implementation, the class and constructor is not initialized with a predefined map of avro schemas.
+ * This is the case as schemas to read and write are not known beforehand.
+ *
+ * @param <D> the subtype of [[GenericContainer]] handled by this serializer
+ */
+public class GenericAvroSerializer<D extends GenericContainer> extends Serializer<D> {
+
+  // reuses the same datum reader/writer since the same schema will be used many times
+  private final HashMap<Schema, DatumWriter<D>> writerCache = new HashMap<>();
+  private final HashMap<Schema, DatumReader<D>> readerCache = new HashMap<>();
+
+  // fingerprinting is very expensive so this alleviates most of the work
+  private final HashMap<Schema, Long> fingerprintCache = new HashMap<>();
+  private final HashMap<Long, Schema> schemaCache = new HashMap<>();
+
+  private Long getFingerprint(Schema schema) {
+    if (fingerprintCache.containsKey(schema)) {
+      return fingerprintCache.get(schema);
+    } else {
+      Long fingerprint = SchemaNormalization.parsingFingerprint64(schema);
+      fingerprintCache.put(schema, fingerprint);
+      return fingerprint;
+    }
+  }
+
+  private Schema getSchema(Long fingerprint, byte[] schemaBytes) {
+    if (schemaCache.containsKey(fingerprint)) {
+      return schemaCache.get(fingerprint);
+    } else {
+      String schema = new String(schemaBytes, StandardCharsets.UTF_8);
+      Schema parsedSchema = new Schema.Parser().parse(schema);
+      schemaCache.put(fingerprint, parsedSchema);
+      return parsedSchema;
+    }
+  }
+
+  private DatumWriter<D> getDatumWriter(Schema schema) {
+    DatumWriter<D> writer;
+    if (writerCache.containsKey(schema)) {
+      writer = writerCache.get(schema);
+    } else {
+      writer = new GenericDatumWriter<>(schema);
+      writerCache.put(schema, writer);
+    }
+    return writer;
+  }
+
+  private DatumReader<D> getDatumReader(Schema schema) {
+    DatumReader<D> reader;
+    if (readerCache.containsKey(schema)) {
+      reader = readerCache.get(schema);
+    } else {
+      reader = new GenericDatumReader<>(schema);
+      readerCache.put(schema, reader);
+    }
+    return reader;
+  }
+
+  private void serializeDatum(D datum, Output output) throws IOException {
+    Encoder encoder = EncoderFactory.get().directBinaryEncoder(output, null);
+    Schema schema = datum.getSchema();
+    Long fingerprint = this.getFingerprint(schema);
+    byte[] schemaBytes = schema.toString().getBytes(StandardCharsets.UTF_8);

Review Comment:
   In that case, I'll change the caches as follows?
   
   ```java
   private final HashMap<Schema, byte[]> fingerprintCache = new HashMap<>();
   private final HashMap<ByteBuffer, Schema> schemaCache = new HashMap<>();
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] voonhous commented on a diff in pull request #8307: [HUDI-5992] Fix (de)serialization for avro versions > 1.10.0

Posted by "voonhous (via GitHub)" <gi...@apache.org>.
voonhous commented on code in PR #8307:
URL: https://github.com/apache/hudi/pull/8307#discussion_r1151502732


##########
hudi-common/src/main/java/org/apache/hudi/avro/GenericAvroSerializer.java:
##########
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.avro;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.Serializer;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import org.apache.avro.Schema;
+import org.apache.avro.SchemaNormalization;
+import org.apache.avro.generic.GenericContainer;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.io.DatumWriter;
+import org.apache.avro.io.Decoder;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.io.Encoder;
+import org.apache.avro.io.EncoderFactory;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+
+
+/**
+ * Custom serializer used for generic Avro containers.
+ * <p>
+ * Heavily adapted from:
+ * <p>
+ * <a href="https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/serializer/GenericAvroSerializer.scala">GenericAvroSerializer.scala</a>
+ * <p>
+ * As {@link org.apache.hudi.common.util.SerializationUtils} is not shared between threads and does not concern any
+ * shuffling operations, compression and decompression cache is omitted as network IO is not a concern.
+ * <p>
+ * Unlike Spark's implementation, the class and constructor is not initialized with a predefined map of avro schemas.
+ * This is the case as schemas to read and write are not known beforehand.
+ *
+ * @param <D> the subtype of [[GenericContainer]] handled by this serializer
+ */
+public class GenericAvroSerializer<D extends GenericContainer> extends Serializer<D> {
+
+  // reuses the same datum reader/writer since the same schema will be used many times
+  private final HashMap<Schema, DatumWriter<D>> writerCache = new HashMap<>();
+  private final HashMap<Schema, DatumReader<D>> readerCache = new HashMap<>();
+
+  // fingerprinting is very expensive so this alleviates most of the work
+  private final HashMap<Schema, Long> fingerprintCache = new HashMap<>();
+  private final HashMap<Long, Schema> schemaCache = new HashMap<>();
+
+  private Long getFingerprint(Schema schema) {
+    if (fingerprintCache.containsKey(schema)) {
+      return fingerprintCache.get(schema);
+    } else {
+      Long fingerprint = SchemaNormalization.parsingFingerprint64(schema);
+      fingerprintCache.put(schema, fingerprint);
+      return fingerprint;
+    }
+  }
+
+  private Schema getSchema(Long fingerprint, byte[] schemaBytes) {
+    if (schemaCache.containsKey(fingerprint)) {
+      return schemaCache.get(fingerprint);
+    } else {
+      String schema = new String(schemaBytes, StandardCharsets.UTF_8);
+      Schema parsedSchema = new Schema.Parser().parse(schema);
+      schemaCache.put(fingerprint, parsedSchema);
+      return parsedSchema;
+    }
+  }
+
+  private DatumWriter<D> getDatumWriter(Schema schema) {
+    DatumWriter<D> writer;
+    if (writerCache.containsKey(schema)) {
+      writer = writerCache.get(schema);
+    } else {
+      writer = new GenericDatumWriter<>(schema);
+      writerCache.put(schema, writer);
+    }
+    return writer;
+  }
+
+  private DatumReader<D> getDatumReader(Schema schema) {
+    DatumReader<D> reader;
+    if (readerCache.containsKey(schema)) {
+      reader = readerCache.get(schema);
+    } else {
+      reader = new GenericDatumReader<>(schema);
+      readerCache.put(schema, reader);
+    }
+    return reader;
+  }
+
+  private void serializeDatum(D datum, Output output) throws IOException {
+    Encoder encoder = EncoderFactory.get().directBinaryEncoder(output, null);
+    Schema schema = datum.getSchema();
+    Long fingerprint = this.getFingerprint(schema);
+    byte[] schemaBytes = schema.toString().getBytes(StandardCharsets.UTF_8);

Review Comment:
   Done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] danny0405 commented on a diff in pull request #8307: [HUDI-5992] Fix (de)serialization for avro versions > 1.10.0

Posted by "danny0405 (via GitHub)" <gi...@apache.org>.
danny0405 commented on code in PR #8307:
URL: https://github.com/apache/hudi/pull/8307#discussion_r1151463202


##########
hudi-common/src/main/java/org/apache/hudi/avro/GenericAvroSerializer.java:
##########
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.avro;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.Serializer;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import org.apache.avro.Schema;
+import org.apache.avro.SchemaNormalization;
+import org.apache.avro.generic.GenericContainer;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.io.DatumWriter;
+import org.apache.avro.io.Decoder;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.io.Encoder;
+import org.apache.avro.io.EncoderFactory;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+
+
+/**
+ * Custom serializer used for generic Avro containers.
+ * <p>
+ * Heavily adapted from:
+ * <p>
+ * <a href="https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/serializer/GenericAvroSerializer.scala">GenericAvroSerializer.scala</a>
+ * <p>
+ * As {@link org.apache.hudi.common.util.SerializationUtils} is not shared between threads and does not concern any
+ * shuffling operations, compression and decompression cache is omitted as network IO is not a concern.
+ * <p>
+ * Unlike Spark's implementation, the class and constructor is not initialized with a predefined map of avro schemas.
+ * This is the case as schemas to read and write are not known beforehand.
+ *
+ * @param <D> the subtype of [[GenericContainer]] handled by this serializer
+ */
+public class GenericAvroSerializer<D extends GenericContainer> extends Serializer<D> {
+
+  // reuses the same datum reader/writer since the same schema will be used many times
+  private final HashMap<Schema, DatumWriter<D>> writerCache = new HashMap<>();
+  private final HashMap<Schema, DatumReader<D>> readerCache = new HashMap<>();
+
+  // fingerprinting is very expensive so this alleviates most of the work
+  private final HashMap<Schema, Long> fingerprintCache = new HashMap<>();
+  private final HashMap<Long, Schema> schemaCache = new HashMap<>();
+
+  private Long getFingerprint(Schema schema) {
+    if (fingerprintCache.containsKey(schema)) {
+      return fingerprintCache.get(schema);
+    } else {
+      Long fingerprint = SchemaNormalization.parsingFingerprint64(schema);
+      fingerprintCache.put(schema, fingerprint);
+      return fingerprint;
+    }
+  }
+
+  private Schema getSchema(Long fingerprint, byte[] schemaBytes) {
+    if (schemaCache.containsKey(fingerprint)) {
+      return schemaCache.get(fingerprint);
+    } else {
+      String schema = new String(schemaBytes, StandardCharsets.UTF_8);
+      Schema parsedSchema = new Schema.Parser().parse(schema);
+      schemaCache.put(fingerprint, parsedSchema);
+      return parsedSchema;
+    }
+  }
+
+  private DatumWriter<D> getDatumWriter(Schema schema) {
+    DatumWriter<D> writer;
+    if (writerCache.containsKey(schema)) {
+      writer = writerCache.get(schema);
+    } else {
+      writer = new GenericDatumWriter<>(schema);
+      writerCache.put(schema, writer);
+    }
+    return writer;
+  }
+
+  private DatumReader<D> getDatumReader(Schema schema) {
+    DatumReader<D> reader;
+    if (readerCache.containsKey(schema)) {
+      reader = readerCache.get(schema);
+    } else {
+      reader = new GenericDatumReader<>(schema);
+      readerCache.put(schema, reader);
+    }
+    return reader;
+  }
+
+  private void serializeDatum(D datum, Output output) throws IOException {
+    Encoder encoder = EncoderFactory.get().directBinaryEncoder(output, null);
+    Schema schema = datum.getSchema();
+    Long fingerprint = this.getFingerprint(schema);
+    byte[] schemaBytes = schema.toString().getBytes(StandardCharsets.UTF_8);

Review Comment:
   You are right, we can use the schema bytes as the cache key. But be cautious of the `ByteBuffer` equals implementation.
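   
   For context, a minimal sketch (not from this PR) of the caveat being raised here: `ByteBuffer.equals()`/`hashCode()` only consider the bytes between `position` and `limit`, so a buffer used as a `HashMap` key silently stops matching once its position moves.
   
   ```java
   import java.nio.ByteBuffer;
   import java.nio.charset.StandardCharsets;
   import java.util.HashMap;
   import java.util.Map;
   
   public class ByteBufferKeyCaveat {
     public static void main(String[] args) {
       byte[] schemaBytes = "{\"type\":\"string\"}".getBytes(StandardCharsets.UTF_8);
   
       Map<ByteBuffer, String> cache = new HashMap<>();
       ByteBuffer key = ByteBuffer.wrap(schemaBytes);
       cache.put(key, "parsed-schema");
   
       // A fresh wrapper over identical bytes hits the cache as expected ...
       System.out.println(cache.containsKey(ByteBuffer.wrap(schemaBytes))); // true
   
       // ... but consuming the stored key (e.g. while encoding) advances its position,
       // which changes what equals() sees, and the lookup starts missing.
       key.get();
       System.out.println(cache.containsKey(ByteBuffer.wrap(schemaBytes))); // false
     }
   }
   ```
   
   Keying the cache by the `Long` fingerprint, as the current patch does, sidesteps this mutability pitfall entirely.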



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] hudi-bot commented on pull request #8307: [HUDI-5992] Fix (de)serialization for avro versions > 1.10.0

Posted by "hudi-bot (via GitHub)" <gi...@apache.org>.
hudi-bot commented on PR #8307:
URL: https://github.com/apache/hudi/pull/8307#issuecomment-1487927175

   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "bc6d6dc5798e71d00da8e48f641307ae1789b285",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15954",
       "triggerID" : "bc6d6dc5798e71d00da8e48f641307ae1789b285",
       "triggerType" : "PUSH"
     }, {
       "hash" : "8aef9327c27d3f716a7f4c40f9a6d0ea6f370d3e",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15957",
       "triggerID" : "8aef9327c27d3f716a7f4c40f9a6d0ea6f370d3e",
       "triggerType" : "PUSH"
     }, {
       "hash" : "8aef9327c27d3f716a7f4c40f9a6d0ea6f370d3e",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15961",
       "triggerID" : "1486933377",
       "triggerType" : "MANUAL"
     }, {
       "hash" : "d8521565c1a8a4e215f779c525b7d123b44b94b3",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "d8521565c1a8a4e215f779c525b7d123b44b94b3",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 8aef9327c27d3f716a7f4c40f9a6d0ea6f370d3e Azure: [FAILURE](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15957) Azure: [SUCCESS](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15961) 
   * d8521565c1a8a4e215f779c525b7d123b44b94b3 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     @hudi-bot supports the following commands:
   
    - `@hudi-bot run azure` re-run the last Azure build
   </details>


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] hudi-bot commented on pull request #8307: [HUDI-5992] Fix (de)serialization for avro versions > 1.10.0

Posted by "hudi-bot (via GitHub)" <gi...@apache.org>.
hudi-bot commented on PR #8307:
URL: https://github.com/apache/hudi/pull/8307#issuecomment-1487335083

   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "bc6d6dc5798e71d00da8e48f641307ae1789b285",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15954",
       "triggerID" : "bc6d6dc5798e71d00da8e48f641307ae1789b285",
       "triggerType" : "PUSH"
     }, {
       "hash" : "8aef9327c27d3f716a7f4c40f9a6d0ea6f370d3e",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15957",
       "triggerID" : "8aef9327c27d3f716a7f4c40f9a6d0ea6f370d3e",
       "triggerType" : "PUSH"
     }, {
       "hash" : "8aef9327c27d3f716a7f4c40f9a6d0ea6f370d3e",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15961",
       "triggerID" : "1486933377",
       "triggerType" : "MANUAL"
     } ]
   }-->
   ## CI report:
   
   * 8aef9327c27d3f716a7f4c40f9a6d0ea6f370d3e Azure: [FAILURE](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15957) Azure: [SUCCESS](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15961) 
   
   <details>
   <summary>Bot commands</summary>
     @hudi-bot supports the following commands:
   
    - `@hudi-bot run azure` re-run the last Azure build
   </details>


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] voonhous commented on a diff in pull request #8307: [HUDI-5992] Fix (de)serialization for avro versions > 1.10.0

Posted by "voonhous (via GitHub)" <gi...@apache.org>.
voonhous commented on code in PR #8307:
URL: https://github.com/apache/hudi/pull/8307#discussion_r1151360834


##########
hudi-common/src/main/java/org/apache/hudi/avro/GenericAvroSerializer.java:
##########
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.avro;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.Serializer;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import org.apache.avro.Schema;
+import org.apache.avro.SchemaNormalization;
+import org.apache.avro.generic.GenericContainer;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.io.DatumWriter;
+import org.apache.avro.io.Decoder;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.io.Encoder;
+import org.apache.avro.io.EncoderFactory;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+
+
+/**
+ * Custom serializer used for generic Avro containers.
+ * <p>
+ * Heavily adapted from:
+ * <p>
+ * <a href="https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/serializer/GenericAvroSerializer.scala">GenericAvroSerializer.scala</a>
+ * <p>
+ * As {@link org.apache.hudi.common.util.SerializationUtils} is not shared between threads and does not concern any
+ * shuffling operations, compression and decompression cache is omitted as network IO is not a concern.
+ * <p>
+ * Unlike Spark's implementation, the class and constructor is not initialized with a predefined map of avro schemas.
+ * This is the case as schemas to read and write are not known beforehand.
+ *
+ * @param <D> the subtype of [[GenericContainer]] handled by this serializer
+ */
+public class GenericAvroSerializer<D extends GenericContainer> extends Serializer<D> {
+
+  // reuses the same datum reader/writer since the same schema will be used many times
+  private final HashMap<Schema, DatumWriter<D>> writerCache = new HashMap<>();
+  private final HashMap<Schema, DatumReader<D>> readerCache = new HashMap<>();
+
+  // fingerprinting is very expensive so this alleviates most of the work
+  private final HashMap<Schema, Long> fingerprintCache = new HashMap<>();
+  private final HashMap<Long, Schema> schemaCache = new HashMap<>();
+
+  private Long getFingerprint(Schema schema) {
+    if (fingerprintCache.containsKey(schema)) {
+      return fingerprintCache.get(schema);
+    } else {
+      Long fingerprint = SchemaNormalization.parsingFingerprint64(schema);
+      fingerprintCache.put(schema, fingerprint);
+      return fingerprint;
+    }
+  }
+
+  private Schema getSchema(Long fingerprint, byte[] schemaBytes) {
+    if (schemaCache.containsKey(fingerprint)) {
+      return schemaCache.get(fingerprint);
+    } else {
+      String schema = new String(schemaBytes, StandardCharsets.UTF_8);
+      Schema parsedSchema = new Schema.Parser().parse(schema);
+      schemaCache.put(fingerprint, parsedSchema);
+      return parsedSchema;
+    }
+  }
+
+  private DatumWriter<D> getDatumWriter(Schema schema) {
+    DatumWriter<D> writer;
+    if (writerCache.containsKey(schema)) {
+      writer = writerCache.get(schema);
+    } else {
+      writer = new GenericDatumWriter<>(schema);
+      writerCache.put(schema, writer);
+    }
+    return writer;
+  }
+
+  private DatumReader<D> getDatumReader(Schema schema) {
+    DatumReader<D> reader;
+    if (readerCache.containsKey(schema)) {
+      reader = readerCache.get(schema);
+    } else {
+      reader = new GenericDatumReader<>(schema);
+      readerCache.put(schema, reader);
+    }
+    return reader;
+  }
+
+  private void serializeDatum(D datum, Output output) throws IOException {
+    Encoder encoder = EncoderFactory.get().directBinaryEncoder(output, null);
+    Schema schema = datum.getSchema();
+    Long fingerprint = this.getFingerprint(schema);
+    byte[] schemaBytes = schema.toString().getBytes(StandardCharsets.UTF_8);

Review Comment:
   Although we are ALWAYS serializing the schema bytes, they may or may not need to be parsed by:
   
   ```java
   new Schema.Parser().parse(schema)
   ```
   
   The fingerprint here is used as a key to check whether parsing the bytes into a `Schema` is required; if it is not, that step is skipped.
   
   Caching and cache updates are handled in the methods below (a rough sketch of the matching read path follows this list):
   1. getFingerprint
   2. getSchema
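   
   Purely for illustration, a sketch of how a read path could use those caches. The wire layout assumed here (64-bit fingerprint, then schema length, then schema JSON bytes, then the datum) is hypothetical and not necessarily what this PR writes:
   
   ```java
   private D deserializeDatum(Input input, D reuse) throws IOException {
     // Hypothetical layout: fingerprint, schema byte length, schema JSON bytes, datum.
     Long fingerprint = input.readLong();
     int schemaLength = input.readInt();
     byte[] schemaBytes = input.readBytes(schemaLength);
   
     // getSchema only parses on a cache miss; repeated fingerprints skip Schema.Parser entirely.
     Schema schema = getSchema(fingerprint, schemaBytes);
   
     Decoder decoder = DecoderFactory.get().directBinaryDecoder(input, null);
     return getDatumReader(schema).read(reuse, decoder);
   }
   ```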
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] voonhous commented on a diff in pull request #8307: [HUDI-5992] Fix (de)serialization for avro versions > 1.10.0

Posted by "voonhous (via GitHub)" <gi...@apache.org>.
voonhous commented on code in PR #8307:
URL: https://github.com/apache/hudi/pull/8307#discussion_r1151363115


##########
hudi-common/src/main/java/org/apache/hudi/avro/GenericAvroSerializer.java:
##########
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.avro;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.Serializer;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import org.apache.avro.Schema;
+import org.apache.avro.SchemaNormalization;
+import org.apache.avro.generic.GenericContainer;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.io.DatumWriter;
+import org.apache.avro.io.Decoder;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.io.Encoder;
+import org.apache.avro.io.EncoderFactory;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+
+
+/**
+ * Custom serializer used for generic Avro containers.
+ * <p>
+ * Heavily adapted from:
+ * <p>
+ * <a href="https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/serializer/GenericAvroSerializer.scala">GenericAvroSerializer.scala</a>
+ * <p>
+ * As {@link org.apache.hudi.common.util.SerializationUtils} is not shared between threads and does not concern any
+ * shuffling operations, compression and decompression cache is omitted as network IO is not a concern.
+ * <p>
+ * Unlike Spark's implementation, the class and constructor is not initialized with a predefined map of avro schemas.
+ * This is the case as schemas to read and write are not known beforehand.
+ *
+ * @param <D> the subtype of [[GenericContainer]] handled by this serializer
+ */
+public class GenericAvroSerializer<D extends GenericContainer> extends Serializer<D> {
+
+  // reuses the same datum reader/writer since the same schema will be used many times
+  private final HashMap<Schema, DatumWriter<D>> writerCache = new HashMap<>();
+  private final HashMap<Schema, DatumReader<D>> readerCache = new HashMap<>();
+
+  // fingerprinting is very expensive so this alleviates most of the work
+  private final HashMap<Schema, Long> fingerprintCache = new HashMap<>();
+  private final HashMap<Long, Schema> schemaCache = new HashMap<>();
+
+  private Long getFingerprint(Schema schema) {
+    if (fingerprintCache.containsKey(schema)) {
+      return fingerprintCache.get(schema);
+    } else {
+      Long fingerprint = SchemaNormalization.parsingFingerprint64(schema);
+      fingerprintCache.put(schema, fingerprint);
+      return fingerprint;
+    }
+  }
+
+  private Schema getSchema(Long fingerprint, byte[] schemaBytes) {
+    if (schemaCache.containsKey(fingerprint)) {
+      return schemaCache.get(fingerprint);
+    } else {
+      String schema = new String(schemaBytes, StandardCharsets.UTF_8);
+      Schema parsedSchema = new Schema.Parser().parse(schema);
+      schemaCache.put(fingerprint, parsedSchema);
+      return parsedSchema;
+    }
+  }
+
+  private DatumWriter<D> getDatumWriter(Schema schema) {
+    DatumWriter<D> writer;
+    if (writerCache.containsKey(schema)) {
+      writer = writerCache.get(schema);
+    } else {
+      writer = new GenericDatumWriter<>(schema);
+      writerCache.put(schema, writer);
+    }
+    return writer;
+  }
+
+  private DatumReader<D> getDatumReader(Schema schema) {
+    DatumReader<D> reader;
+    if (readerCache.containsKey(schema)) {
+      reader = readerCache.get(schema);
+    } else {
+      reader = new GenericDatumReader<>(schema);
+      readerCache.put(schema, reader);
+    }
+    return reader;
+  }
+
+  private void serializeDatum(D datum, Output output) throws IOException {
+    Encoder encoder = EncoderFactory.get().directBinaryEncoder(output, null);
+    Schema schema = datum.getSchema();
+    Long fingerprint = this.getFingerprint(schema);
+    byte[] schemaBytes = schema.toString().getBytes(StandardCharsets.UTF_8);

Review Comment:
   If you refer to Spark's implementation, `schemaCache` is always a subset of `schemas` (the Avro schemas registered with the session and passed in when the `GenericAvroSerializer` is constructed).
   
   As such, if Spark's `GenericAvroSerializer` is given a schema that is not in `schemas`, it always serializes/writes the compressed schema bytes; a simplified sketch of that logic is below.
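   
   A simplified Java rendering of Spark's approach (the real code is Scala, the compression step is omitted, and `registeredSchemas`/`writeSchema` are names assumed here for illustration only):
   
   ```java
   // Assumes a pre-registered Map<Long, Schema> registeredSchemas supplied at construction time.
   void writeSchema(Schema schema, Output output) {
     long fingerprint = getFingerprint(schema);
     if (registeredSchemas.containsKey(fingerprint)) {
       // Both sides already know this schema: ship only the 8-byte fingerprint.
       output.writeBoolean(true);
       output.writeLong(fingerprint);
     } else {
       // Unknown schema: fall back to shipping the schema JSON itself.
       byte[] schemaBytes = schema.toString().getBytes(StandardCharsets.UTF_8);
       output.writeBoolean(false);
       output.writeInt(schemaBytes.length);
       output.writeBytes(schemaBytes);
     }
   }
   ```
   
   Since Hudi's serializer has no such pre-registered map, it always takes the second branch, which is why the fingerprint and schema caches carry all of the savings here.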



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] hudi-bot commented on pull request #8307: [HUDI-5992] Fix (de)serialization for avro versions > 1.10.0

Posted by "hudi-bot (via GitHub)" <gi...@apache.org>.
hudi-bot commented on PR #8307:
URL: https://github.com/apache/hudi/pull/8307#issuecomment-1486820819

   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "bc6d6dc5798e71d00da8e48f641307ae1789b285",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15954",
       "triggerID" : "bc6d6dc5798e71d00da8e48f641307ae1789b285",
       "triggerType" : "PUSH"
     }, {
       "hash" : "8aef9327c27d3f716a7f4c40f9a6d0ea6f370d3e",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15957",
       "triggerID" : "8aef9327c27d3f716a7f4c40f9a6d0ea6f370d3e",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 8aef9327c27d3f716a7f4c40f9a6d0ea6f370d3e Azure: [FAILURE](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15957) 
   
   <details>
   <summary>Bot commands</summary>
     @hudi-bot supports the following commands:
   
    - `@hudi-bot run azure` re-run the last Azure build
   </details>


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] hudi-bot commented on pull request #8307: [HUDI-5992] Fix (de)serialization for avro versions > 1.10.0

Posted by "hudi-bot (via GitHub)" <gi...@apache.org>.
hudi-bot commented on PR #8307:
URL: https://github.com/apache/hudi/pull/8307#issuecomment-1486353485

   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "bc6d6dc5798e71d00da8e48f641307ae1789b285",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "bc6d6dc5798e71d00da8e48f641307ae1789b285",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * bc6d6dc5798e71d00da8e48f641307ae1789b285 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     @hudi-bot supports the following commands:
   
    - `@hudi-bot run azure` re-run the last Azure build
   </details>


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] voonhous commented on a diff in pull request #8307: [HUDI-5992] Fix (de)serialization for avro versions > 1.10.0

Posted by "voonhous (via GitHub)" <gi...@apache.org>.
voonhous commented on code in PR #8307:
URL: https://github.com/apache/hudi/pull/8307#discussion_r1150240926


##########
hudi-common/src/main/java/org/apache/hudi/avro/GenericAvroSerializer.java:
##########
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.avro;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.Serializer;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import org.apache.avro.Schema;
+import org.apache.avro.SchemaNormalization;
+import org.apache.avro.generic.GenericContainer;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.io.DatumWriter;
+import org.apache.avro.io.Decoder;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.io.Encoder;
+import org.apache.avro.io.EncoderFactory;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+
+
+/**
+ * Custom serializer used for generic Avro containers.
+ * <p>
+ * Heavily adapted from:
+ * <p>
+ * https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/serializer/GenericAvroSerializer.scala
+ * <p>
+ * As {@link org.apache.hudi.common.util.SerializationUtils} is not shared between threads and does not concern any
+ * shuffling operations, compression and decompression cache is omitted.
+ *

Review Comment:
   Done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] hudi-bot commented on pull request #8307: [HUDI-5992] Fix (de)serialization for avro versions > 1.10.0

Posted by "hudi-bot (via GitHub)" <gi...@apache.org>.
hudi-bot commented on PR #8307:
URL: https://github.com/apache/hudi/pull/8307#issuecomment-1486527239

   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "bc6d6dc5798e71d00da8e48f641307ae1789b285",
       "status" : "CANCELED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15954",
       "triggerID" : "bc6d6dc5798e71d00da8e48f641307ae1789b285",
       "triggerType" : "PUSH"
     }, {
       "hash" : "8aef9327c27d3f716a7f4c40f9a6d0ea6f370d3e",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15957",
       "triggerID" : "8aef9327c27d3f716a7f4c40f9a6d0ea6f370d3e",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * bc6d6dc5798e71d00da8e48f641307ae1789b285 Azure: [CANCELED](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15954) 
   * 8aef9327c27d3f716a7f4c40f9a6d0ea6f370d3e Azure: [PENDING](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15957) 
   
   <details>
   <summary>Bot commands</summary>
     @hudi-bot supports the following commands:
   
    - `@hudi-bot run azure` re-run the last Azure build
   </details>


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] hudi-bot commented on pull request #8307: [HUDI-5992] Fix (de)serialization for avro versions > 1.10.0

Posted by "hudi-bot (via GitHub)" <gi...@apache.org>.
hudi-bot commented on PR #8307:
URL: https://github.com/apache/hudi/pull/8307#issuecomment-1486363276

   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "bc6d6dc5798e71d00da8e48f641307ae1789b285",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15954",
       "triggerID" : "bc6d6dc5798e71d00da8e48f641307ae1789b285",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * bc6d6dc5798e71d00da8e48f641307ae1789b285 Azure: [PENDING](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15954) 
   
   <details>
   <summary>Bot commands</summary>
     @hudi-bot supports the following commands:
   
    - `@hudi-bot run azure` re-run the last Azure build
   </details>


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] voonhous commented on a diff in pull request #8307: [HUDI-5992] Fix (de)serialization for avro versions > 1.10.0

Posted by "voonhous (via GitHub)" <gi...@apache.org>.
voonhous commented on code in PR #8307:
URL: https://github.com/apache/hudi/pull/8307#discussion_r1151368487


##########
hudi-common/src/main/java/org/apache/hudi/avro/GenericAvroSerializer.java:
##########
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.avro;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.Serializer;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import org.apache.avro.Schema;
+import org.apache.avro.SchemaNormalization;
+import org.apache.avro.generic.GenericContainer;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.io.DatumWriter;
+import org.apache.avro.io.Decoder;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.io.Encoder;
+import org.apache.avro.io.EncoderFactory;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+
+
+/**
+ * Custom serializer used for generic Avro containers.
+ * <p>
+ * Heavily adapted from:
+ * <p>
+ * <a href="https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/serializer/GenericAvroSerializer.scala">GenericAvroSerializer.scala</a>
+ * <p>
+ * As {@link org.apache.hudi.common.util.SerializationUtils} is not shared between threads and does not concern any
+ * shuffling operations, compression and decompression cache is omitted as network IO is not a concern.
+ * <p>
+ * Unlike Spark's implementation, the class and constructor is not initialized with a predefined map of avro schemas.
+ * This is the case as schemas to read and write are not known beforehand.
+ *
+ * @param <D> the subtype of [[GenericContainer]] handled by this serializer
+ */
+public class GenericAvroSerializer<D extends GenericContainer> extends Serializer<D> {
+
+  // reuses the same datum reader/writer since the same schema will be used many times
+  private final HashMap<Schema, DatumWriter<D>> writerCache = new HashMap<>();
+  private final HashMap<Schema, DatumReader<D>> readerCache = new HashMap<>();
+
+  // fingerprinting is very expensive so this alleviates most of the work
+  private final HashMap<Schema, Long> fingerprintCache = new HashMap<>();
+  private final HashMap<Long, Schema> schemaCache = new HashMap<>();
+
+  private Long getFingerprint(Schema schema) {
+    if (fingerprintCache.containsKey(schema)) {
+      return fingerprintCache.get(schema);
+    } else {
+      Long fingerprint = SchemaNormalization.parsingFingerprint64(schema);
+      fingerprintCache.put(schema, fingerprint);
+      return fingerprint;
+    }
+  }
+
+  private Schema getSchema(Long fingerprint, byte[] schemaBytes) {
+    if (schemaCache.containsKey(fingerprint)) {
+      return schemaCache.get(fingerprint);
+    } else {
+      String schema = new String(schemaBytes, StandardCharsets.UTF_8);
+      Schema parsedSchema = new Schema.Parser().parse(schema);
+      schemaCache.put(fingerprint, parsedSchema);
+      return parsedSchema;
+    }
+  }
+
+  private DatumWriter<D> getDatumWriter(Schema schema) {
+    DatumWriter<D> writer;
+    if (writerCache.containsKey(schema)) {
+      writer = writerCache.get(schema);
+    } else {
+      writer = new GenericDatumWriter<>(schema);
+      writerCache.put(schema, writer);
+    }
+    return writer;
+  }
+
+  private DatumReader<D> getDatumReader(Schema schema) {
+    DatumReader<D> reader;
+    if (readerCache.containsKey(schema)) {
+      reader = readerCache.get(schema);
+    } else {
+      reader = new GenericDatumReader<>(schema);
+      readerCache.put(schema, reader);
+    }
+    return reader;
+  }
+
+  private void serializeDatum(D datum, Output output) throws IOException {
+    Encoder encoder = EncoderFactory.get().directBinaryEncoder(output, null);
+    Schema schema = datum.getSchema();
+    Long fingerprint = this.getFingerprint(schema);
+    byte[] schemaBytes = schema.toString().getBytes(StandardCharsets.UTF_8);

Review Comment:
   However, `compressBytes` and `decompressBytes` are cached, right?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] hudi-bot commented on pull request #8307: [HUDI-5992] Fix (de)serialization for avro versions > 1.10.0

Posted by "hudi-bot (via GitHub)" <gi...@apache.org>.
hudi-bot commented on PR #8307:
URL: https://github.com/apache/hudi/pull/8307#issuecomment-1488030487

   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "bc6d6dc5798e71d00da8e48f641307ae1789b285",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15954",
       "triggerID" : "bc6d6dc5798e71d00da8e48f641307ae1789b285",
       "triggerType" : "PUSH"
     }, {
       "hash" : "8aef9327c27d3f716a7f4c40f9a6d0ea6f370d3e",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15957",
       "triggerID" : "8aef9327c27d3f716a7f4c40f9a6d0ea6f370d3e",
       "triggerType" : "PUSH"
     }, {
       "hash" : "8aef9327c27d3f716a7f4c40f9a6d0ea6f370d3e",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15961",
       "triggerID" : "1486933377",
       "triggerType" : "MANUAL"
     }, {
       "hash" : "d8521565c1a8a4e215f779c525b7d123b44b94b3",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15965",
       "triggerID" : "d8521565c1a8a4e215f779c525b7d123b44b94b3",
       "triggerType" : "PUSH"
     }, {
       "hash" : "d8521565c1a8a4e215f779c525b7d123b44b94b3",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15966",
       "triggerID" : "1488030287",
       "triggerType" : "MANUAL"
     } ]
   }-->
   ## CI report:
   
   * d8521565c1a8a4e215f779c525b7d123b44b94b3 Azure: [FAILURE](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15965) Azure: [PENDING](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15966) 
   
   <details>
   <summary>Bot commands</summary>
     @hudi-bot supports the following commands:
   
    - `@hudi-bot run azure` re-run the last Azure build
   </details>


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] hudi-bot commented on pull request #8307: [HUDI-5992] Fix (de)serialization for avro versions > 1.10.0

Posted by "hudi-bot (via GitHub)" <gi...@apache.org>.
hudi-bot commented on PR #8307:
URL: https://github.com/apache/hudi/pull/8307#issuecomment-1488088305

   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "bc6d6dc5798e71d00da8e48f641307ae1789b285",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15954",
       "triggerID" : "bc6d6dc5798e71d00da8e48f641307ae1789b285",
       "triggerType" : "PUSH"
     }, {
       "hash" : "8aef9327c27d3f716a7f4c40f9a6d0ea6f370d3e",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15957",
       "triggerID" : "8aef9327c27d3f716a7f4c40f9a6d0ea6f370d3e",
       "triggerType" : "PUSH"
     }, {
       "hash" : "8aef9327c27d3f716a7f4c40f9a6d0ea6f370d3e",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15961",
       "triggerID" : "1486933377",
       "triggerType" : "MANUAL"
     }, {
       "hash" : "d8521565c1a8a4e215f779c525b7d123b44b94b3",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15965",
       "triggerID" : "d8521565c1a8a4e215f779c525b7d123b44b94b3",
       "triggerType" : "PUSH"
     }, {
       "hash" : "d8521565c1a8a4e215f779c525b7d123b44b94b3",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15966",
       "triggerID" : "1488030287",
       "triggerType" : "MANUAL"
     }, {
       "hash" : "14984a0d4f5d2e7c05a55c0db380f68819e4a519",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "14984a0d4f5d2e7c05a55c0db380f68819e4a519",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * d8521565c1a8a4e215f779c525b7d123b44b94b3 Azure: [FAILURE](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15965) Azure: [PENDING](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15966) 
   * 14984a0d4f5d2e7c05a55c0db380f68819e4a519 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     @hudi-bot supports the following commands:
   
    - `@hudi-bot run azure` re-run the last Azure build
   </details>


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] hudi-bot commented on pull request #8307: [HUDI-5992] Fix (de)serialization for avro versions > 1.10.0

Posted by "hudi-bot (via GitHub)" <gi...@apache.org>.
hudi-bot commented on PR #8307:
URL: https://github.com/apache/hudi/pull/8307#issuecomment-1488164241

   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "bc6d6dc5798e71d00da8e48f641307ae1789b285",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15954",
       "triggerID" : "bc6d6dc5798e71d00da8e48f641307ae1789b285",
       "triggerType" : "PUSH"
     }, {
       "hash" : "8aef9327c27d3f716a7f4c40f9a6d0ea6f370d3e",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15957",
       "triggerID" : "8aef9327c27d3f716a7f4c40f9a6d0ea6f370d3e",
       "triggerType" : "PUSH"
     }, {
       "hash" : "8aef9327c27d3f716a7f4c40f9a6d0ea6f370d3e",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15961",
       "triggerID" : "1486933377",
       "triggerType" : "MANUAL"
     }, {
       "hash" : "d8521565c1a8a4e215f779c525b7d123b44b94b3",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15965",
       "triggerID" : "d8521565c1a8a4e215f779c525b7d123b44b94b3",
       "triggerType" : "PUSH"
     }, {
       "hash" : "d8521565c1a8a4e215f779c525b7d123b44b94b3",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15966",
       "triggerID" : "1488030287",
       "triggerType" : "MANUAL"
     }, {
       "hash" : "14984a0d4f5d2e7c05a55c0db380f68819e4a519",
       "status" : "CANCELED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15969",
       "triggerID" : "14984a0d4f5d2e7c05a55c0db380f68819e4a519",
       "triggerType" : "PUSH"
     }, {
       "hash" : "b657f62b1c05863b4b9fa41ecd444d787309f0f9",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "b657f62b1c05863b4b9fa41ecd444d787309f0f9",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 14984a0d4f5d2e7c05a55c0db380f68819e4a519 Azure: [CANCELED](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15969) 
   * b657f62b1c05863b4b9fa41ecd444d787309f0f9 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     @hudi-bot supports the following commands:
   
    - `@hudi-bot run azure` re-run the last Azure build
   </details>


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] hudi-bot commented on pull request #8307: [HUDI-5992] Fix (de)serialization for avro versions > 1.10.0

Posted by "hudi-bot (via GitHub)" <gi...@apache.org>.
hudi-bot commented on PR #8307:
URL: https://github.com/apache/hudi/pull/8307#issuecomment-1488248791

   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "bc6d6dc5798e71d00da8e48f641307ae1789b285",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15954",
       "triggerID" : "bc6d6dc5798e71d00da8e48f641307ae1789b285",
       "triggerType" : "PUSH"
     }, {
       "hash" : "8aef9327c27d3f716a7f4c40f9a6d0ea6f370d3e",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15957",
       "triggerID" : "8aef9327c27d3f716a7f4c40f9a6d0ea6f370d3e",
       "triggerType" : "PUSH"
     }, {
       "hash" : "8aef9327c27d3f716a7f4c40f9a6d0ea6f370d3e",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15961",
       "triggerID" : "1486933377",
       "triggerType" : "MANUAL"
     }, {
       "hash" : "d8521565c1a8a4e215f779c525b7d123b44b94b3",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15965",
       "triggerID" : "d8521565c1a8a4e215f779c525b7d123b44b94b3",
       "triggerType" : "PUSH"
     }, {
       "hash" : "d8521565c1a8a4e215f779c525b7d123b44b94b3",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15966",
       "triggerID" : "1488030287",
       "triggerType" : "MANUAL"
     }, {
       "hash" : "14984a0d4f5d2e7c05a55c0db380f68819e4a519",
       "status" : "CANCELED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15969",
       "triggerID" : "14984a0d4f5d2e7c05a55c0db380f68819e4a519",
       "triggerType" : "PUSH"
     }, {
       "hash" : "b657f62b1c05863b4b9fa41ecd444d787309f0f9",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15971",
       "triggerID" : "b657f62b1c05863b4b9fa41ecd444d787309f0f9",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 14984a0d4f5d2e7c05a55c0db380f68819e4a519 Azure: [CANCELED](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15969) 
   * b657f62b1c05863b4b9fa41ecd444d787309f0f9 Azure: [PENDING](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15971) 
   
   <details>
   <summary>Bot commands</summary>
     @hudi-bot supports the following commands:
   
    - `@hudi-bot run azure` re-run the last Azure build
   </details>


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] voonhous commented on a diff in pull request #8307: [HUDI-5992] Fix (de)serialization for avro versions > 1.10.0

Posted by "voonhous (via GitHub)" <gi...@apache.org>.
voonhous commented on code in PR #8307:
URL: https://github.com/apache/hudi/pull/8307#discussion_r1151699770


##########
hudi-common/src/main/java/org/apache/hudi/avro/GenericAvroSerializer.java:
##########
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.avro;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.Serializer;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import org.apache.avro.Schema;
+import org.apache.avro.SchemaNormalization;
+import org.apache.avro.generic.GenericContainer;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.io.DatumWriter;
+import org.apache.avro.io.Decoder;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.io.Encoder;
+import org.apache.avro.io.EncoderFactory;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+
+
+/**
+ * Custom serializer used for generic Avro containers.
+ * <p>
+ * Heavily adapted from:
+ * <p>
+ * <a href="https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/serializer/GenericAvroSerializer.scala">GenericAvroSerializer.scala</a>
+ * <p>
+ * As {@link org.apache.hudi.common.util.SerializationUtils} is not shared between threads and does not concern any
+ * shuffling operations, compression and decompression cache is omitted as network IO is not a concern.
+ * <p>
+ * Unlike Spark's implementation, the class and constructor is not initialized with a predefined map of avro schemas.
+ * This is the case as schemas to read and write are not known beforehand.
+ *
+ * @param <D> the subtype of [[GenericContainer]] handled by this serializer
+ */
+public class GenericAvroSerializer<D extends GenericContainer> extends Serializer<D> {
+
+  // reuses the same datum reader/writer since the same schema will be used many times
+  private final HashMap<Schema, DatumWriter<D>> writerCache = new HashMap<>();
+  private final HashMap<Schema, DatumReader<D>> readerCache = new HashMap<>();
+
+  // fingerprinting is very expensive so this alleviates most of the work
+  private final HashMap<Schema, Long> fingerprintCache = new HashMap<>();
+  private final HashMap<Long, Schema> schemaCache = new HashMap<>();
+
+  private Long getFingerprint(Schema schema) {
+    if (fingerprintCache.containsKey(schema)) {
+      return fingerprintCache.get(schema);
+    } else {
+      Long fingerprint = SchemaNormalization.parsingFingerprint64(schema);
+      fingerprintCache.put(schema, fingerprint);
+      return fingerprint;
+    }
+  }
+
+  private Schema getSchema(Long fingerprint, byte[] schemaBytes) {
+    if (schemaCache.containsKey(fingerprint)) {
+      return schemaCache.get(fingerprint);
+    } else {
+      String schema = new String(schemaBytes, StandardCharsets.UTF_8);
+      Schema parsedSchema = new Schema.Parser().parse(schema);
+      schemaCache.put(fingerprint, parsedSchema);
+      return parsedSchema;
+    }
+  }
+
+  private DatumWriter<D> getDatumWriter(Schema schema) {
+    DatumWriter<D> writer;
+    if (writerCache.containsKey(schema)) {
+      writer = writerCache.get(schema);
+    } else {
+      writer = new GenericDatumWriter<>(schema);
+      writerCache.put(schema, writer);
+    }
+    return writer;
+  }
+
+  private DatumReader<D> getDatumReader(Schema schema) {
+    DatumReader<D> reader;
+    if (readerCache.containsKey(schema)) {
+      reader = readerCache.get(schema);
+    } else {
+      reader = new GenericDatumReader<>(schema);
+      readerCache.put(schema, reader);
+    }
+    return reader;
+  }
+
+  private void serializeDatum(D datum, Output output) throws IOException {
+    Encoder encoder = EncoderFactory.get().directBinaryEncoder(output, null);
+    Schema schema = datum.getSchema();
+    Long fingerprint = this.getFingerprint(schema);
+    byte[] schemaBytes = schema.toString().getBytes(StandardCharsets.UTF_8);

Review Comment:
   Also, something to consider: IIUC, serializing the (Long) fingerprints allows for faster reads during deserialization, likely because hashing a `Long` key is constant-time while a content-based `ByteBuffer` key has to scan the schema bytes on every lookup.
   
   ```java
   @Test
   public void testRocksDbDiskMapPutDecimal() throws IOException {
     RocksDbDiskMap<String, HoodieRecord> rocksDbBasedMap = new RocksDbDiskMap<>(basePath);
     HoodieRecord avroRecord = createAvroRecordWithDecimalOrderingField();
     rocksDbBasedMap.put(avroRecord.getRecordKey(), avroRecord);
     assertDoesNotThrow(() -> rocksDbBasedMap.get(avroRecord.getRecordKey()));
     
     long start;
     for (int h = 0; h < 5; h++) {
       start = System.currentTimeMillis();
       for (int i = 0; i < 5999999; i++) {
         rocksDbBasedMap.get(avroRecord.getRecordKey());
       }
       System.out.println(System.currentTimeMillis() - start);
     }
   }
   ```
   
   ```
   Using HashMap<ByteBuffer, Schema> cache
   27746
   23477
   20766
   20541
   20247
   
   Using HashMap<Long, Schema> cache
   26384
   20606
   17815
   16544
   16070
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org