Posted to commits@hudi.apache.org by "Hui An (Jira)" <ji...@apache.org> on 2022/03/09 11:34:00 UTC

[jira] [Commented] (HUDI-3593) AsyncClustering failed because of ConcurrentModificationException

    [ https://issues.apache.org/jira/browse/HUDI-3593?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17503517#comment-17503517 ] 

Hui An commented on HUDI-3593:
------------------------------

From my perspective, the likely cause is that TypedProperties#keys is not thread-safe: another thread modifies this HashSet (for example via put or putAll on TypedProperties) while Spark is iterating over it to serialize it. TypedProperties can be used by HoodieTable's config (HoodieWriteConfig), so this PR could fix it by avoiding serialization of HoodieTable.
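
As a minimal illustration of this failure mode (not Hudi code; the class and method names are hypothetical), the sketch below modifies a LinkedHashSet while it is being iterated. The modification happens in the same thread to keep the result deterministic; it stands in for a put from another thread while HashSet.writeObject is walking the set.

```java
import java.util.ConcurrentModificationException;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.Set;

// Hypothetical demo class, not part of Hudi.
class FailFastIterationDemo {

  /** Returns true if modifying the set mid-iteration raised ConcurrentModificationException. */
  static boolean mutationDuringIterationFails() {
    Set<Object> keys = new LinkedHashSet<>();
    keys.add("a");
    keys.add("b");
    keys.add("c");
    try {
      Iterator<Object> it = keys.iterator();
      it.next();
      // Structural modification while an iterator is open; in the reported
      // failure this would come from another thread calling put/putAll.
      keys.add("d");
      // The fail-fast iterator detects the modification on the next step.
      it.next();
      return false;
    } catch (ConcurrentModificationException e) {
      return true;
    }
  }
}
```

This only shows the fail-fast behaviour of the iterator; it does not prove which code path modifies the keys in the reported job.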

But when I tried to solve it the same way as this PR ([HUDI-3561|https://github.com/apache/hudi/pull/4954]), I found that avoiding serialization of HoodieTable would require a lot of changes (changing the constructors of BulkInsertMapFunction, SparkLazyInsertIterable, HoodieLazyInsertIterable, and many kinds of WriteHandler). I'm afraid this would be a huge change.

Another solution is to make TypedProperties thread-safe. There are two ways to do this:

1. Only change keys to Collections.newSetFromMap(new ConcurrentHashMap<>()). This avoids the ConcurrentModificationException, but TypedProperties is still not really thread-safe, because updating the keys attribute and saving the key-value pair are two separate steps. For example:
{code:java}
  // synchronized does not actually help here, because the get methods are not synchronized
  public synchronized Object put(Object key, Object value) {
    keys.remove(key);
    keys.add(key);
    // At this point the key is already in keys, but its value has not yet been saved by TypedProperties
    return super.put(key, value);
  }
{code}

2. Do not let TypedProperties extend Properties; instead, use an internal ConcurrentHashMap to store keys and values. This would make TypedProperties truly thread-safe.
{code:java}
public class TypedProperties implements Serializable {

  private final ConcurrentHashMap<Object, Object> props = new ConcurrentHashMap<Object, Object>();

  public TypedProperties() {

  }

  public TypedProperties(Properties defaults) {
    if (Objects.nonNull(defaults)) {
      for (String key : defaults.stringPropertyNames()) {
        put(key, defaults.getProperty(key));
      }
    }
  }

  public Enumeration<Object> keys() {
    return Collections.enumeration(props.keySet());
  }
...
{code}
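
To make option 2 a bit more concrete, here is a self-contained sketch under stated assumptions. The class names ConcurrentTypedPropertiesSketch and ConcurrentTypedPropertiesDemo and the getString helper are hypothetical and only loosely modelled on the snippet above; this is not the actual Hudi class. It shows that keys can be enumerated while entries are being added, because ConcurrentHashMap iterators are weakly consistent and do not throw ConcurrentModificationException.

```java
import java.io.Serializable;
import java.util.Collections;
import java.util.Enumeration;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch of option 2: composition instead of extending Properties.
class ConcurrentTypedPropertiesSketch implements Serializable {
  private static final long serialVersionUID = 1L;

  // Keys and values live in one map, so there is no separate key set to keep in sync.
  private final ConcurrentHashMap<Object, Object> props = new ConcurrentHashMap<>();

  ConcurrentTypedPropertiesSketch() {
  }

  ConcurrentTypedPropertiesSketch(Properties defaults) {
    if (defaults != null) {
      for (String key : defaults.stringPropertyNames()) {
        put(key, defaults.getProperty(key));
      }
    }
  }

  // A single atomic step: the key and its value become visible together.
  Object put(Object key, Object value) {
    return props.put(key, value);
  }

  // Assumed helper for illustration: returns the value as a String, or the default if absent.
  String getString(String key, String defaultValue) {
    Object value = props.get(key);
    return value == null ? defaultValue : String.valueOf(value);
  }

  Enumeration<Object> keys() {
    return Collections.enumeration(props.keySet());
  }
}

// Hypothetical demo helper, not part of Hudi.
class ConcurrentTypedPropertiesDemo {

  /** Enumerates the keys while adding one more entry; returns how many keys were visited. */
  static int enumerateWhileMutating(ConcurrentTypedPropertiesSketch props) {
    int visited = 0;
    Enumeration<Object> e = props.keys();
    while (e.hasMoreElements()) {
      e.nextElement();
      visited++;
      // Same key every time, so at most one new entry is created.
      props.put("added-during-enumeration", "x");
    }
    return visited;
  }
}
```

Two trade-offs would need checking before going this way. The stack trace goes through a LinkedHashMap iterator, which suggests the current key set preserves insertion order; a ConcurrentHashMap does not. And any caller that passes TypedProperties where a java.util.Properties is expected would no longer compile once the class stops extending Properties.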

> AsyncClustering failed because of ConcurrentModificationException
> -----------------------------------------------------------------
>
>                 Key: HUDI-3593
>                 URL: https://issues.apache.org/jira/browse/HUDI-3593
>             Project: Apache Hudi
>          Issue Type: Bug
>            Reporter: Hui An
>            Assignee: Hui An
>            Priority: Major
>
> The following is the stack trace I encountered:
> {code:java}
>  ERROR AsyncClusteringService: Clustering executor failed java.util.concurrent.CompletionException: org.apache.spark.SparkException: Task not serializable 
> at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273) 
> at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280) 
> at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1606) 
> at java.util.concurrent.CompletableFuture$AsyncSupply.exec(CompletableFuture.java:1596) 
> at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289) 
> at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056) 
> at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692) 
> at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)
> Caused by: org.apache.spark.SparkException: Task not serializable 
> at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:416) 
> at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:406) 
> at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:162) 
> at org.apache.spark.SparkContext.clean(SparkContext.scala:2467) 
> at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$1(RDD.scala:912) 
> at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) 
> at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112) 
> at org.apache.spark.rdd.RDD.withScope(RDD.scala:414) 
> at org.apache.spark.rdd.RDD.mapPartitionsWithIndex(RDD.scala:911) 
> at org.apache.spark.api.java.JavaRDDLike.mapPartitionsWithIndex(JavaRDDLike.scala:103) 
> at org.apache.spark.api.java.JavaRDDLike.mapPartitionsWithIndex$(JavaRDDLike.scala:99) 
> at org.apache.spark.api.java.AbstractJavaRDDLike.mapPartitionsWithIndex(JavaRDDLike.scala:45) 
> at org.apache.hudi.table.action.commit.SparkBulkInsertHelper.bulkInsert(SparkBulkInsertHelper.java:115) 
> at org.apache.hudi.client.clustering.run.strategy.SparkSortAndSizeExecutionStrategy.performClusteringWithRecordsRDD(SparkSortAndSizeExecutionStrategy.java:68) 
> at org.apache.hudi.client.clustering.run.strategy.MultipleSparkJobExecutionStrategy.lambda$runClusteringForGroupAsync$4(MultipleSparkJobExecutionStrategy.java:175) 
> at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604) ... 5 more
> Caused by: java.util.ConcurrentModificationException 
> at java.util.LinkedHashMap$LinkedHashIterator.nextNode(LinkedHashMap.java:719) 
> at java.util.LinkedHashMap$LinkedKeyIterator.next(LinkedHashMap.java:742) 
> at java.util.HashSet.writeObject(HashSet.java:287) 
> at sun.reflect.GeneratedMethodAccessor54.invoke(Unknown Source) 
> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
> at java.lang.reflect.Method.invoke(Method.java:498) 
> at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1140) 
> at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496) 
> at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) 
> at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) 
> at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) 
> at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509) 
> at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) 
> at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) 
> at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) 
> at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509) 
> at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) 
> at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) 
> at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) 
> at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509) 
> at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) 
> at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) 
> at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) 
> at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509) 
> at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) 
> at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) 
> at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1378) 
> at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174) 
> at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) 
> at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509) 
> at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) 
> at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) 
> at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348) 
> at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:44) 
> at org.apache.spark.serializer.JavaSerializerInstance
> {code}



--
This message was sent by Atlassian Jira
(v8.20.1#820001)