You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by "Lee Huynh (Jira)" <ji...@apache.org> on 2020/10/21 17:17:00 UTC

[jira] [Commented] (HUDI-1205) Serialization fail when log file is larger than 2GB

    [ https://issues.apache.org/jira/browse/HUDI-1205?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17218427#comment-17218427 ] 

Lee Huynh commented on HUDI-1205:
---------------------------------

ASSUMPTION: The following comment assumes ValueMetaData.sizeOfValue overflowing is the actual cause of the issue.

The [Github issue #1|https://github.com/apache/hudi/issues/1890] points to [ValueMetaData.sizeOfValue|https://github.com/apache/hudi/blob/master/hudi-common/src/main/java/org/apache/hudi/common/util/collection/DiskBasedMap.java#L353] as the field that is overflowing.  ValueMetaData.sizeOfValue is only set in the constructor of ValueMetaData.  The constructor is called only in [DiskBasedMap.put() method|https://github.com/apache/hudi/blob/master/hudi-common/src/main/java/org/apache/hudi/common/util/collection/DiskBasedMap.java#L229], copied below (comment about valueSize is mine):

 
{code:java}
  private synchronized R put(T key, R value, boolean flush) {
    try {
      byte[] val = SerializationUtils.serialize(value);
      Integer valueSize = val.length;
      Long timestamp = System.currentTimeMillis();
      this.valueMetadataMap.put(key,
          new DiskBasedMap.ValueMetadata(this.filePath, valueSize, filePosition.get(), timestamp));   // ValueMetaData.sizeOfValue is set to valueSize
      byte[] serializedKey = SerializationUtils.serialize(key);
      filePosition
          .set(SpillableMapUtils.spillToDisk(writeOnlyFileHandle, new FileEntry(SpillableMapUtils.generateChecksum(val),
              serializedKey.length, valueSize, serializedKey, val, timestamp)));
      if (flush) {
        flushToDisk();
      }
    } catch (IOException io) {
      throw new HoodieIOException("Unable to store data in Disk Based map", io);
    }
    return value;
  }
{code}
We see that valueSize is the length of the byte array that is serialized.  Cursory web searches show that the maximum array length in Java is 2^31 elements.  Therefore, simply changing ValueMetaData.sizeOfValue to Long will not fix this issue.  Also, changing ValueMetaData.sizeOfValue to unsigned won't work either.

Furthermore, the [FileEntry.readInternal()|https://github.com/apache/hudi/blob/master/hudi-common/src/main/java/org/apache/hudi/common/util/SpillableMapUtils.java#L52] method receives the value of ValueMetaData.sizeOfValue for its valueLength parameter and writes out 4 bytes.  If we change ValueMetaData.sizeOfValue to a Long, then it cannot be written out to the file without breaking backwards compatibility.  Changing it to unsigned will work in this particular point, but the previous point about the max array length still stands.

Somehow, the value serialized must be limited to 2 GB, or the way SerializationUtils.serialize() returns its contents must be changed.

 

> Serialization fail when log file is larger than 2GB
> ---------------------------------------------------
>
>                 Key: HUDI-1205
>                 URL: https://issues.apache.org/jira/browse/HUDI-1205
>             Project: Apache Hudi
>          Issue Type: Bug
>            Reporter: Yanjia Gary Li
>            Priority: Major
>
> When scanning the log file, if the log file(or log file group) is larger than 2GB, serialization will fail because Hudi uses Integer to store size in byte for the log file. The maximum integer representing bytes is 2GB.
> Caused by: com.esotericsoftware.kryo.KryoException: Unable to find class: org.apache.hudi.common.model.OverwriteWithLatestAvroPayload$$Lambda$45/62103784
> Serialization trace:
> orderingVal (org.apache.hudi.common.model.OverwriteWithLatestAvroPayload)
> data (org.apache.hudi.common.model.HoodieRecord)
> at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:160)
> at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:133)
> at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:693)
> at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:118)
> at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:543)
> at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:731)
> at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
> at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:543)
> at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:813)
> at org.apache.hudi.common.util.SerializationUtils$KryoSerializerInstance.deserialize(SerializationUtils.java:107)
> at org.apache.hudi.common.util.SerializationUtils.deserialize(SerializationUtils.java:81)
> at org.apache.hudi.common.util.collection.DiskBasedMap.get(DiskBasedMap.java:217)
> at org.apache.hudi.common.util.collection.DiskBasedMap.get(DiskBasedMap.java:211)
> at org.apache.hudi.common.util.collection.DiskBasedMap.get(DiskBasedMap.java:207)
> at org.apache.hudi.common.util.collection.ExternalSpillableMap.get(ExternalSpillableMap.java:168)
> at org.apache.hudi.common.util.collection.ExternalSpillableMap.get(ExternalSpillableMap.java:55)
> at org.apache.hudi.HoodieMergeOnReadRDD$$anon$1.hasNext(HoodieMergeOnReadRDD.scala:128)
> at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.agg_doAggregateWithoutKey_0$(Unknown Source)
> at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
> at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
> at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$11$$anon$1.hasNext(WholeStageCodegenExec.scala:624)
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
> at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
> at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
> at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
> at org.apache.spark.scheduler.Task.run(Task.scala:121)
> at org.apache.spark.executor.Executor$TaskRunner$$anonfun$11.apply(Executor.scala:407)
> at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1408)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:413)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.ClassNotFoundException: org.apache.hudi.common.model.OverwriteWithLatestAvroPayload$$Lambda$45/62103784
> at java.lang.Class.forName0(Native Method)
> at java.lang.Class.forName(Class.java:348)
> at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:154)
> ... 31 more



--
This message was sent by Atlassian Jira
(v8.3.4#803005)