Posted to user@spark.apache.org by Vadim Semenov <va...@datadoghq.com.INVALID> on 2019/09/03 20:45:24 UTC

Re: EMR Spark 2.4.3 executor hang

Try "spark.shuffle.io.numConnectionsPerPeer=10"

On Fri, Aug 30, 2019 at 10:22 AM Daniel Zhang <ja...@hotmail.com> wrote:

> Hi, All:
> We are testing the EMR and compare with our on-premise HDP solution. We
> use one application as the test:
> EMR (5.21.1) with Hadoop 2.8.5 + Spark 2.4.3 vs HDP (2.6.3) with Hadoop
> 2.7.3 + Spark 2.2.0
> The application is very simple, just read Parquet raw file, then do a
> DS.repartition(id_col).flatMap().write.partitionBy(col).save() operation.
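>
> (A minimal sketch of the job's shape; the case class, column names, and
> paths below are hypothetical stand-ins for our real schema:)
>
>   import org.apache.spark.sql.SparkSession
>
>   case class Record(id_col: Long, dt: String, payload: String)
>
>   val spark = SparkSession.builder().getOrCreate()
>   import spark.implicits._
>
>   // Read the raw Parquet input (S3 in the first test, HDFS later).
>   val ds = spark.read.parquet("s3://bucket/raw/").as[Record]
>
>   ds.repartition($"id_col")                // shuffle stage; 200 tasks via spark.sql.shuffle.partitions default
>     .flatMap(r => Seq(r))                  // placeholder for the real flatMap logic
>     .write
>     .partitionBy("dt")                     // output partitioned by a date column
>     .parquet("hdfs:///user/hadoop/output") // hypothetical output path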
>
> For our test data on HDP with 6 slave nodes (32G each), the whole
> application finishes in around 3 hours, which we are fine with.
> The application runs as a Spark job with 2 stages; the 2nd stage runs with
> 200 tasks by default.
> On EMR, we observed that 2 of the 200 tasks hang for more than 10 hours
> while the rest finish, and we have to give up.
>
> The first test read the raw Parquet files from S3 and wrote the output
> directly back to S3, so I suspected an issue with the S3 output committer.
> We then changed the test to read the Parquet files from S3 and use the EMR
> HDFS as the output location.
> To my surprise, we observed the same behavior with HDFS: 2 of the 200
> tasks hang forever, and they are on different executors. These 2 executors
> process other tasks normally but hang on these 2 tasks, while all the rest
> finish.
>
> This looks like data skew, but we know it is not: the same application and
> the same data work fine on HDP, and we see well-balanced data across all
> 200 tasks.
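>
> (One way to double-check the balance, as a sketch reusing the hypothetical
> ds from the snippet above; spark_partition_id() is a built-in SQL function:)
>
>   import org.apache.spark.sql.functions.spark_partition_id
>
>   // Row count per shuffle partition after the repartition; heavy skew
>   // would show up as a few partitions with far larger counts than the rest.
>   ds.repartition($"id_col")
>     .groupBy(spark_partition_id()).count()
>     .orderBy($"count".desc)
>     .show(200, truncate = false)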
>
> I then checked the executor logs on EMR more carefully for the HDFS test
> case, and I know S3 is not an issue here, because all the raw Parquet data
> is read in the first stage of the job WITHOUT any delay.
>
> Sample log from a finished executor on EMR:
> 19/08/29 20:18:49 INFO Executor: Finished task 157.0 in stage 2.0 (TID 170). 3854 bytes result sent to driver
> 19/08/29 20:18:49 INFO CoarseGrainedExecutorBackend: Got assigned task 179
> 19/08/29 20:18:49 INFO Executor: Running task 166.0 in stage 2.0 (TID 179)
> 19/08/29 20:18:49 INFO ShuffleBlockFetcherIterator: Getting 12 non-empty blocks including 1 local blocks and 11 remote blocks
> 19/08/29 20:18:49 INFO TransportClientFactory: Found inactive connection to ip-10-51-51-147.ec2.internal/10.51.51.147:7337, creating a new one.
> 19/08/29 20:18:49 INFO TransportClientFactory: Successfully created connection to ip-10-51-51-147.ec2.internal/10.51.51.147:7337 after 0 ms (0 ms spent in bootstraps)
> 19/08/29 20:18:49 INFO TransportClientFactory: Found inactive connection to ip-10-51-51-141.ec2.internal/10.51.51.141:7337, creating a new one.
> 19/08/29 20:18:49 INFO TransportClientFactory: Successfully created connection to ip-10-51-51-141.ec2.internal/10.51.51.141:7337 after 0 ms (0 ms spent in bootstraps)
> 19/08/29 20:18:49 INFO TransportClientFactory: Found inactive connection to ip-10-51-51-155.ec2.internal/10.51.51.155:7337, creating a new one.
> 19/08/29 20:18:49 INFO TransportClientFactory: Successfully created connection to ip-10-51-51-155.ec2.internal/10.51.51.155:7337 after 0 ms (0 ms spent in bootstraps)
> 19/08/29 20:18:49 INFO TransportClientFactory: Found inactive connection to ip-10-51-51-142.ec2.internal/10.51.51.142:7337, creating a new one.
> 19/08/29 20:18:49 INFO TransportClientFactory: Successfully created connection to ip-10-51-51-142.ec2.internal/10.51.51.142:7337 after 0 ms (0 ms spent in bootstraps)
> 19/08/29 20:18:49 INFO TransportClientFactory: Found inactive connection to ip-10-51-51-140.ec2.internal/10.51.51.140:7337, creating a new one.
> 19/08/29 20:18:49 INFO TransportClientFactory: Successfully created connection to ip-10-51-51-140.ec2.internal/10.51.51.140:7337 after 0 ms (0 ms spent in bootstraps)
> 19/08/29 20:18:49 INFO TransportClientFactory: Found inactive connection to ip-10-51-51-157.ec2.internal/10.51.51.157:7337, creating a new one.
> 19/08/29 20:18:49 INFO TransportClientFactory: Successfully created connection to ip-10-51-51-157.ec2.internal/10.51.51.157:7337 after 0 ms (0 ms spent in bootstraps)
> 19/08/29 20:18:49 INFO ShuffleBlockFetcherIterator: Started 11 remote fetches in 61 ms
> 19/08/29 20:28:55 INFO FileOutputCommitter: File Output Committer Algorithm version is 1
> .................
>
> The last log from the hanging executor on EMR:
> 19/08/29 19:40:40 INFO Executor: Finished task 78.0 in stage 2.0 (TID 91). 3854 bytes result sent to driver
> 19/08/29 19:40:40 INFO CoarseGrainedExecutorBackend: Got assigned task 101
> 19/08/29 19:40:40 INFO Executor: Running task 88.0 in stage 2.0 (TID 101)
> 19/08/29 19:40:40 INFO ShuffleBlockFetcherIterator: Getting 12 non-empty blocks including 1 local blocks and 11 remote blocks
> 19/08/29 19:40:40 INFO TransportClientFactory: Found inactive connection to ip-10-51-51-147.ec2.internal/10.51.51.147:7337, creating a new one.
> 19/08/29 19:40:40 INFO TransportClientFactory: Successfully created connection to ip-10-51-51-147.ec2.internal/10.51.51.147:7337 after 0 ms (0 ms spent in bootstraps)
> 19/08/29 19:40:40 INFO TransportClientFactory: Found inactive connection to ip-10-51-51-157.ec2.internal/10.51.51.157:7337, creating a new one.
> 19/08/29 19:40:40 INFO TransportClientFactory: Successfully created connection to ip-10-51-51-157.ec2.internal/10.51.51.157:7337 after 1 ms (0 ms spent in bootstraps)
> 19/08/29 19:40:40 INFO TransportClientFactory: Found inactive connection to ip-10-51-51-142.ec2.internal/10.51.51.142:7337, creating a new one.
> 19/08/29 19:40:40 INFO TransportClientFactory: Successfully created connection to ip-10-51-51-142.ec2.internal/10.51.51.142:7337 after 1 ms (0 ms spent in bootstraps)
> 19/08/29 19:40:40 INFO TransportClientFactory: Found inactive connection to ip-10-51-51-141.ec2.internal/10.51.51.141:7337, creating a new one.
> 19/08/29 19:40:40 INFO TransportClientFactory: Successfully created connection to ip-10-51-51-141.ec2.internal/10.51.51.141:7337 after 0 ms (0 ms spent in bootstraps)
> 19/08/29 19:40:40 INFO TransportClientFactory: Found inactive connection to ip-10-51-51-155.ec2.internal/10.51.51.155:7337, creating a new one.
> 19/08/29 19:40:40 INFO TransportClientFactory: Successfully created connection to ip-10-51-51-155.ec2.internal/10.51.51.155:7337 after 0 ms (0 ms spent in bootstraps)
> 19/08/29 19:40:40 INFO TransportClientFactory: Found inactive connection to ip-10-51-51-140.ec2.internal/10.51.51.140:7337, creating a new one.
> 19/08/29 19:40:40 INFO TransportClientFactory: Successfully created connection to ip-10-51-51-140.ec2.internal/10.51.51.140:7337 after 0 ms (0 ms spent in bootstraps)
> 19/08/29 19:40:40 INFO ShuffleBlockFetcherIterator: Started 11 remote fetches in 73 ms
>
> It shows that the hanging executor started fetching data for task "101"
> but never reached "FileOutputCommitter" for that particular task. Other
> tasks, for example "91", had finished on this executor without any issue
> before.
> I checked the HDFS output location:
> [hadoop@ip-10-51-51-151 ~]$ hadoop fs -ls -R /user/hadoop/permalink/working/working/20190829190515/data/_temporary/0/task_20190829190552_0002_m_000101
> drwxr-xr-x   - hadoop hadoop          0 2019-08-29 19:51 /user/hadoop/permalink/working/working/20190829190515/data/_temporary/0/task_20190829190552_0002_m_000101/dt=2019-08-01
> -rw-r--r--   2 hadoop hadoop  170976376 2019-08-29 19:51 /user/hadoop/permalink/working/working/20190829190515/data/_temporary/0/task_20190829190552_0002_m_000101/dt=2019-08-01/part-00101-cc72b7f3-cb01-4bdf-992d-1567cfc9ebd6.c000.snappy.parquet
> drwxr-xr-x   - hadoop hadoop          0 2019-08-29 19:51 /user/hadoop/permalink/working/working/20190829190515/data/_temporary/0/task_20190829190552_0002_m_000101/dt=2019-08-02
> -rw-r--r--   2 hadoop hadoop  102985213 2019-08-29 19:51 /user/hadoop/permalink/working/working/20190829190515/data/_temporary/0/task_20190829190552_0002_m_000101/dt=2019-08-02/part-00101-cc72b7f3-cb01-4bdf-992d-1567cfc9ebd6.c000.snappy.parquet
> drwxr-xr-x   - hadoop hadoop          0 2019-08-29 19:51 /user/hadoop/permalink/working/working/20190829190515/data/_temporary/0/task_20190829190552_0002_m_000101/dt=2019-08-03
> -rw-r--r--   2 hadoop hadoop   58306503 2019-08-29 19:51 /user/hadoop/permalink/working/working/20190829190515/data/_temporary/0/task_20190829190552_0002_m_000101/dt=2019-08-03/part-00101-cc72b7f3-cb01-4bdf-992d-1567cfc9ebd6.c000.snappy.parquet
> drwxr-xr-x   - hadoop hadoop          0 2019-08-29 19:51 /user/hadoop/permalink/working/working/20190829190515/data/_temporary/0/task_20190829190552_0002_m_000101/dt=UNKNOWN
> -rw-r--r--   2 hadoop hadoop  258330267 2019-08-29 19:51 /user/hadoop/permalink/working/working/20190829190515/data/_temporary/0/task_20190829190552_0002_m_000101/dt=UNKNOWN/part-00101-cc72b7f3-cb01-4bdf-992d-1567cfc9ebd6.c000.snappy.parquet
>
> In fact, as far as I can tell, all the intermediate data for task "101"
> SHOULD ALREADY BE DONE on HDFS at 19:51. The output Parquet files are
> close in size to the output of other tasks that had already finished.
>
> So my questions are:
>
> 1) What COULD stop these 2 executors from reaching "FileOutputCommitter"
> in Spark 2.4.3 in this case? I really don't believe they were still
> fetching remote data at that point.
> 2) This Spark 2.4.3 runs on EMR, and AWS sets the following configurations
> that may be related to the above issue (a quick way to check them at
> runtime is sketched after the list):
>
> spark.hadoop.yarn.timeline-service.enabled false
> spark.yarn.appMasterEnv.SPARK_PUBLIC_DNS $(hostname -f)
> spark.files.fetchFailure.unRegisterOutputOnHost true
> spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version.emr_internal_use_only.EmrFileSystem 2
> spark.hadoop.mapreduce.fileoutputcommitter.cleanup-failures.ignored.emr_internal_use_only.EmrFileSystem true
> spark.sql.parquet.output.committer.class com.amazon.emr.committer.EmrOptimizedSparkSqlParquetOutputCommitter
> spark.sql.parquet.fs.optimized.committer.optimization-enabled true
> spark.sql.emr.internal.extensions com.amazonaws.emr.spark.EmrSparkSessionExtensions
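>
> (For reference, a sketch of how one might check these from a Spark shell
> on the cluster, where spark is the active SparkSession; keys that were
> never explicitly set may come back empty:)
>
>   Seq(
>     "spark.sql.parquet.output.committer.class",
>     "spark.sql.parquet.fs.optimized.committer.optimization-enabled",
>     "spark.shuffle.io.numConnectionsPerPeer"
>   ).foreach { key =>
>     // getOption returns None for keys that were never explicitly set.
>     val value = spark.conf.getOption(key).getOrElse("<not set>")
>     println(s"$key = $value")
>   }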
>
> Can anyone give me some idea what could cause this issue?
>
> Thanks
>
> Yong
>


-- 
Sent from my iPhone