You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2022/03/08 07:20:51 UTC

[GitHub] [hudi] boneanxs commented on pull request #4954: [HUDI-3561] Avoid including whole `MultipleSparkJobExecutionStrategy` object into the closure for Spark to serialize

boneanxs commented on pull request #4954:
URL: https://github.com/apache/hudi/pull/4954#issuecomment-1061481475


   Hi guys, I also met this exception when enable async clustering in a HoodieSparkStreaming job, not the same as the stacktrace this issue hit, following is the stacktrace I met,
   
   ```java
    ERROR AsyncClusteringService: Clustering executor failed java.util.concurrent.CompletionException: org.apache.spark.SparkException: Task not serializable 
   at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273) 
   at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280) 
   at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1606) 
   at java.util.concurrent.CompletableFuture$AsyncSupply.exec(CompletableFuture.java:1596) 
   at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289) 
   at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056) 
   at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692) 
   at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)
   Caused by: org.apache.spark.SparkException: Task not serializable 
   at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:416) 
   at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:406) 
   at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:162) 
   at org.apache.spark.SparkContext.clean(SparkContext.scala:2467) 
   at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$1(RDD.scala:912) 
   at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) 
   at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112) 
   at org.apache.spark.rdd.RDD.withScope(RDD.scala:414) 
   at org.apache.spark.rdd.RDD.mapPartitionsWithIndex(RDD.scala:911) 
   at org.apache.spark.api.java.JavaRDDLike.mapPartitionsWithIndex(JavaRDDLike.scala:103) 
   at org.apache.spark.api.java.JavaRDDLike.mapPartitionsWithIndex$(JavaRDDLike.scala:99) 
   at org.apache.spark.api.java.AbstractJavaRDDLike.mapPartitionsWithIndex(JavaRDDLike.scala:45) 
   at org.apache.hudi.table.action.commit.SparkBulkInsertHelper.bulkInsert(SparkBulkInsertHelper.java:115) 
   at org.apache.hudi.client.clustering.run.strategy.SparkSortAndSizeExecutionStrategy.performClusteringWithRecordsRDD(SparkSortAndSizeExecutionStrategy.java:68) 
   at org.apache.hudi.client.clustering.run.strategy.MultipleSparkJobExecutionStrategy.lambda$runClusteringForGroupAsync$4(MultipleSparkJobExecutionStrategy.java:175) 
   at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604) ... 5 more
   
   Caused by: java.util.ConcurrentModificationException 
   at java.util.LinkedHashMap$LinkedHashIterator.nextNode(LinkedHashMap.java:719) 
   at java.util.LinkedHashMap$LinkedKeyIterator.next(LinkedHashMap.java:742) 
   at java.util.HashSet.writeObject(HashSet.java:287) 
   at sun.reflect.GeneratedMethodAccessor54.invoke(Unknown Source) 
   at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
   at java.lang.reflect.Method.invoke(Method.java:498) 
   at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1140) 
   at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496) 
   at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) 
   at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) 
   at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) 
   at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509) 
   at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) 
   at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) 
   at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) 
   at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509) 
   at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) 
   at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) 
   at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) 
   at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509) 
   at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) 
   at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) 
   at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) 
   at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509) 
   at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) 
   at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) 
   at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1378) 
   at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174) 
   at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) 
   at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509) 
   at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) 
   at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) 
   at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348) 
   at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:44) 
   at org.apache.spark.serializer.JavaSerializerInstance
   ```
   
   From my perspective, it might be `TypedProperties#keys` is not thread safe, and another thread is trying to change this HashSet(like `put` or `putall` from TypedProperties) while spark is trying to iterate it to serialize it at the same time. `TypedProperties` could be used by HoodieTable's config(HoodieWriteConfig), so this pr could fix it by avoiding HoodieTable to be serialized.
   
   But when I'm trying to solve it with the same way as this pr used, Unfortunately found there could be a lot changes to avoid serializing HoodieTable (Change construction methods of `BulkInsertMapFunction`, `SparkLazyInsertIterable`, `HoodieLazyInsertIterable`, and many kinds of `WriteHandler`), I'm afraid this could be a huge change.
   
   Another solution is to make `TypedProperties` thread-safe, there are two ways to make `TypedProperties` thread-safe
   1. Only change keys to be `Collections.newSetFromMap(new ConcurrentHashMap<>())`, this could avoid `ConcurrentModificationException`, but `TypedProperties` is not really thread-safe, as modify attribute `keys` and save key-value pair is divided into two steps, for example,
   
   ```java
   // Synchronized is not work actually, because get methods are not synchronized
     public synchronized Object put(Object key, Object value) {
       keys.remove(key);
       keys.add(key);
      // This could cause key is added in keys, but its value is not saved by TypedProperties
       return super.put(key, value);
     }
   ```
   2. Not let `TypedProperties` to extend `Properties`, use an internal `ConcurrentHashMap` to save key and values, this could make `TypedProperties` to be real thread-safe.
   
   ```java
   public class TypedProperties implements Serializable {
   
     private final ConcurrentHashMap<Object, Object> props = new ConcurrentHashMap<Object, Object>();
   
     public TypedProperties() {
   
     }
   
     public TypedProperties(Properties defaults) {
       if (Objects.nonNull(defaults)) {
         for (String key : defaults.stringPropertyNames()) {
           put(key, defaults.getProperty(key));
         }
       }
     }
   
     public Enumeration<Object> keys() {
       return Collections.enumeration(props.keySet());
     }
   ...
   ```
   
   Do you guys have any other suggestions? Thanks~


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org