Posted to issues@spark.apache.org by "Mullaivendhan Ariaputhri (Jira)" <ji...@apache.org> on 2020/01/30 08:24:00 UTC

[jira] [Updated] (SPARK-30677) Spark Streaming Job stuck when Kinesis Shard is increased when the job is running

     [ https://issues.apache.org/jira/browse/SPARK-30677?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Mullaivendhan Ariaputhri updated SPARK-30677:
---------------------------------------------
    Description: 
The Spark Streaming job stops processing when the number of Kinesis shards is increased while the job is already running.

We have observed the exceptions below.

 

2020-01-27 06:42:29 WARN FileBasedWriteAheadLog_ReceivedBlockTracker:66 - Failed to write to write ahead log
 2020-01-27 06:42:29 WARN FileBasedWriteAheadLog_ReceivedBlockTracker:66 - Failed to write to write ahead log
 2020-01-27 06:42:29 ERROR FileBasedWriteAheadLog_ReceivedBlockTracker:70 - Failed to write to write ahead log after 3 failures
 2020-01-27 06:42:29 WARN BatchedWriteAheadLog:87 - BatchedWriteAheadLog Writer failed to write ArrayBuffer(Record(java.nio.HeapByteBuffer[pos=0 lim=1845 cap=1845],1580107349095,Future(<not completed>)))
 java.io.IOException: Not supported
 at com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem.append(S3NativeFileSystem.java:588)
 at org.apache.hadoop.fs.FileSystem.append(FileSystem.java:1181)
 at com.amazon.ws.emr.hadoop.fs.EmrFileSystem.append(EmrFileSystem.java:295)
 at org.apache.spark.streaming.util.HdfsUtils$.getOutputStream(HdfsUtils.scala:35)
 at org.apache.spark.streaming.util.FileBasedWriteAheadLogWriter.stream$lzycompute(FileBasedWriteAheadLogWriter.scala:32)
 at org.apache.spark.streaming.util.FileBasedWriteAheadLogWriter.stream(FileBasedWriteAheadLogWriter.scala:32)
 at org.apache.spark.streaming.util.FileBasedWriteAheadLogWriter.<init>(FileBasedWriteAheadLogWriter.scala:35)
 at org.apache.spark.streaming.util.FileBasedWriteAheadLog.getLogWriter(FileBasedWriteAheadLog.scala:229)
 at org.apache.spark.streaming.util.FileBasedWriteAheadLog.write(FileBasedWriteAheadLog.scala:94)
 at org.apache.spark.streaming.util.FileBasedWriteAheadLog.write(FileBasedWriteAheadLog.scala:50)
 at org.apache.spark.streaming.util.BatchedWriteAheadLog.org$apache$spark$streaming$util$BatchedWriteAheadLog$$flushRecords(BatchedWriteAheadLog.scala:175)
 at org.apache.spark.streaming.util.BatchedWriteAheadLog$$anon$1.run(BatchedWriteAheadLog.scala:142)
 at java.lang.Thread.run(Thread.java:748)
 2020-01-27 06:42:29 WARN ReceivedBlockTracker:87 - Exception thrown while writing record: BlockAdditionEvent(ReceivedBlockInfo(0,Some(36),Some(SequenceNumberRanges(SequenceNumberRange(XXXXXXXXXXX,shardId-000000000006,49603657998853972269624727295162770770442241924489281634,49603657998853972269624727295206292099948368574778703970,36))),WriteAheadLogBasedStoreResult(input-0-1580106915391,Some(36),FileBasedWriteAheadLogSegment(s3://XXXXXXXXXXX/spark/checkpoint/XX/XXXXXXXXXXX/receivedData/0/log-1580107349000-1580107409000,0,31769)))) to the WriteAheadLog.
 org.apache.spark.SparkException: Exception thrown in awaitResult: 
 at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:226)
 at org.apache.spark.streaming.util.BatchedWriteAheadLog.write(BatchedWriteAheadLog.scala:84)
 at org.apache.spark.streaming.scheduler.ReceivedBlockTracker.writeToLog(ReceivedBlockTracker.scala:242)
 at org.apache.spark.streaming.scheduler.ReceivedBlockTracker.addBlock(ReceivedBlockTracker.scala:89)
 at org.apache.spark.streaming.scheduler.ReceiverTracker.org$apache$spark$streaming$scheduler$ReceiverTracker$$addBlock(ReceiverTracker.scala:347)
 at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverTrackerEndpoint$$anonfun$receiveAndReply$1$$anon$1$$anonfun$run$1.apply$mcV$sp(ReceiverTracker.scala:522)
 at org.apache.spark.util.Utils$.tryLogNonFatalError(Utils.scala:1340)
 at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverTrackerEndpoint$$anonfun$receiveAndReply$1$$anon$1.run(ReceiverTracker.scala:520)
 at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
 at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
 at java.lang.Thread.run(Thread.java:748)
 Caused by: java.io.IOException: Not supported
 at com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem.append(S3NativeFileSystem.java:588)
 at org.apache.hadoop.fs.FileSystem.append(FileSystem.java:1181)
 at com.amazon.ws.emr.hadoop.fs.EmrFileSystem.append(EmrFileSystem.java:295)
 at org.apache.spark.streaming.util.HdfsUtils$.getOutputStream(HdfsUtils.scala:35)
 at org.apache.spark.streaming.util.FileBasedWriteAheadLogWriter.stream$lzycompute(FileBasedWriteAheadLogWriter.scala:32)
 at org.apache.spark.streaming.util.FileBasedWriteAheadLogWriter.stream(FileBasedWriteAheadLogWriter.scala:32)
 at org.apache.spark.streaming.util.FileBasedWriteAheadLogWriter.<init>(FileBasedWriteAheadLogWriter.scala:35)
 at org.apache.spark.streaming.util.FileBasedWriteAheadLog.getLogWriter(FileBasedWriteAheadLog.scala:229)
 at org.apache.spark.streaming.util.FileBasedWriteAheadLog.write(FileBasedWriteAheadLog.scala:94)
 at org.apache.spark.streaming.util.FileBasedWriteAheadLog.write(FileBasedWriteAheadLog.scala:50)
 at org.apache.spark.streaming.util.BatchedWriteAheadLog.org$apache$spark$streaming$util$BatchedWriteAheadLog$$flushRecords(BatchedWriteAheadLog.scala:175)
 at org.apache.spark.streaming.util.BatchedWriteAheadLog$$anon$1.run(BatchedWriteAheadLog.scala:142)
 ... 1 more
 2020-01-27 06:42:29 INFO BlockManagerInfo:54 - Added input-0-1580106915392 in memory on XXXXXXXXXXX:38393 (size: 31.0 KB, free: 3.4 GB)
 2020-01-27 06:42:29 INFO MultipartUploadOutputStream:414 - close closed:false s3://XXXXXXXXXXX/spark/checkpoint/XXXXXXXXXXX/XXXXXXXXXXX/receivedBlockMetadata/log-1580107349323-1580107409323
 2020-01-27 06:42:29 INFO BlockManagerInfo:54 - Added input-3-1580106915123 in memory on XXXXXXXXXXX:42027 (size: 25.9 KB, free: 3.4 GB)
 2020-01-27 06:42:29 INFO MultipartUploadOutputStream:414 - close closed:false s3://XXXXXXXXXXX/spark/checkpoint/XXXXXXXXXXX/XXXXXXXXXXX/receivedBlockMetadata/log-1580107349908-1580107409908
 2020-01-27 06:42:30 INFO BlockManagerInfo:54 - Added input-2-1580106915311 in memory on XXXXXXXXXXX:38393 (size: 29.3 KB, free: 3.4 GB)
 2020-01-27 06:42:30 INFO BlockManagerInfo:54 - Added input-0-1580106915393 in memory on XXXXXXXXXXX:38393 (size: 31.0 KB, free: 3.4 GB)
 2020-01-27 06:42:30 INFO MultipartUploadOutputStream:414 - close closed:false s3://XXXXXXXXXXX/spark/checkpoint/XXXXXXXXXXX/XXXXXXXXXXX/receivedBlockMetadata/log-1580107350000-1580107410000
 2020-01-27 06:42:30 INFO JobScheduler:54 - Added jobs for time 1580107350000 ms
 2020-01-27 06:42:30 INFO JobGenerator:54 - Checkpointing graph for time 1580107350000 ms
 2020-01-27 06:42:30 INFO DStreamGraph:54 - Updating checkpoint data for time 1580107350000 ms
 2020-01-27 06:42:30 INFO DStreamGraph:54 - Updated checkpoint data for time 1580107350000 ms
 2020-01-27 06:42:30 INFO CheckpointWriter:54 - Submitted checkpoint of time 1580107350000 ms to writer queue
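 
The root cause visible in the trace is the call to FileSystem.append() on the S3 WAL segment: HdfsUtils.getOutputStream appends when the segment file already exists, and the EMR S3 filesystems (EmrFileSystem delegating to S3NativeFileSystem) do not support append. Below is a minimal sketch of that failing call, using a placeholder bucket/prefix in place of the redacted path above.

import java.net.URI
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

// Sketch only, not the streaming job: it issues the same append() call that the
// WAL writer ends up making when a log segment file already exists on S3.
// The bucket and prefix are placeholders for the redacted path above.
object S3WalAppendCheck {
  def main(args: Array[String]): Unit = {
    val walSegment = new Path("s3://example-bucket/spark/checkpoint/receivedData/0/log-0-0")
    val fs = FileSystem.get(new URI(walSegment.toString), new Configuration())
    try {
      // On EMR this is expected to throw java.io.IOException: Not supported,
      // matching the stack trace above.
      fs.append(walSegment).close()
    } catch {
      case e: java.io.IOException => println(s"append failed: ${e.getMessage}")
    }
  }
}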

 

Note:
1. Write-ahead logs and checkpoints are maintained in an AWS S3 bucket.

2. spark-submit configuration is as below (a programmatic sketch of the equivalent settings follows this list):

spark-submit --deploy-mode cluster --executor-memory 4608M --driver-memory 4608M
--conf spark.yarn.driver.memoryOverhead=710M
--conf spark.yarn.executor.memoryOverhead=710M --driver-cores 3 --executor-cores 3
--conf spark.dynamicAllocation.minExecutors=1
--conf spark.dynamicAllocation.maxExecutors=2
--conf spark.dynamicAllocation.initialExecutors=2
--conf spark.locality.wait.node=0
--conf spark.dynamicAllocation.enabled=true
--conf maximizeResourceAllocation=false --class XXXXXXXXXXXX
--conf spark.streaming.driver.writeAheadLog.closeFileAfterWrite=true
--conf spark.scheduler.mode=FAIR
--conf spark.metrics.conf=XXXXXXXXXXXX.properties --files=s3://XXXXXXXXXXXX/XXXXXXXXXXXX.properties
--conf spark.streaming.receiver.writeAheadLog.closeFileAfterWrite=true
--conf spark.streaming.receiver.writeAheadLog.enable=true
--conf spark.streaming.receiver.blockStoreTimeout=59
--conf spark.streaming.driver.writeAheadLog.batchingTimeout=30000
--conf spark.streaming.receiver.maxRate=120 s3://XXXXXXXXXXXX/XXXXXXXXXXXX.jar yarn XXXXXXXXXXXX applicationContext-XXXXXXXXXXXX-streaming.xml root kinesis 60 &

3. EMR Version - 5.26

4. Hadoop Distribution - Amazon 2.8.5

5. Hardware Config
 * Master (3 instances - Multi-Master Cluster): c5.2xlarge, 8 vCore, 16 GiB memory, EBS-only storage, EBS storage: 64 GiB
 * Core (6 instances [Min - 2, Max - 6]): c5.4xlarge, 16 vCore, 32 GiB memory, EBS-only storage, EBS storage: 1000 GiB

6. There are 3 Spark jobs running on the same cluster.

7. Streaming source - Kinesis

8. Cluster config and instance config are attached.
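
For reference, here is a minimal programmatic sketch of how the same WAL/checkpoint settings and the Kinesis receiver are wired up. This is not the actual application (which is driven by the spark-submit flags and the applicationContext XML above); the stream name, region, app name, and paths are placeholders.

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kinesis.KinesisInputDStream

// Sketch of the setup under which the failure occurs: receiver WAL enabled,
// checkpoint directory on S3, 60-second batches. Placeholder names throughout.
object KinesisWalSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("kinesis-streaming-sketch")
      .set("spark.streaming.receiver.writeAheadLog.enable", "true")
      .set("spark.streaming.receiver.writeAheadLog.closeFileAfterWrite", "true")
      .set("spark.streaming.driver.writeAheadLog.closeFileAfterWrite", "true")
      .set("spark.streaming.driver.writeAheadLog.batchingTimeout", "30000")
      .set("spark.streaming.receiver.maxRate", "120")

    val ssc = new StreamingContext(conf, Seconds(60))
    // Checkpoint and WAL data go to S3, as in the failing setup.
    ssc.checkpoint("s3://example-bucket/spark/checkpoint")

    val stream = KinesisInputDStream.builder
      .streamingContext(ssc)
      .streamName("example-stream")
      .endpointUrl("https://kinesis.us-east-1.amazonaws.com")
      .regionName("us-east-1")
      .checkpointAppName("example-kcl-app")
      .checkpointInterval(Seconds(60))
      .storageLevel(StorageLevel.MEMORY_AND_DISK_2)
      .build()

    stream.foreachRDD(rdd => println(s"records in batch: ${rdd.count()}"))

    ssc.start()
    ssc.awaitTermination()
  }
}

With spark.streaming.receiver.writeAheadLog.enable=true, received block data and metadata are written through the file-based write-ahead log into this checkpoint directory, which is where the append failure above occurs.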


> Spark Streaming Job stuck when Kinesis Shard is increased when the job is running
> ---------------------------------------------------------------------------------
>
>                 Key: SPARK-30677
>                 URL: https://issues.apache.org/jira/browse/SPARK-30677
>             Project: Spark
>          Issue Type: Bug
>          Components: Block Manager, Structured Streaming
>    Affects Versions: 2.4.3
>            Reporter: Mullaivendhan Ariaputhri
>            Priority: Major
>



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org