Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2020/10/05 17:29:14 UTC

[GitHub] [hudi] bradleyhurley opened a new issue #2146: Bulk Insert - java.io.NotSerializableException: org.apache.hudi.common.util.RocksDBDAO

bradleyhurley opened a new issue #2146:
URL: https://github.com/apache/hudi/issues/2146


   **Describe the problem you faced**
   
   When running the DeltaStreamer in `BULK_INSERT` mode, the job fails with `java.io.NotSerializableException: org.apache.hudi.common.util.RocksDBDAO`.
   
   **Spark Submit**
   ```
   spark-submit
   --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer
   --jars s3://data-lake-adp-qa/hudi/libraries/hudi-ext-1.0-SNAPSHOT.jar
   --master yarn
   --deploy-mode client
   --num-executors 200
   --executor-cores 3
   --executor-memory 20G
   --driver-memory 6g
   --op BULK_INSERT
   --filter-dupes
   --conf "spark.driver.extraJavaOptions=-XX:+UseG1GC -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:InitiatingHeapOccupancyPercent=35"
   --conf "spark.executor.extraJavaOptions=-XX:+UseG1GC -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:InitiatingHeapOccupancyPercent=35"
   --conf spark.yarn.executor.memoryOverhead=5G
   --conf spark.task.maxFailures=10
   --conf spark.memory.fraction=0.4
   --conf spark.rdd.compress=true
   --conf spark.kryoserializer.buffer.max=200m
   --conf spark.serializer=org.apache.spark.serializer.KryoSerializer
   --conf spark.dynamicAllocation.enabled=True
   --conf spark.reducer.maxReqsInFlight=1
   --conf spark.shuffle.io.retryWait=60s
   --conf spark.shuffle.io.maxRetries=10
   --conf spark.port.maxRetries=100 
   /usr/lib/hudi/hudi-utilities-bundle.jar
   --table-type COPY_ON_WRITE
   --source-class org.apache.hudi.utilities.sources.AvroKafkaSource
   --source-ordering-field {{ ordering_field }}
   --target-base-path {{ s3_path }}
   --target-table {{ table_name }}
   --props s3://data-lake-adp-qa//hudi/config/hudi.properties
   --schemaprovider-class org.apache.hudi.utilities.schema.SchemaRegistryProvider
   --hoodie-conf hoodie.embed.timeline.server=true
   --hoodie-conf hoodie.filesystem.view.type=EMBEDDED_KV_STORE
   --hoodie-conf hoodie.compact.inline=false
   --hoodie-conf hoodie.datasource.write.recordkey.field={{ record_key }}
   --hoodie-conf hoodie.datasource.write.partitionpath.field={{ partition_path }}
   --hoodie-conf hoodie.deltastreamer.schemaprovider.registry.url= {{ table_schema_url }}
   --hoodie-conf hoodie.deltastreamer.schemaprovider.registry.targetUrl={{ table_schema_url }}
   --hoodie-conf schema.registry.url={{ schema_registry_url }}
   --hoodie-conf hoodie.deltastreamer.source.kafka.topic= {{ kafka_topic }}
   --hoodie-conf group.id={{ group_id }}
   --hoodie-conf enable.auto.commit=false
   --hoodie-conf bootstrap.servers={{ brokers }}
   --hoodie-conf auto.offset.reset=earliest
   --hoodie-conf hoodie.consistency.check.enabled=true
   --hoodie-conf security.protocol=SSL
   --hoodie-conf ssl.keystore.location=/usr/lib/jvm/jre/lib/security/cacerts
   --hoodie-conf ssl.keystore.password={{ cert password }}
   --hoodie-conf hoodie.deltastreamer.kafka.source.maxEvents=10000000
   --hoodie-conf hoodie.insert.shuffle.parallelism=600
   --hoodie-conf hoodie.upsert.shuffle.parallelism=600
   --enable-hive-sync
   --hoodie-conf hoodie.datasource.hive_sync.table={{ table_name }}
   --hoodie-conf hoodie.datasource.hive_sync.database={{ database_name }}
   --hoodie-conf hoodie.datasource.hive_sync.partition_extractor_class=org.apache.hudi.hive.NonPartitionedExtractor
   --hoodie-conf hoodie.bulkinsert.shuffle.parallelism=1205
   --hoodie-conf hoodie.datasource.hive_sync.partition_extractor_class=org.apache.hudi.hive.MultiPartKeysValueExtractor
   --hoodie-conf hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.SimpleKeyGenDecimalSupport
   --hoodie-conf hoodie.datasource.hive_sync.partition_fields={{ partition_field }}
   ```
   
   
   
   **To Reproduce**
   
   Steps to reproduce the behavior:
   
   1. Use the provided configuration to read data out of Kafka with the op flag set to `BULK_INSERT`
   
   **Expected behavior**
   
   I would expect that the DeltaStreamer job would complete successfully, and the table would be registered with the Glue catalog and queryable in Athena.
   
   **Environment Description**
   
   AWS EMR 5.30. Spark Jobs are submitted via the EMR Step API.
   
   Hudi version : 0.5.2-inc
   
   Spark version : 2.4.5
   
   Hive version : 2.3.6
   
   Hadoop version : 2.8.5
   
   Storage (HDFS/S3/GCS..) : S3
   
   Running on Docker? (yes/no) : No
   
   
   **Additional context**
   
   When running the same configuration without `--op BULK_INSERT`, so that the default `UPSERT` operation is used, the data is processed without issue.
   
   The `SimpleKeyGenDecimalSupport` is a very slightly modified version of the `SimpleKeyGen` class enhanced to support using decimal values for the record key and partition path values.
   
   I am still able to register the table manually by running the `/usr/lib/hudi/bin/run_sync_tool.sh` tool.
   
   ```
   /usr/lib/hudi/bin/run_sync_tool.sh  --jdbc-url jdbc:hive2://localhost:10000 \
     --user hive \
     --pass {{ hive_password }} \
     --base-path  {{ s3_path}}  \
     --database {{ database_name }}  \
     --table {{ table_name }} \
     --partitioned-by {{ partition_path }} \
     --partition-value-extractor org.apache.hudi.hive.MultiPartKeysValueExtractor
   ```
   
   **Stacktrace**
   
   ```
   ERROR HoodieDeltaStreamer: Got error running delta sync once. Shutting down
   org.apache.spark.SparkException: Task not serializable
   	at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:403)
   	at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:393)
   	at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:162)
   	at org.apache.spark.SparkContext.clean(SparkContext.scala:2327)
   	at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:393)
   	at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:392)
   	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
   	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
   	at org.apache.spark.rdd.RDD.withScope(RDD.scala:385)
   	at org.apache.spark.rdd.RDD.map(RDD.scala:392)
   	at org.apache.spark.api.java.JavaRDDLike$class.map(JavaRDDLike.scala:93)
   	at org.apache.spark.api.java.AbstractJavaRDDLike.map(JavaRDDLike.scala:45)
   	at org.apache.hudi.table.HoodieCopyOnWriteTable.scheduleClean(HoodieCopyOnWriteTable.java:304)
   	at org.apache.hudi.client.HoodieCleanClient.scheduleClean(HoodieCleanClient.java:114)
   	at org.apache.hudi.client.HoodieCleanClient.clean(HoodieCleanClient.java:91)
   	at org.apache.hudi.client.HoodieWriteClient.clean(HoodieWriteClient.java:835)
   	at org.apache.hudi.client.HoodieWriteClient.postCommit(HoodieWriteClient.java:512)
   	at org.apache.hudi.client.AbstractHoodieWriteClient.commit(AbstractHoodieWriteClient.java:157)
   	at org.apache.hudi.client.AbstractHoodieWriteClient.commit(AbstractHoodieWriteClient.java:101)
   	at org.apache.hudi.utilities.deltastreamer.DeltaSync.writeToSink(DeltaSync.java:395)
   	at org.apache.hudi.utilities.deltastreamer.DeltaSync.syncOnce(DeltaSync.java:238)
   	at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.sync(HoodieDeltaStreamer.java:121)
   	at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.main(HoodieDeltaStreamer.java:294)
   	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
   	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
   	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
   	at java.lang.reflect.Method.invoke(Method.java:498)
   	at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
   	at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:853)
   	at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:161)
   	at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:184)
   	at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:86)
   	at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:928)
   	at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:937)
   	at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
   Caused by: java.io.NotSerializableException: org.apache.hudi.common.util.RocksDBDAO
   Serialization stack:
   	- object not serializable (class: org.apache.hudi.common.util.RocksDBDAO, value: org.apache.hudi.common.util.RocksDBDAO@57e9cd2)
   	- field (class: org.apache.hudi.common.table.view.RocksDbBasedFileSystemView, name: rocksDB, type: class org.apache.hudi.common.util.RocksDBDAO)
   	- object (class org.apache.hudi.common.table.view.RocksDbBasedFileSystemView, org.apache.hudi.common.table.view.RocksDbBasedFileSystemView@306a9cd8)
   	- field (class: org.apache.hudi.table.CleanHelper, name: fileSystemView, type: interface org.apache.hudi.common.table.SyncableFileSystemView)
   	- object (class org.apache.hudi.table.CleanHelper, org.apache.hudi.table.CleanHelper@651a399)
   	- element of array (index: 0)
   	- array (class [Ljava.lang.Object;, size 1)
   	- field (class: java.lang.invoke.SerializedLambda, name: capturedArgs, type: class [Ljava.lang.Object;)
   	- object (class java.lang.invoke.SerializedLambda, SerializedLambda[capturingClass=class org.apache.hudi.table.HoodieCopyOnWriteTable, functionalInterfaceMethod=org/apache/spark/api/java/function/Function.call:(Ljava/lang/Object;)Ljava/lang/Object;, implementation=invokeStatic org/apache/hudi/table/HoodieCopyOnWriteTable.lambda$scheduleClean$d4c53b8f$1:(Lorg/apache/hudi/table/CleanHelper;Ljava/lang/String;)Lorg/apache/hudi/common/util/collection/Pair;, instantiatedMethodType=(Ljava/lang/String;)Lorg/apache/hudi/common/util/collection/Pair;, numCaptured=1])
   	- writeReplace data (class: java.lang.invoke.SerializedLambda)
   	- object (class org.apache.hudi.table.HoodieCopyOnWriteTable$$Lambda$234/745138736, org.apache.hudi.table.HoodieCopyOnWriteTable$$Lambda$234/745138736@46e32574)
   	- field (class: org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1, name: fun$1, type: interface org.apache.spark.api.java.function.Function)
   	- object (class org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1, <function1>)
   	at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
   	at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
   	at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
   	at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:400)
   ```
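
The serialization stack above shows the lambda in `scheduleClean` capturing a `CleanHelper`, which holds a `RocksDbBasedFileSystemView`, which in turn holds the non-serializable `RocksDBDAO`. Spark serializes closures with Java serialization (note `JavaSerializerInstance` in the trace), so the `spark.serializer=...KryoSerializer` setting does not change this path. The sketch below reproduces the same failure mode in plain Java and shows one common way to avoid it, a `transient` field that is rebuilt lazily. This is not Hudi's code and not necessarily how the issue was fixed upstream; `NativeStore`, `EagerHelper`, and `LazyHelper` are hypothetical stand-ins.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class SerializationSketch {

    // Hypothetical stand-in for a native-resource handle (like RocksDBDAO):
    // it does not implement Serializable.
    static class NativeStore {
    }

    // Hypothetical stand-in for a helper captured by a closure. It is
    // Serializable itself, but drags the non-serializable store along.
    static class EagerHelper implements Serializable {
        private final NativeStore store = new NativeStore();
    }

    // Same helper, but the store is transient, so Java serialization skips it.
    // It is re-created on first use (for example, after deserialization).
    static class LazyHelper implements Serializable {
        private transient NativeStore store;

        NativeStore store() {
            if (store == null) {
                store = new NativeStore();
            }
            return store;
        }
    }

    // Returns true if Java serialization accepts the whole object graph.
    static boolean isSerializable(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("eager helper serializable: " + isSerializable(new EagerHelper()));
        LazyHelper lazy = new LazyHelper();
        lazy.store(); // populate the transient field before serializing
        System.out.println("lazy helper serializable:  " + isSerializable(lazy));
    }
}
```

Running `main` should report `false` for the eager helper and `true` for the lazy one, even though the lazy helper's store has already been initialized.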
   
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] bradleyhurley commented on issue #2146: Bulk Insert - java.io.NotSerializableException: org.apache.hudi.common.util.RocksDBDAO

Posted by GitBox <gi...@apache.org>.
bradleyhurley commented on issue #2146:
URL: https://github.com/apache/hudi/issues/2146#issuecomment-726850471


   Thanks for the confirmation, @geekalien. I am going to close this issue now. We upgraded to 0.6.0 and verified that we no longer see the problem.





[GitHub] [hudi] bvaradar commented on issue #2146: Bulk Insert - java.io.NotSerializableException: org.apache.hudi.common.util.RocksDBDAO

Posted by GitBox <gi...@apache.org>.
bvaradar commented on issue #2146:
URL: https://github.com/apache/hudi/issues/2146#issuecomment-703783191


   @bradleyhurley: 0.5.2 is pretty old. Can you use 0.5.3? I checked the code in 0.5.3, and you should not see this issue there.





[GitHub] [hudi] bradleyhurley closed issue #2146: Bulk Insert - java.io.NotSerializableException: org.apache.hudi.common.util.RocksDBDAO

Posted by GitBox <gi...@apache.org>.
bradleyhurley closed issue #2146:
URL: https://github.com/apache/hudi/issues/2146


   





[GitHub] [hudi] geekalien commented on issue #2146: Bulk Insert - java.io.NotSerializableException: org.apache.hudi.common.util.RocksDBDAO

Posted by GitBox <gi...@apache.org>.
geekalien commented on issue #2146:
URL: https://github.com/apache/hudi/issues/2146#issuecomment-726837951


   We ran into the very same issue in our AWS environment. I can confirm that upgrading to version 0.5.3 resolved it.





[GitHub] [hudi] bradleyhurley commented on issue #2146: Bulk Insert - java.io.NotSerializableException: org.apache.hudi.common.util.RocksDBDAO

Posted by GitBox <gi...@apache.org>.
bradleyhurley commented on issue #2146:
URL: https://github.com/apache/hudi/issues/2146#issuecomment-703784334


   Let me see if I can test 0.5.3. I agree it's pretty old, but it's the latest bundled version provided by AWS.

