You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2021/08/09 02:31:59 UTC

[GitHub] [hudi] nochimow opened a new issue #3431: [SUPPORT] Failed to upsert for commit time

nochimow opened a new issue #3431:
URL: https://github.com/apache/hudi/issues/3431


   **Describe the problem you faced**
   Hi all, 
   We are currently facing some sporadic issues with the error: org.apache.hudi.exception.HoodieUpsertException: Failed to upsert for commit time.
   
   I searched about this error and all i found was that it is related with multiple-writing scenario, that it's not my case, we are using the single writter Hudi config, using hudi 0.8 in AWS Glue jobs and our jobs do not run in parallel for the same dataset.
   
   This error it's not fully reproducible, but it mainly happens in my biggest datasets, for some time, increasing the number and size of the AWS Glue machines fixed this error, but I also had this problem even with 14 * G.2X machines. But, we did not think that this problem is 100% related to data size, cause some bigger datasets worked on the same machines configs.
   
   Can you guys help me found what other causes may throw this error?
   
   **Stack error**
   `ERROR:__main__:WRITE:HUDI:TABLE:S3:ERROR: An error occurred while calling o193.save. : org.apache.hudi.exception.HoodieUpsertException: Failed to upsert for commit time 20210806184859 at org.apache.hudi.table.action.commit.AbstractWriteHelper.write(AbstractWriteHelper.java:62) at org.apache.hudi.table.action.commit.SparkUpsertCommitActionExecutor.execute(SparkUpsertCommitActionExecutor.java:46) at org.apache.hudi.table.HoodieSparkCopyOnWriteTable.upsert(HoodieSparkCopyOnWriteTable.java:94) at org.apache.hudi.table.HoodieSparkCopyOnWriteTable.upsert(HoodieSparkCopyOnWriteTable.java:84) at org.apache.hudi.client.SparkRDDWriteClient.upsert(SparkRDDWriteClient.java:154) at org.apache.hudi.DataSourceUtils.doWriteOperation(DataSourceUtils.java:214) at org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:186) at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:145) at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataS
 ourceCommand.scala:45) at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70) at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68) at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:86) at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131) at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127) at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152) at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127) at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:80) at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:8
 0) at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676) at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676) at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78) at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125) at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73) at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:676) at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:285) at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:271) at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:229) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMetho
 dAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244) at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357) at py4j.Gateway.invoke(Gateway.java:282) at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132) at py4j.commands.CallCommand.execute(CallCommand.java:79) at py4j.GatewayConnection.run(GatewayConnection.java:238) at java.lang.Thread.run(Thread.java:748) Caused by: java.lang.IllegalArgumentException at org.apache.hudi.common.util.ValidationUtils.checkArgument(ValidationUtils.java:31) at org.apache.hudi.common.table.timeline.HoodieActiveTimeline.transitionState(HoodieActiveTimeline.java:396) at org.apache.hudi.common.table.timeline.HoodieActiveTimeline.transitionRequestedToInflight(HoodieActiveTimeline.java:453) at org.apache.hudi.table.action.commit.BaseCommitActionExecutor.saveWorkloadProfileMetadataToInflight(BaseCommitActionExecutor.java:114) at org.apache.hud
 i.table.action.commit.BaseSparkCommitActionExecutor.execute(BaseSparkCommitActionExecutor.java:128) at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.execute(BaseSparkCommitActionExecutor.java:78) at org.apache.hudi.table.action.commit.AbstractWriteHelper.write(AbstractWriteHelper.java:55) ... 39 more`
   
   **Hoodie configs:**
   "hoodie.datasource.write.keygenerator.class": "org.apache.hudi.keygen.ComplexKeyGenerator",
   "hoodie.datasource.write.payload.class": "org.apache.hudi.common.model.DefaultHoodieRecordPayload",
   "hoodie.datasource.hive_sync.partition_extractor_class": "org.apache.hudi.hive.MultiPartKeysValueExtractor",
   "hoodie.table.name": table_name,
   "hoodie.datasource.write.recordkey.field": IDX_COL,
   "hoodie.datasource.write.partitionpath.field": pks,
   "hoodie.datasource.write.hive_style_partitioning": "true",
   "hoodie.datasource.write.precombine.field": tiebreaker,
   "hoodie.datasource.write.operation": operation,
   "hoodie.write.concurrency.mode": "single_writer",
   "hoodie.cleaner.commits.retained": 1,
   "hoodie.fail.on.timeline.archiving": False,
   "hoodie.keep.max.commits": 3,
   "hoodie.keep.min.commits": 2,
   "hoodie.bloom.index.use.caching": True,
   "hoodie.parquet.compression.codec": "snappy"
                           
   **Environment Description**
   AWS Glue Job
   * Hudi version :
   0.8
   * Spark version :
   "Spark 2.4 - Python 3 with improved job times (Glue Version 2.0)"
   
   * Storage (HDFS/S3/GCS..) :
   S3
   * Running on Docker? (yes/no) :
   No
   
   **Additional context**
   
   Infrastructure: Glue Job + S3


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] nochimow commented on issue #3431: [SUPPORT] Failed to upsert for commit time

Posted by GitBox <gi...@apache.org>.
nochimow commented on issue #3431:
URL: https://github.com/apache/hudi/issues/3431#issuecomment-915577392


   Hi @nsivabalan, 
   
   After some other testing, we figured out what was our problem.
   Based on the AWS recommendation, we were wrongly assuming that the any memory parameter passed on the glue spark context were being automatically replaced by Glue. (Like some indicators showed us on SparkUI), but, in some ways this parameters were affecting in a negative way the Glue native memory management.
   
   Basically, removing all the manual memory settings (like memory and memory overhead) on our glue spark context made the jobs running properly without any memory issues. (We also tested the same jobs with the double of the files and with fewer machines, and it also worked well)
   
   Thank you for help on this case.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] nochimow edited a comment on issue #3431: [SUPPORT] Failed to upsert for commit time

Posted by GitBox <gi...@apache.org>.
nochimow edited a comment on issue #3431:
URL: https://github.com/apache/hudi/issues/3431#issuecomment-910251510


   Hi,
   Thanks for the reply.
   
   I was trying in some ways to change the memory and memory-overhead parameters without success.
   Since i am using the AWS Glue to run this, i opened a ticket with AWS support and received this response:
   
   > These 'conf' settings are not available for override. [1] This allows AWS to manage the resources dynamically and provide efficient performance. Below are  several argument names used by AWS Glue internally that you should never set:
   --conf — Internal to AWS Glue. Do not set!
   --debug — Internal to AWS Glue. Do not set!
   --mode — Internal to AWS Glue. Do not set!
   --JOB_NAME — Internal to AWS Glue. Do not set!
   
   >For G.1X Worker nodes:
   The maximum amount of driver memory you can provide is 10GB.
   Each executor is configured with 10 GB memory
   Each executor is configured with 8 spark cores
   Each worker is configured with 1 executor
   Each worker maps to 1 DPU (4 vCPU, 16 GB off memory, 64 GB disk), and provides 1 executor per worker.
   
   >For G.2X Worker nodes:
   The maximum amount of driver memory you can provide is 20GB.
   Each executor is configured with 20 GB memory
   Each executor is configured with 16 spark cores
   Each worker is configured with 1 executor
   Each worker maps to 2 DPU (8 vCPU, 32 GB of memory, 128 GB disk), and provides 1 executor per worker
   
   >Each executor has several task slots (or CPU cores) for running tasks in parallel.
     * numExecutors =
           * (DPU - 1) * 2 - 1 if WorkerType is Standard
           * (NumberOfWorkers - 1) if WorkerType is G.1X or G.2X

       * numSlotsPerExecutor =
           * 4 if WorkerType is Standard
           * 8 if WorkerType is G.1X
           * 16 if WorkerType is G.2X

       * numSlots = numSlotsPerExecutor * numExecutors_
   
   Reference: https://docs.aws.amazon.com/glue/latest/dg/aws-glue-programming-etl-glue-arguments.html
   
   So, in this case, the best option on AWS Glue is to use the G.2X machines, that we are already using, that sets the following parameters by default (and can't be overriden) 
   
   --conf spark.dynamicAllocation.enabled=true 
   --conf spark.shuffle.service.enabled=true 
   --conf spark.dynamicAllocation.minExecutors=1 
   --conf spark.dynamicAllocation.maxExecutors=6 
   --conf spark.executor.memory=20g 
   --conf spark.executor.cores=16 
   --conf spark.driver.memory=20g
   --conf spark.default.parallelism=112 
   --conf spark.sql.shuffle.partitions=112 --conf 
   
   Like i mentioned on my initial post, we used 14 * G.2X machines and also got this error.
   Since these parameters can't be change, there is any tuning that can be done on Hudi configuration side or any other alternative?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] nochimow edited a comment on issue #3431: [SUPPORT] Failed to upsert for commit time

Posted by GitBox <gi...@apache.org>.
nochimow edited a comment on issue #3431:
URL: https://github.com/apache/hudi/issues/3431#issuecomment-905793757


   Hi, 
   Sorry for the delay on reply, i had lost the execution history, and had to execute this same scenario again.
   Basically we are doing a ingestion of 57 avro files with typical sizes of 70-128MB giving 2,75GB of input data. We create a spark data-frame loading all these files and writing into Hudi. 
   This data is equal 186 million of rows. 
   The schema of the table is composed of 7 string columns, 2 BigInt columns, partitioned by 3 String columns (Day, Month, Year).
   I also checked that this table only have inserts, and this execution was related of a single partition (the current day)
   
   I also attached the glue job driver detailed logs in 2 .csv 
   
    (The Start and the end) the mid of it as far i saw don't show nothing useful, but if you need it, please let me know.
   
   Also, in this case we are running the glue job with the following infrastructure parameters:
   Worker Type: G2X
   Number of Workers: 7
   Max Concurrency 999
   Job Timeout 2880
   
   But we also did some tests on the past with 14 Workers and we had the same issue.
   
   Thank you in advance.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] nochimow edited a comment on issue #3431: [SUPPORT] Failed to upsert for commit time

Posted by GitBox <gi...@apache.org>.
nochimow edited a comment on issue #3431:
URL: https://github.com/apache/hudi/issues/3431#issuecomment-910251510


   Hi,
   Thanks for the reply.
   
   I was trying in some ways to change the memory and memory-overhead parameters without success.
   Since i am using the AWS Glue to run this, i opened a ticket with AWS support and received this response:
   
   > These 'conf' settings are not available for override. [1] This allows AWS to manage the resources dynamically and provide efficient performance. Below are  several argument names used by AWS Glue internally that you should never set:
   --conf — Internal to AWS Glue. Do not set!
   --debug — Internal to AWS Glue. Do not set!
   --mode — Internal to AWS Glue. Do not set!
   --JOB_NAME — Internal to AWS Glue. Do not set!
   
   >For G.1X Worker nodes:
   The maximum amount of driver memory you can provide is 10GB.
   Each executor is configured with 10 GB memory
   Each executor is configured with 8 spark cores
   Each worker is configured with 1 executor
   Each worker maps to 1 DPU (4 vCPU, 16 GB off memory, 64 GB disk), and provides 1 executor per worker.
   
   >For G.2X Worker nodes:
   The maximum amount of driver memory you can provide is 20GB.
   Each executor is configured with 20 GB memory
   Each executor is configured with 16 spark cores
   Each worker is configured with 1 executor
   Each worker maps to 2 DPU (8 vCPU, 32 GB of memory, 128 GB disk), and provides 1 executor per worker
   
   >Each executor has several task slots (or CPU cores) for running tasks in parallel.
    > * numExecutors =
           * (DPU - 1) * 2 - 1 if WorkerType is Standard
           * (NumberOfWorkers - 1) if WorkerType is G.1X or G.2X

       * numSlotsPerExecutor =
           * 4 if WorkerType is Standard
           * 8 if WorkerType is G.1X
           * 16 if WorkerType is G.2X

       * numSlots = numSlotsPerExecutor * numExecutors_
   
   Reference: https://docs.aws.amazon.com/glue/latest/dg/aws-glue-programming-etl-glue-arguments.html
   
   So, in this case, the best option on AWS Glue is to use the G.2X machines, that we are already using, that sets the following parameters by default (and can't be overriden) 
   
   --conf spark.dynamicAllocation.enabled=true 
   --conf spark.shuffle.service.enabled=true 
   --conf spark.dynamicAllocation.minExecutors=1 
   --conf spark.dynamicAllocation.maxExecutors=6 
   --conf spark.executor.memory=20g 
   --conf spark.executor.cores=16 
   --conf spark.driver.memory=20g
   --conf spark.default.parallelism=112 
   --conf spark.sql.shuffle.partitions=112 --conf 
   
   Like i mentioned on my initial post, we used 14 * G.2X machines and also got this error.
   Since these parameters can't be change, there is any tuning that can be done on Hudi configuration side or any other alternative?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] nochimow edited a comment on issue #3431: [SUPPORT] Failed to upsert for commit time

Posted by GitBox <gi...@apache.org>.
nochimow edited a comment on issue #3431:
URL: https://github.com/apache/hudi/issues/3431#issuecomment-905793757


   Hi, 
   Sorry for the delay on reply, i had lost the execution history, and had to execute this same scenario again.
   Basically we are doing a ingestion of 57 avro files with typical sizes of 70-128MB giving 2,75GB of input data. We create a spark data-frame loading all these files and writing into Hudi. 
   This data is equal 186 million of rows. 
   The schema of the table is composed of 7 string columns, 2 BigInt columns, partitioned by 3 String columns (Day, Month, Year).
   I also checked that this table only have inserts, and this execution was related of a single partition (the current day)
   
   I also attached the glue job driver detailed logs in 2 .csv 
   [start.csv](https://github.com/apache/hudi/files/7049222/start.csv)
   [end.csv](https://github.com/apache/hudi/files/7049220/end.csv)
   
    (The Start and the end) the mid of it as far i saw don't show nothing useful, but if you need it, please let me know.
   
   Also, in this case we are running the glue job with the following infrastructure parameters:
   Worker Type: G2X
   Number of Workers: 7
   Max Concurrency 999
   Job Timeout 2880
   
   But we also did some tests on the past with 14 Workers and we had the same issue.
   
   Thank you in advance.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] codope commented on issue #3431: [SUPPORT] Failed to upsert for commit time

Posted by GitBox <gi...@apache.org>.
codope commented on issue #3431:
URL: https://github.com/apache/hudi/issues/3431#issuecomment-907163686


   @nochimow Thanks for providing the logs.  For convenience, I am pasting the relevant stacktrace below.
   ```
   Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: ShuffleMapStage 18 (countByKey at BaseSparkCommitActionExecutor.java:158) has failed the maximum allowable number of times: 4. Most recent failure reason: org.apache.spark.shuffle.FetchFailedException: Failed to connect to /172.35.196.242:35965 	at org.apache.spark.storage.ShuffleBlockFetcherIterator.throwFetchFailedException(ShuffleBlockFetcherIterator.scala:554) 	at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:485) 	at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:64) 	at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:435) 	at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:441) 	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409) 	at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:31) 	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIt
 erator.scala:37) 	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409) 	at org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:156) 	at org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$4.apply(CoGroupedRDD.scala:154) 	at org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$4.apply(CoGroupedRDD.scala:153) 	at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:733) 	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) 	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) 	at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:732) 	at org.apache.spark.rdd.CoGroupedRDD.compute(CoGroupedRDD.scala:153) 	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324) 	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288) 	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) 	at org.apache.spark.rdd.RDD.com
 puteOrReadCheckpoint(RDD.scala:324) 	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288) 	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) 	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324) 	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288) 	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) 	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324) 	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288) 	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) 	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324) 	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288) 	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) 	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324) 	at org.apache.spark.rdd.RDD$$anonfun$7.apply(RDD.scala:337) 	at org.apache.spark.rdd.RDD$$anonfun$7.apply(RDD.scala:335) 	at org.apache.spark.storage.BlockManager$$anon
 fun$doPutIterator$1.apply(BlockManager.scala:1182) 	at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1156) 	at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:1091) 	at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1156) 	at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:882) 	at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:335) 	at org.apache.spark.rdd.RDD.iterator(RDD.scala:286) 	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) 	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324) 	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288) 	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) 	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324) 	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288) 	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99) 	at org.apache.spark.schedu
 ler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55) 	at org.apache.spark.scheduler.Task.run(Task.scala:121) 	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408) 	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360) 	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414) 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) 	at java.lang.Thread.run(Thread.java:748) Caused by: java.io.IOException: Failed to connect to /172.35.196.242:35965 	at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:245) 	at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:187) 	at org.apache.spark.network.netty.NettyBlockTransferService$$anon$2.createAndStart(NettyBlockTransferService.scala:114) 	at org.apache.spark.network.shuffle.Retr
 yingBlockFetcher.fetchAllOutstanding(RetryingBlockFetcher.java:141) 	at org.apache.spark.network.shuffle.RetryingBlockFetcher.lambda$initiateRetry$0(RetryingBlockFetcher.java:169) 	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) 	at java.util.concurrent.FutureTask.run(FutureTask.java:266) 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) 	at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:138) 	... 1 more Caused by: io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: /172.35.196.242:35965 	at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method) 	at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:716) 	at io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:323) 	at io.netty.channel.nio.AbstractNioChannel$Abstr
 actNioUnsafe.finishConnect(AbstractNioChannel.java:340) 	at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:633) 	at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:580) 	at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:497) 	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:459) 	at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858) 	... 2 more Caused by: java.net.ConnectException: Connection refused 	... 11 more 
   	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1889)
   	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1877)
   	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1876)
   	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
   	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
   	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1876)
   	at org.apache.spark.scheduler.DAGScheduler.handleTaskCompletion(DAGScheduler.scala:1493)
   	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2107)
   	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2059)
   	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2048)
   	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
   	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:737)
   	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
   	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2082)
   	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2101)
   	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2126)
   	at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:945)
   	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
   	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
   	at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
   	at org.apache.spark.rdd.RDD.collect(RDD.scala:944)
   	at org.apache.spark.rdd.PairRDDFunctions$$anonfun$countByKey$1.apply(PairRDDFunctions.scala:370)
   	at org.apache.spark.rdd.PairRDDFunctions$$anonfun$countByKey$1.apply(PairRDDFunctions.scala:370)
   	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
   	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
   	at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
   	at org.apache.spark.rdd.PairRDDFunctions.countByKey(PairRDDFunctions.scala:369)
   	at org.apache.spark.api.java.JavaPairRDD.countByKey(JavaPairRDD.scala:312)
   	at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.buildProfile(BaseSparkCommitActionExecutor.java:158)
   	at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.execute(BaseSparkCommitActionExecutor.java:126)
   	at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.execute(BaseSparkCommitActionExecutor.java:78)
   	at org.apache.hudi.table.action.commit.AbstractWriteHelper.write(AbstractWriteHelper.java:55)
   	... 39 more
   ```
   
   Note that,
   ```
   ShuffleMapStage 18 (countByKey at BaseSparkCommitActionExecutor.java:158) has failed the maximum allowable number of times: 4. Most recent failure reason: org.apache.spark.shuffle.FetchFailedException: Failed to connect to /172.35.196.242:35965
   ```
   I am not sure if that's a Hudi issue. `FetchFailedException` and failure to connect typically happens due to memory pressure on the executors. Consider tuning the executor memory  and memory overhead parameters.
   
   cc: @nsivabalan 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] nochimow edited a comment on issue #3431: [SUPPORT] Failed to upsert for commit time

Posted by GitBox <gi...@apache.org>.
nochimow edited a comment on issue #3431:
URL: https://github.com/apache/hudi/issues/3431#issuecomment-910251510


   Hi,
   Thanks for the reply.
   
   I was trying in some ways to change the memory and memory-overhead parameters without success.
   Since i am using the AWS Glue to run this, i opened a ticket with AWS support and received this response:
   
   > These 'conf' settings are not available for override. [1] This allows AWS to manage the resources dynamically and provide efficient performance. Below are  several argument names used by AWS Glue internally that you should never set:
   --conf — Internal to AWS Glue. Do not set!
   --debug — Internal to AWS Glue. Do not set!
   --mode — Internal to AWS Glue. Do not set!
   --JOB_NAME — Internal to AWS Glue. Do not set!
   
   >For G.1X Worker nodes:
   The maximum amount of driver memory you can provide is 10GB.
   Each executor is configured with 10 GB memory
   Each executor is configured with 8 spark cores
   Each worker is configured with 1 executor
   Each worker maps to 1 DPU (4 vCPU, 16 GB off memory, 64 GB disk), and provides 1 executor per worker.
   
   >For G.2X Worker nodes:
   The maximum amount of driver memory you can provide is 20GB.
   Each executor is configured with 20 GB memory
   Each executor is configured with 16 spark cores
   Each worker is configured with 1 executor
   Each worker maps to 2 DPU (8 vCPU, 32 GB of memory, 128 GB disk), and provides 1 executor per worker
   
   >Each executor has several task slots (or CPU cores) for running tasks in parallel.
     * numExecutors =
           * (DPU - 1) * 2 - 1 if WorkerType is Standard
           * (NumberOfWorkers - 1) if WorkerType is G.1X or G.2X

       * numSlotsPerExecutor =
           * 4 if WorkerType is Standard
           * 8 if WorkerType is G.1X
           * 16 if WorkerType is G.2X

       * numSlots = numSlotsPerExecutor * numExecutors_
   
   Reference: https://docs.aws.amazon.com/glue/latest/dg/aws-glue-programming-etl-glue-arguments.html
   
   So, in this case, the best option on AWS Glue is to use the G.2X machines, that we are already using, that sets the following parameters by default (and can't be overriden) 
   
   --conf spark.dynamicAllocation.enabled=true 
   --conf spark.shuffle.service.enabled=true 
   --conf spark.dynamicAllocation.minExecutors=1 
   --conf spark.dynamicAllocation.maxExecutors=6 
   --conf spark.executor.memory=20g 
   --conf spark.executor.cores=16 
   --conf spark.driver.memory=20g
   --conf spark.default.parallelism=112 
   --conf spark.sql.shuffle.partitions=112 --conf 
   
   Like i mentioned on my initial post, we used 14 * G.2X machines and also got this error.
   Since these parameters can't be change, there is any tuning that can be done on Hudi configuration side? 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] nsivabalan commented on issue #3431: [SUPPORT] Failed to upsert for commit time

Posted by GitBox <gi...@apache.org>.
nsivabalan commented on issue #3431:
URL: https://github.com/apache/hudi/issues/3431#issuecomment-917431404


   awesome, thnx for the update. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] nochimow commented on issue #3431: [SUPPORT] Failed to upsert for commit time

Posted by GitBox <gi...@apache.org>.
nochimow commented on issue #3431:
URL: https://github.com/apache/hudi/issues/3431#issuecomment-905793757


   Hi, 
   Sorry for the delay on reply, i had lost the execution history, and had to execute this same scenario again.
   Basically we are doing a ingestion of 57 avro files with typical sizes of 70-128MB giving 2,75GB of input data. We create a spark data-frame loading all these files and writing into Hudi. 
   This data is equal 186 million of rows. 
   The schema of the table is composed of 7 string columns, 2 BigInt columns, partitioned by 3 String columns (Day, Month, Year)
   I also checked that this table only have inserts.
   
   I also attached the glue job driver detailed logs in 2 .csv 
   [start.csv](https://github.com/apache/hudi/files/7049222/start.csv)
   [end.csv](https://github.com/apache/hudi/files/7049220/end.csv)
   
    (The Start and the end) the mid of it as far i saw don't show nothing useful, but if you need it, please let me know.
   
   Also, in this case we are running the glue job with the following infrastructure parameters:
   Worker Type: G2X
   Number of Workers: 7
   Max Concurrency 999
   Job Timeout 2880
   
   But we also did some tests on the past with 14 Workers and we had the same issue.
   
   Thank you in advance.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] nsivabalan commented on issue #3431: [SUPPORT] Failed to upsert for commit time

Posted by GitBox <gi...@apache.org>.
nsivabalan commented on issue #3431:
URL: https://github.com/apache/hudi/issues/3431#issuecomment-901500066


   hmmm, interesting. do you happened to have any driver logs related to this? Stack trace given above does not have much info :(.
   what do you mean by biggest datasets? 
   I mean, I understand your hudi dataset size is large. But does your write batch is large too? can you give us some insights on your dataset characteristics in general. like # partitions, data size, updates vs inserts, etc. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] nochimow commented on issue #3431: [SUPPORT] Failed to upsert for commit time

Posted by GitBox <gi...@apache.org>.
nochimow commented on issue #3431:
URL: https://github.com/apache/hudi/issues/3431#issuecomment-910251510


   Hi,
   Thanks for the reply.
   
   I was trying in some ways to change the memory and memory-overhead parameters without success.
   Since i am using the AWS Glue to run this, i opened a ticket with AWS support and received this response:
   
   
   _These 'conf' settings are not available for override. [1] This allows AWS to manage the resources dynamically and provide efficient performance. Below are  several argument names used by AWS Glue internally that you should never set:
   
   --conf — Internal to AWS Glue. Do not set!
   --debug — Internal to AWS Glue. Do not set!
   --mode — Internal to AWS Glue. Do not set!
   --JOB_NAME — Internal to AWS Glue. Do not set!
   
   I am writing down difference between different worker types below .  
   
   -->For G.1X Worker nodes:
   
   The maximum amount of driver memory you can provide is 10GB.
   Each executor is configured with 10 GB memory
   Each executor is configured with 8 spark cores
   Each worker is configured with 1 executor
   Each worker maps to 1 DPU (4 vCPU, 16 GB off memory, 64 GB disk), and provides 1 executor per worker.
   
   For G.2X Worker nodes:
   
   The maximum amount of driver memory you can provide is 20GB.
   Each executor is configured with 20 GB memory
   Each executor is configured with 16 spark cores
   Each worker is configured with 1 executor
   Each worker maps to 2 DPU (8 vCPU, 32 GB of memory, 128 GB disk), and provides 1 executor per worker
   
   Each executor has several task slots (or CPU cores) for running tasks in parallel [4].
   
     * numExecutors =
           * (DPU - 1) * 2 - 1 if WorkerType is Standard
           * (NumberOfWorkers - 1) if WorkerType is G.1X or G.2X

       * numSlotsPerExecutor =
           * 4 if WorkerType is Standard
           * 8 if WorkerType is G.1X
           * 16 if WorkerType is G.2X

       * numSlots = numSlotsPerExecutor * numExecutors_
   
   Reference: https://docs.aws.amazon.com/glue/latest/dg/aws-glue-programming-etl-glue-arguments.html
   
   So, in this case, the best option on AWS Glue is to use the G.2X machines, that we are already using, that sets the following parameters by default (and can't be overriden) 
   
   --conf spark.dynamicAllocation.enabled=true 
   --conf spark.shuffle.service.enabled=true 
   --conf spark.dynamicAllocation.minExecutors=1 
   --conf spark.dynamicAllocation.maxExecutors=6 
   --conf spark.executor.memory=20g 
   --conf spark.executor.cores=16 
   --conf spark.driver.memory=20g
   --conf spark.default.parallelism=112 
   --conf spark.sql.shuffle.partitions=112 --conf 
   
   Like i mentioned on my initial post, we used 14 * G.2X machines and also got this error.
   Since these parameters can't be change, there is any tuning that can be done on Hudi configuration side? 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] nsivabalan commented on issue #3431: [SUPPORT] Failed to upsert for commit time

Posted by GitBox <gi...@apache.org>.
nsivabalan commented on issue #3431:
URL: https://github.com/apache/hudi/issues/3431#issuecomment-912846498


   Have you tried tuning GC by any chance? 
   
   Do you mind posting spark UI stages screenshot to see if there are any skews are GC overhead. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] nochimow closed issue #3431: [SUPPORT] Failed to upsert for commit time

Posted by GitBox <gi...@apache.org>.
nochimow closed issue #3431:
URL: https://github.com/apache/hudi/issues/3431


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org