Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2021/12/31 04:29:39 UTC

[GitHub] [spark] pan3793 opened a new pull request #35076: [SPARK-37793][CORE][SHUFFLE] Fallback to fetch original blocks when noLocalMergedBlockDataError

pan3793 opened a new pull request #35076:
URL: https://github.com/apache/spark/pull/35076


   ### What changes were proposed in this pull request?
   
   When push-based shuffle is enabled, there is a chance that a task hangs at
   
   ```
   59	Executor task launch worker for task 424.0 in stage 753.0 (TID 106778)	WAITING	Lock(java.util.concurrent.ThreadPoolExecutor$Worker@1660371198})
   sun.misc.Unsafe.park(Native Method)
   java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
   java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2044)
   java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
   org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:756)
   org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:85)
   org.apache.spark.util.CompletionIterator.next(CompletionIterator.scala:29)
   scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486)
   scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492)
   scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
   org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:31)
   org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
   scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
   org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage6.sort_addToSorter_0$(Unknown Source)
   org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage6.processNext(Unknown Source)
   org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
   org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:760)
   org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage7.smj_findNextJoinRows_0$(Unknown Source)
   org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage7.hashAgg_doAggregateWithKeys_1$(Unknown Source)
   org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage7.hashAgg_doAggregateWithKeys_0$(Unknown Source)
   org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage7.hashAgg_doAggregateWithoutKey_0$(Unknown Source)
   org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage7.processNext(Unknown Source)
   org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
   org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$2.hasNext(WholeStageCodegenExec.scala:779)
   scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
   org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:140)
   org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
   org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
   org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
   org.apache.spark.scheduler.Task.run(Task.scala:136)
   org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:507)
   org.apache.spark.executor.Executor$TaskRunner$$Lambda$518/852390142.apply(Unknown Source)
   org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1470)
   org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:510)
   java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
   java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
   java.lang.Thread.run(Thread.java:748)
   ```
   
   And `org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:756)` is 
   ```
   while (result == null) {
     ...
     result = results.take() // line 756
     ...
   }
   ```
   
   After some investigation, I found that the last `FetchResult` put into `result` is a `PushMergedLocalMetaFetchResult`, and there is a chance that `bufs` is empty, in which case no `SuccessFetchResult` is added to `results` and the thread hangs if no other `FetchResult` is ever put into `results`.
   
   ```scala
   while (result == null) {
     ...
     result = results.take()
     ...
   
     result match {
       case r @ SuccessFetchResult(blockId, mapIndex, address, size, buf, isNetworkReqDone) =>
         ...
         case PushMergedLocalMetaFetchResult(
           shuffleId, shuffleMergeId, reduceId, bitmaps, localDirs) =>
           val shuffleBlockId = ShuffleMergedBlockId(shuffleId, shuffleMergeId, reduceId)
           try {
             val bufs: Seq[ManagedBuffer] = blockManager.getLocalMergedBlockData(shuffleBlockId,
               localDirs)
             // THERE IS A CHANCE THAT bufs.isEmpty!
             ...
             bufs.zipWithIndex.foreach { case (buf, chunkId) =>
               buf.retain()
               val shuffleChunkId = ShuffleBlockChunkId(shuffleId, shuffleMergeId, reduceId,
                 chunkId)
               pushBasedFetchHelper.addChunk(shuffleChunkId, bitmaps(chunkId))
               results.put(SuccessFetchResult(shuffleChunkId, SHUFFLE_PUSH_MAP_ID,
                 pushBasedFetchHelper.localShuffleMergerBlockMgrId, buf.size(), buf,
                 isNetworkReqDone = false))
             }
           } catch {
             case e: Exception =>
               pushBasedFetchHelper.initiateFallbackFetchForPushMergedBlock(
                 shuffleBlockId, pushBasedFetchHelper.localShuffleMergerBlockMgrId)
           }
           result = null
       ...
     }
   }
   ```
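   
   A minimal sketch of the fallback idea (not necessarily the exact patch; all identifiers are taken from the snippet above, and the empty-`bufs` guard with `IllegalStateException` is hypothetical): treat an empty `bufs` the same way as a failed read, so the existing catch-block fallback kicks in instead of leaving `results` without any `SuccessFetchResult`.
   
   ```scala
   case PushMergedLocalMetaFetchResult(
       shuffleId, shuffleMergeId, reduceId, bitmaps, localDirs) =>
     val shuffleBlockId = ShuffleMergedBlockId(shuffleId, shuffleMergeId, reduceId)
     try {
       val bufs: Seq[ManagedBuffer] =
         blockManager.getLocalMergedBlockData(shuffleBlockId, localDirs)
       // Hypothetical guard: an empty result is treated like a read failure, so the
       // catch block below triggers the fallback instead of enqueueing nothing.
       if (bufs.isEmpty) {
         throw new IllegalStateException(s"No local merged block data for $shuffleBlockId")
       }
       bufs.zipWithIndex.foreach { case (buf, chunkId) =>
         buf.retain()
         val shuffleChunkId = ShuffleBlockChunkId(shuffleId, shuffleMergeId, reduceId, chunkId)
         pushBasedFetchHelper.addChunk(shuffleChunkId, bitmaps(chunkId))
         results.put(SuccessFetchResult(shuffleChunkId, SHUFFLE_PUSH_MAP_ID,
           pushBasedFetchHelper.localShuffleMergerBlockMgrId, buf.size(), buf,
           isNetworkReqDone = false))
       }
     } catch {
       case e: Exception =>
         // Existing fallback path: fetch the original, un-merged blocks instead.
         pushBasedFetchHelper.initiateFallbackFetchForPushMergedBlock(
           shuffleBlockId, pushBasedFetchHelper.localShuffleMergerBlockMgrId)
     }
     result = null
   ```
   Routing the empty case through the same catch block keeps the fallback logic in one place and reuses the existing `initiateFallbackFetchForPushMergedBlock` path.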
   
   ### Why are the changes needed?
   
   Fall back to fetching the original blocks when `noLocalMergedBlockDataError` occurs, to avoid the task hang described above.
   
   ### Does this PR introduce _any_ user-facing change?
   No. This is a bug fix that makes push-based shuffle more stable.
   
   ### How was this patch tested?
   Passed 1T TPC-DS tests.



[GitHub] [spark] pan3793 edited a comment on pull request #35076: [SPARK-37793][CORE][SHUFFLE] Fallback to fetch original blocks when noLocalMergedBlockDataError

Posted by GitBox <gi...@apache.org>.
pan3793 edited a comment on pull request #35076:
URL: https://github.com/apache/spark/pull/35076#issuecomment-1005368068


   @otterc I agree with you that `bufs` should not be empty by design, and #34934 makes the same assumption.
   
   Besides those 2 issues, I also hit shuffle data corruption issues frequently.
   ```
   Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 144 in stage 1921.0 failed 4 times, most recent failure: Lost task 144.3 in stage 1921.0 (TID 139025) (beta-spark4 executor 85): java.io.EOFException: reached end of stream after reading 46 bytes; 48 bytes expected
   	at org.sparkproject.guava.io.ByteStreams.readFully(ByteStreams.java:735)
   	at org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$2$$anon$3.next(UnsafeRowSerializer.scala:127)
   	at org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$2$$anon$3.next(UnsafeRowSerializer.scala:110)
           ...
   ```
   Both hardware (disk) issues and network issues may cause shuffle data corruption, and due to the lack of a checksum mechanism in push-based shuffle, there is a chance we pass corrupt data to the `xxSerializer` layer, which then causes an exception and fails the task.
   
   So I think, aside from the code bug, there is still a chance of reading corrupt metadata from disk/network, even though the probability is lower than for shuffle data because metadata is usually smaller, and when that happens, falling back to fetching the original blocks should be safe.
   
   With this patch and #34934, data corruption is the only critical issue (I mean one that can fail the job) in our dozen rounds of 1T TPC-DS tests, and I think adding the checksum should solve that issue.
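   
   As an illustration only (hypothetical helper, not an existing Spark API), a reducer-side checksum check could look like the sketch below: compare a CRC32 computed over the fetched bytes with one shipped alongside the block metadata, and fall back to the original blocks on mismatch instead of handing corrupt bytes to the serializer.
   
   ```scala
   import java.util.zip.CRC32
   
   // Hypothetical helper: verify a fetched merged-shuffle chunk against a CRC32
   // value assumed to be shipped with its metadata; on mismatch, run the
   // caller-supplied fallback (e.g. re-fetch the original, un-merged blocks).
   def verifyOrFallback(chunkBytes: Array[Byte], expectedCrc: Long)(fallback: => Unit): Boolean = {
     val crc = new CRC32()
     crc.update(chunkBytes, 0, chunkBytes.length)
     if (crc.getValue == expectedCrc) {
       true
     } else {
       fallback
       false
     }
   }
   ```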



[GitHub] [spark] pan3793 edited a comment on pull request #35076: [SPARK-37793][CORE][SHUFFLE] Fallback to fetch original blocks when noLocalMergedBlockDataError

Posted by GitBox <gi...@apache.org>.
pan3793 edited a comment on pull request #35076:
URL: https://github.com/apache/spark/pull/35076#issuecomment-1005368068


   @otterc I agree with you that `bufs` should not be empty by design, and #34934 makes the same assumption.
   
   Besides those 2 issues, I also hit shuffle data corruption issues frequently.
   ```
   Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 144 in stage 1921.0 failed 4 times, most recent failure: Lost task 144.3 in stage 1921.0 (TID 139025) (beta-spark4 executor 85): java.io.EOFException: reached end of stream after reading 46 bytes; 48 bytes expected
   	at org.sparkproject.guava.io.ByteStreams.readFully(ByteStreams.java:735)
   	at org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$2$$anon$3.next(UnsafeRowSerializer.scala:127)
   	at org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$2$$anon$3.next(UnsafeRowSerializer.scala:110)
           ...
   ```
   Both hardware (disk) issues and network issues may cause shuffle data corruption, and due to the lack of a checksum mechanism in push-based shuffle, there is a chance we pass corrupt data to the `xxSerializer` layer, which then causes an exception and fails the task.
   
   So I think, aside from the code bug, there is still a chance of reading corrupt metadata from disk/network, even though the probability is lower than for shuffle data because metadata is usually smaller, and when that happens, falling back to fetching the original blocks should be safe.
   
   With this patch and #34934, data corruption is the only issue in our dozen rounds of 1T TPC-DS tests, and I think adding the checksum should solve that issue.



[GitHub] [spark] pan3793 edited a comment on pull request #35076: [SPARK-37793][CORE][SHUFFLE] Fallback to fetch original blocks when noLocalMergedBlockDataError

Posted by GitBox <gi...@apache.org>.
pan3793 edited a comment on pull request #35076:
URL: https://github.com/apache/spark/pull/35076#issuecomment-1005118384


   No particular query, but it is easy to reproduce when running all 1T TPC-DS queries.



[GitHub] [spark] pan3793 commented on pull request #35076: [SPARK-37793][CORE][SHUFFLE] Fallback to fetch original blocks when noLocalMergedBlockDataError

Posted by GitBox <gi...@apache.org>.
pan3793 commented on pull request #35076:
URL: https://github.com/apache/spark/pull/35076#issuecomment-1009617312


   @otterc Thanks for the reply. I misunderstood the code at first: if any `truncate` fails, the metadata of that partition will NOT be added to `mergeStatuses`, so `mergeStatuses` should always be exactly consistent with the files on disk.



[GitHub] [spark] mridulm edited a comment on pull request #35076: [SPARK-37793][CORE][SHUFFLE] Fallback to fetch original blocks when noLocalMergedBlockDataError

Posted by GitBox <gi...@apache.org>.
mridulm edited a comment on pull request #35076:
URL: https://github.com/apache/spark/pull/35076#issuecomment-1008446824


   I had written up my comment before your log messages @pan3793.
   What I had detailed is a potential bug - whether it is impacting this specific issue remains to be seen, though it looks unlikely.
   
   The discrepancy between the data file size and the data length seems to indicate some other issue here - though I can't make much out of the details provided, unfortunately; the file sizes in particular are an extremely confusing observation.
   I will let @otterc reproduce.



[GitHub] [spark] pan3793 commented on pull request #35076: [SPARK-37793][CORE][SHUFFLE] Fallback to fetch original blocks when noLocalMergedBlockDataError

Posted by GitBox <gi...@apache.org>.
pan3793 commented on pull request #35076:
URL: https://github.com/apache/spark/pull/35076#issuecomment-1007354842


   Hi @otterc, I got more information about this issue.
   
   Add assertions and a debug log in `RemoteBlockPushResolver` (ESS side):
   
   ```java
   public MergeStatuses finalizeShuffleMerge(FinalizeShuffleMerge msg) {
       ...
       for (AppShufflePartitionInfo partition: shuffleMergePartitions.values()) {
           synchronized (partition) {
             try {
            // This can throw IOException which will mark this shuffle partition as not merged.
               partition.finalizePartition();
               bitmaps.add(partition.mapTracker);
               reduceIds.add(partition.reduceId);
               sizes.add(partition.getLastChunkOffset());
             } catch (IOException ioe) {
               logger.warn("Exception while finalizing shuffle partition {}_{} {} {}", msg.appId,
                 msg.appAttemptId, msg.shuffleId, partition.reduceId, ioe);
             } finally {
               partition.closeAllFilesAndDeleteIfNeeded(false);
             }
           }
           assert partition.dataFile.length() == partition.lastChunkOffset;
           assert partition.indexFile.file.length() == partition.indexFile.getPos();
           assert partition.metaFile.file.length() == partition.metaFile.getPos();
           logger.info("shuffle partition {}_{} {} {}, chunk_size={}, meta_length={}, data_length={}",
                   msg.appId, msg.appAttemptId, msg.shuffleId, partition.reduceId,
                   partition.indexFile.getPos() / 8 - 1,
                   partition.metaFile.getPos(),
                   partition.lastChunkOffset);
         }
         mergeStatuses = new MergeStatuses(msg.shuffleId, msg.shuffleMergeId,
           bitmaps.toArray(new RoaringBitmap[0]), Ints.toArray(reduceIds),
           Longs.toArray(sizes));
       }
       ...
   }
   ```
   
   ```
   2022-01-07 19:40:46,795 INFO shuffle.RemoteBlockPushResolver: shuffle partition application_1640143179334_0148_-1 126 4877, chunk_size=1, meta_length=18, data_length=157
   ```
   
   Add assertions and a debug log in `IndexShuffleBlockResolver` (reducer side):
   ```scala
   override def getMergedBlockData(
       blockId: ShuffleMergedBlockId,
       dirs: Option[Array[String]]): Seq[ManagedBuffer] = {
     val indexFile =
       getMergedBlockIndexFile(conf.getAppId, blockId.shuffleId, blockId.shuffleMergeId,
         blockId.reduceId, dirs)
     val dataFile = getMergedBlockDataFile(conf.getAppId, blockId.shuffleId,
       blockId.shuffleMergeId, blockId.reduceId, dirs)
     val metaFile = getMergedBlockMetaFile(conf.getAppId, blockId.shuffleId,
       blockId.shuffleMergeId, blockId.reduceId, dirs)
     // Load all the indexes in order to identify all chunks in the specified merged shuffle file.
     val size = indexFile.length.toInt
     val offsets = Utils.tryWithResource {
       new DataInputStream(Files.newInputStream(indexFile.toPath))
     } { dis =>
       val buffer = ByteBuffer.allocate(size)
       dis.readFully(buffer.array)
       buffer.asLongBuffer
     }
     // Number of chunks is number of indexes - 1
     val numChunks = size / 8 - 1
     if (numChunks == 0) {
       val indexBackupPath = java.nio.file.Paths.get(s"/tmp/${indexFile.toPath.getFileName}")
       val dataBackupPath = java.nio.file.Paths.get(s"/tmp/${dataFile.toPath.getFileName}")
       val metaBackupPath = java.nio.file.Paths.get(s"/tmp/${metaFile.toPath.getFileName}")
       logError(s"$blockId chunk_size is 0, " +
         s"index_file is $indexFile, backup to $indexBackupPath" +
         s"data_file is $dataFile, backup to $dataBackupPath" +
         s"meta_file is $metaFile, backup to $metaBackupPath")
       Files.copy(indexFile.toPath, indexBackupPath)
       Files.copy(dataFile.toPath, dataBackupPath)
       Files.copy(metaFile.toPath, metaBackupPath)
       assert(false)
     }
     for (index <- 0 until numChunks) yield {
       new FileSegmentManagedBuffer(transportConf, dataFile,
         offsets.get(index),
         offsets.get(index + 1) - offsets.get(index))
     }
   }
   ```
   
   Then I ran TPC-DS for several rounds and reproduced the exception.
   
   ```log
   01-07 19:42:08 INFO [dag-scheduler-event-loop] scheduler.DAGScheduler@61: ShuffleMapStage 453 (save at QueryRunner.scala:98) failed in 1.811 s due to Job aborted due to stage failure: Task 122 in stage 453.0 failed 4 times, most recent failure: Lost task 122.3 in stage 453.0 (TID 278831) (beta-spark4 executor 562): java.lang.AssertionError: assertion failed
   	at scala.Predef$.assert(Predef.scala:208)
   	at org.apache.spark.shuffle.IndexShuffleBlockResolver.getMergedBlockData(IndexShuffleBlockResolver.scala:504)
   	at org.apache.spark.storage.BlockManager.getLocalMergedBlockData(BlockManager.scala:777)
   	at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:945)
   	at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:85)
   	at org.apache.spark.util.CompletionIterator.next(CompletionIterator.scala:29)
   ```
   
   ```
   root@beta-spark4:/tmp# ls -l shuffleMerged_application_1640143179334_0148_126_0_4877*
   -rw-r--r-- 1 root root 16036 Jan  7 19:41 shuffleMerged_application_1640143179334_0148_126_0_4877.data
   -rw-r--r-- 1 root root     8 Jan  7 19:41 shuffleMerged_application_1640143179334_0148_126_0_4877.index
   -rw-r--r-- 1 root root     0 Jan  7 19:41 shuffleMerged_application_1640143179334_0148_126_0_4877.meta
   ```
   
   So, the ESS and the reduce task were running on the same machine, and the ESS closed the 'data', 'index', and 'meta' files and reported their sizes as `chunk_size=1, meta_length=18, data_length=157`; this metadata was also returned to the driver and passed to the reduce task, but when the reduce task read the files from disk, the data did not match!



[GitHub] [spark] pan3793 edited a comment on pull request #35076: [SPARK-37793][CORE][SHUFFLE] Fallback to fetch original blocks when noLocalMergedBlockDataError

Posted by GitBox <gi...@apache.org>.
pan3793 edited a comment on pull request #35076:
URL: https://github.com/apache/spark/pull/35076#issuecomment-1007354842


   Hi @otterc, I got more information about this issue.
   
   Add assertions and a debug log in `RemoteBlockPushResolver` (ESS side):
   
   ```java
   public MergeStatuses finalizeShuffleMerge(FinalizeShuffleMerge msg) {
       ...
       for (AppShufflePartitionInfo partition: shuffleMergePartitions.values()) {
           synchronized (partition) {
             try {
            // This can throw IOException which will mark this shuffle partition as not merged.
               partition.finalizePartition();
               bitmaps.add(partition.mapTracker);
               reduceIds.add(partition.reduceId);
               sizes.add(partition.getLastChunkOffset());
             } catch (IOException ioe) {
               logger.warn("Exception while finalizing shuffle partition {}_{} {} {}", msg.appId,
                 msg.appAttemptId, msg.shuffleId, partition.reduceId, ioe);
             } finally {
               partition.closeAllFilesAndDeleteIfNeeded(false);
             }
           }
   +       assert partition.dataFile.length() == partition.lastChunkOffset;
   +       assert partition.indexFile.file.length() == partition.indexFile.getPos();
   +       assert partition.metaFile.file.length() == partition.metaFile.getPos();
   +       logger.info("shuffle partition {}_{} {} {}, chunk_size={}, meta_length={}, data_length={}",
   +              msg.appId, msg.appAttemptId, msg.shuffleId, partition.reduceId,
   +              partition.indexFile.getPos() / 8 - 1,
   +              partition.metaFile.getPos(),
   +              partition.lastChunkOffset);
        }
         mergeStatuses = new MergeStatuses(msg.shuffleId, msg.shuffleMergeId,
           bitmaps.toArray(new RoaringBitmap[0]), Ints.toArray(reduceIds),
           Longs.toArray(sizes));
       }
       ...
   }
   ```
   
   Add assertions and a debug log in `IndexShuffleBlockResolver` (reducer side):
   ```scala
   override def getMergedBlockData(
       blockId: ShuffleMergedBlockId,
       dirs: Option[Array[String]]): Seq[ManagedBuffer] = {
     val indexFile =
       getMergedBlockIndexFile(conf.getAppId, blockId.shuffleId, blockId.shuffleMergeId,
         blockId.reduceId, dirs)
     val dataFile = getMergedBlockDataFile(conf.getAppId, blockId.shuffleId,
       blockId.shuffleMergeId, blockId.reduceId, dirs)
     val metaFile = getMergedBlockMetaFile(conf.getAppId, blockId.shuffleId,
       blockId.shuffleMergeId, blockId.reduceId, dirs)
     // Load all the indexes in order to identify all chunks in the specified merged shuffle file.
     val size = indexFile.length.toInt
     val offsets = Utils.tryWithResource {
       new DataInputStream(Files.newInputStream(indexFile.toPath))
     } { dis =>
       val buffer = ByteBuffer.allocate(size)
       dis.readFully(buffer.array)
       buffer.asLongBuffer
     }
     // Number of chunks is number of indexes - 1
     val numChunks = size / 8 - 1
   + if (numChunks == 0) {
   +   val indexBackupPath = java.nio.file.Paths.get(s"/tmp/${indexFile.toPath.getFileName}")
   +   val dataBackupPath = java.nio.file.Paths.get(s"/tmp/${dataFile.toPath.getFileName}")
   +   val metaBackupPath = java.nio.file.Paths.get(s"/tmp/${metaFile.toPath.getFileName}")
   +   logError(s"$blockId chunk_size is 0, " +
   +      s"index_file is $indexFile, backup to $indexBackupPath" +
   +      s"data_file is $dataFile, backup to $dataBackupPath" +
   +      s"meta_file is $metaFile, backup to $metaBackupPath")
   +   Files.copy(indexFile.toPath, indexBackupPath)
   +   Files.copy(dataFile.toPath, dataBackupPath)
   +   Files.copy(metaFile.toPath, metaBackupPath)
   +   assert(false)
     }
     for (index <- 0 until numChunks) yield {
       new FileSegmentManagedBuffer(transportConf, dataFile,
         offsets.get(index),
         offsets.get(index + 1) - offsets.get(index))
     }
   }
   ```
   
   Then I ran TPC-DS for several rounds and reproduced the exception.
   
   The assertion failed on the reduce task side.
   ```log
   01-07 19:42:08 INFO [dag-scheduler-event-loop] scheduler.DAGScheduler@61: ShuffleMapStage 453 (save at QueryRunner.scala:98) failed in 1.811 s due to Job aborted due to stage failure: Task 122 in stage 453.0 failed 4 times, most recent failure: Lost task 122.3 in stage 453.0 (TID 278831) (beta-spark4 executor 562): java.lang.AssertionError: assertion failed
   	at scala.Predef$.assert(Predef.scala:208)
   	at org.apache.spark.shuffle.IndexShuffleBlockResolver.getMergedBlockData(IndexShuffleBlockResolver.scala:504)
   	at org.apache.spark.storage.BlockManager.getLocalMergedBlockData(BlockManager.scala:777)
   	at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:945)
   	at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:85)
   	at org.apache.spark.util.CompletionIterator.next(CompletionIterator.scala:29)
   ```
   
   ESS logs
   ```
   2022-01-07 19:40:46,795 INFO shuffle.RemoteBlockPushResolver: shuffle partition application_1640143179334_0148_-1 126 4877, chunk_size=1, meta_length=18, data_length=157
   ```
   
   Merged shuffle files backed up by the reduce task:
   ```
   root@beta-spark4:/tmp# ls -l shuffleMerged_application_1640143179334_0148_126_0_4877*
   -rw-r--r-- 1 root root 16036 Jan  7 19:41 shuffleMerged_application_1640143179334_0148_126_0_4877.data
   -rw-r--r-- 1 root root     8 Jan  7 19:41 shuffleMerged_application_1640143179334_0148_126_0_4877.index
   -rw-r--r-- 1 root root     0 Jan  7 19:41 shuffleMerged_application_1640143179334_0148_126_0_4877.meta
   ```
   
   So, the ESS and the reduce task were running on the same machine, and the ESS closed the 'data', 'index', and 'meta' files and reported their sizes as `chunk_size=1, meta_length=18, data_length=157`; this metadata was also returned to the driver and passed to the reduce task, but when the reduce task read the files from disk, the data did not match!



[GitHub] [spark] pan3793 commented on pull request #35076: [SPARK-37793][CORE][SHUFFLE] Fallback to fetch original blocks when noLocalMergedBlockDataError

Posted by GitBox <gi...@apache.org>.
pan3793 commented on pull request #35076:
URL: https://github.com/apache/spark/pull/35076#issuecomment-1005881779


   Update:
   
   After https://github.com/apache/spark/commit/f6128a6f4215dc45a19209d799dd9bf98fab6d8a, I ran 3 rounds of 1T TPC-DS and saw neither this issue nor the issue mentioned in #34934, but the following issue occurs frequently; it causes a stage retry rather than failing the job, and usually the stage succeeds on retry.
   
   ```
   FetchFailed(null, shuffleId=15, mapIndex=-1, mapId=-1, reduceId=838, message=
   org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle 15 partition 838
   	at org.apache.spark.MapOutputTracker$.validateStatus(MapOutputTracker.scala:1619)
   	at org.apache.spark.MapOutputTracker$.$anonfun$convertMapStatuses$7(MapOutputTracker.scala:1555)
   	at scala.collection.TraversableLike$WithFilter.$anonfun$foreach$1(TraversableLike.scala:985)
   	at scala.collection.Iterator.foreach(Iterator.scala:943)
   	at scala.collection.Iterator.foreach$(Iterator.scala:943)
   	at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
   	at scala.collection.IterableLike.foreach(IterableLike.scala:74)
   	at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
   	at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
   	at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:984)
   	at org.apache.spark.MapOutputTracker$.$anonfun$convertMapStatuses$4(MapOutputTracker.scala:1554)
   	at org.apache.spark.MapOutputTracker$.$anonfun$convertMapStatuses$4$adapted(MapOutputTracker.scala:1535)
   	at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
   	at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
   	at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:198)
   	at org.apache.spark.MapOutputTracker$.convertMapStatuses(MapOutputTracker.scala:1535)
   	at org.apache.spark.MapOutputTrackerWorker.getMapSizesByExecutorIdImpl(MapOutputTracker.scala:1230)
   	at org.apache.spark.MapOutputTrackerWorker.getPushBasedShuffleMapSizesByExecutorId(MapOutputTracker.scala:1204)
   	at org.apache.spark.shuffle.sort.SortShuffleManager.getReader(SortShuffleManager.scala:136)
   	at org.apache.spark.shuffle.ShuffleManager.getReader(ShuffleManager.scala:63)
   	at org.apache.spark.shuffle.ShuffleManager.getReader$(ShuffleManager.scala:57)
   	at org.apache.spark.shuffle.sort.SortShuffleManager.getReader(SortShuffleManager.scala:73)
   	at org.apache.spark.sql.execution.ShuffledRowRDD.compute(ShuffledRowRDD.scala:208)
   	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
   	at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
   	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
   	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
   	at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
   	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
   	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
   	at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
   	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
   	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
   	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
   	at org.apache.spark.scheduler.Task.run(Task.scala:136)
   	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:507)
   	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1475)
   	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:510)
   	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
   	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
   	at java.lang.Thread.run(Thread.java:748)
   ```



[GitHub] [spark] pan3793 edited a comment on pull request #35076: [SPARK-37793][CORE][SHUFFLE] Fallback to fetch original blocks when noLocalMergedBlockDataError

Posted by GitBox <gi...@apache.org>.
pan3793 edited a comment on pull request #35076:
URL: https://github.com/apache/spark/pull/35076#issuecomment-1005368068


   @otterc I agree with you that `bufs` should not be empty by design, and #34934 makes the same assumption.
   
   I also suspected there were some bugs or concurrency issues in the code and added some assertions, but unfortunately nothing was found.
   
   Besides those 2 issues, I also hit shuffle data corruption issues frequently.
   ```
   Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 144 in stage 1921.0 failed 4 times, most recent failure: Lost task 144.3 in stage 1921.0 (TID 139025) (beta-spark4 executor 85): java.io.EOFException: reached end of stream after reading 46 bytes; 48 bytes expected
   	at org.sparkproject.guava.io.ByteStreams.readFully(ByteStreams.java:735)
   	at org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$2$$anon$3.next(UnsafeRowSerializer.scala:127)
   	at org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$2$$anon$3.next(UnsafeRowSerializer.scala:110)
           ...
   ```
   Both hardware (disk) issues and network issues may cause shuffle data corruption, and due to the lack of a checksum mechanism in push-based shuffle, there is a chance we pass corrupt data to the `xxSerializer` layer, which then causes an exception and fails the task.
   
   So I think, aside from the code bug, there is still a chance of reading corrupt metadata from disk/network, even though the probability is lower than for shuffle data because metadata is usually smaller, and when that happens, falling back to fetching the original blocks should be safe.
   
   With this patch and #34934, data corruption is the only critical issue (I mean one that can fail the job) in our dozen rounds of 1T TPC-DS tests, and I think adding the checksum should solve that issue.




[GitHub] [spark] pan3793 edited a comment on pull request #35076: [SPARK-37793][CORE][SHUFFLE] Fallback to fetch original blocks when noLocalMergedBlockDataError

Posted by GitBox <gi...@apache.org>.
pan3793 edited a comment on pull request #35076:
URL: https://github.com/apache/spark/pull/35076#issuecomment-1005368068


   @otterc I agree with you that `bufs` should not be empty by design, and #34934 makes the same assumption.
   
   Besides those 2 issues, I also hit shuffle data corruption issues frequently.
   ```
   Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 144 in stage 1921.0 failed 4 times, most recent failure: Lost task 144.3 in stage 1921.0 (TID 139025) (beta-spark4 executor 85): java.io.EOFException: reached end of stream after reading 46 bytes; 48 bytes expected
   	at org.sparkproject.guava.io.ByteStreams.readFully(ByteStreams.java:735)
   	at org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$2$$anon$3.next(UnsafeRowSerializer.scala:127)
   	at org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$2$$anon$3.next(UnsafeRowSerializer.scala:110)
           ...
   ```
   Both hardware (disk) issues and network issues may cause shuffle data corruption, and due to the lack of a checksum mechanism in push-based shuffle, there is a chance we pass corrupt data to the `xxSerializer` layer, which then causes an exception and fails the task.
   
   So I think, aside from the code bug, there is still a chance of reading corrupt metadata from disk/network, even though the probability is lower than for shuffle data because metadata is usually smaller, and when that happens, falling back to fetching the original blocks should be safe.
   
   With this patch and #34934, data corruption is the only issue in our dozen rounds of 1T TPC-DS testing, and I think adding the checksum should solve that issue.



[GitHub] [spark] pan3793 edited a comment on pull request #35076: [SPARK-37793][CORE][SHUFFLE] Fallback to fetch original blocks when noLocalMergedBlockDataError

Posted by GitBox <gi...@apache.org>.
pan3793 edited a comment on pull request #35076:
URL: https://github.com/apache/spark/pull/35076#issuecomment-1005368068


   @otterc I agree with you that `bufs` should not be empty by design, and #34934 makes the same assumption.
   
   Besides those 2 issues, I also hit shuffle data corruption issues frequently.
   ```
   Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 144 in stage 1921.0 failed 4 times, most recent failure: Lost task 144.3 in stage 1921.0 (TID 139025) (beta-spark4 executor 85): java.io.EOFException: reached end of stream after reading 46 bytes; 48 bytes expected
   	at org.sparkproject.guava.io.ByteStreams.readFully(ByteStreams.java:735)
   	at org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$2$$anon$3.next(UnsafeRowSerializer.scala:127)
   	at org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$2$$anon$3.next(UnsafeRowSerializer.scala:110)
           ...
   ```
   Both hardware (disk) issues and network issues may cause shuffle data corruption, and due to the lack of a checksum mechanism in push-based shuffle, there is a chance we pass corrupt data to the `xxSerializer` layer, which then causes an exception and fails the task.
   
   So I think, aside from the code bug, there is still a chance of reading corrupt metadata from disk/network, even though the probability is lower than for shuffle data because metadata is usually smaller, and when that happens, falling back to fetching the original blocks should be safe.
   
   With this patch and #34934, data corruption is the only issue in our dozen rounds of 1T TPC-DS tests, and I think adding the checksum should solve that issue.



[GitHub] [spark] pan3793 commented on pull request #35076: [SPARK-37793][CORE][SHUFFLE] Fallback to fetch original blocks when noLocalMergedBlockDataError

Posted by GitBox <gi...@apache.org>.
pan3793 commented on pull request #35076:
URL: https://github.com/apache/spark/pull/35076#issuecomment-1008332334


   After reading and debugging the push-based shuffle code, I am not sure I understand it correctly and have some questions; I would appreciate any feedback @mridulm @otterc 
   
   1. There may be multiple stream requests writing to one shuffle partition. I saw some variables declared without `transient`; does Netty ensure they are handled in the same thread?
   2. The ESS writes 3 files for a merged partition, `data`, `index`, and `meta`, and keeps each committed file position in in-memory variables. When data arrives, it locks the `partitionInfo` and writes the files in the order `data`, `index`, `meta`; if all writes succeed, it updates the committed file positions, and if any `IOException` occurs, the committed file positions keep their previous values; then it releases the `partitionInfo` lock. Thus the committed status should always be consistent. Finally, it truncates the files to the committed positions before reporting the merged status to the `DAGScheduler`. So if the ESS reported a merged status to the `DAGScheduler`, the final files should always be consistent with each other and with the merged status (see the sketch after this list). Do I understand it correctly?
   3. For performance, the ESS does not call `flush` for each file write; if `write` does not throw an IOE, the ESS treats the write as succeeded, and it finally calls `partition.closeAllFilesAndDeleteIfNeeded(false)` in `#finalizeShuffleMerge`. But `#closeAllFilesAndDeleteIfNeeded` will swallow any IOE, which may leave the files inconsistent with the merged status?
   4. Does `file.e.getChannel().truncate(file.getPos())` always succeed if no IOE is thrown? I saw it can return `null` in some conditions (I am NOT familiar with the file system).
   5. A basic question about the OS file system: if process A writes and closes a file without any IOE and gets the file length as `len`, does the OS ensure another process B always reads the latest file content and gets the same `len`?
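   
   For reference, a minimal sketch of the write/commit protocol described in question 2 (hypothetical class and field names, not the actual `RemoteBlockPushResolver` code):
   
   ```scala
   import java.io.{File, FileOutputStream, IOException}
   import java.nio.ByteBuffer
   
   // Hypothetical sketch: writes to data/index/meta only advance the committed
   // positions when all three writes succeed; on an IOException the previous
   // committed positions are kept, and finalize truncates the files back to the
   // committed positions so the files stay consistent with the reported metadata.
   class MergedPartitionWriter(dataFile: File, indexFile: File, metaFile: File) {
     private val data = new FileOutputStream(dataFile).getChannel
     private val index = new FileOutputStream(indexFile).getChannel
     private val meta = new FileOutputStream(metaFile).getChannel
     // Committed (known-consistent) positions of the three files.
     private var committed = (0L, 0L, 0L)
   
     def append(d: ByteBuffer, i: ByteBuffer, m: ByteBuffer): Unit = synchronized {
       try {
         data.write(d); index.write(i); meta.write(m)
         committed = (data.position(), index.position(), meta.position()) // all succeeded
       } catch {
         case _: IOException => // keep the previous committed positions
       }
     }
   
     def finalizePartition(): (Long, Long, Long) = synchronized {
       // Drop any partially written tail so the files match the committed metadata.
       data.truncate(committed._1); index.truncate(committed._2); meta.truncate(committed._3)
       Seq(data, index, meta).foreach(_.close())
       committed
     }
   }
   ```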



[GitHub] [spark] otterc commented on pull request #35076: [SPARK-37793][CORE][SHUFFLE] Fallback to fetch original blocks when noLocalMergedBlockDataError

Posted by GitBox <gi...@apache.org>.
otterc commented on pull request #35076:
URL: https://github.com/apache/spark/pull/35076#issuecomment-1005099025


   > After some investigation, I found that the last FetchResult put into result is a PushMergedLocalMetaFetchResult, and there is a chance that bufs is empty, in which case no SuccessFetchResult is added to results and the thread hangs if no other FetchResult is put into results.
   
   I don't think that `bufs` should ever be empty. If `bufs` is empty, it just means that no blocks got merged, and in that case there shouldn't even be a push-merged block (local or remote). @pan3793 could you please let me know how to reproduce this bug? Is there a particular query that you are running with which you see this?



[GitHub] [spark] pan3793 edited a comment on pull request #35076: [SPARK-37793][CORE][SHUFFLE] Fallback to fetch original blocks when noLocalMergedBlockDataError

Posted by GitBox <gi...@apache.org>.
pan3793 edited a comment on pull request #35076:
URL: https://github.com/apache/spark/pull/35076#issuecomment-1005881779


   Update:
   
   After https://github.com/apache/spark/commit/f6128a6f4215dc45a19209d799dd9bf98fab6d8a, I ran 3 rounds of 1T TPC-DS using the master branch's code, ~and saw neither this issue nor the issue mentioned in #34934~, but the following issue occurs frequently; it causes a stage retry rather than failing the job, and usually the stage succeeds on retry.
   
   ```
   FetchFailed(null, shuffleId=15, mapIndex=-1, mapId=-1, reduceId=838, message=
   org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle 15 partition 838
   	at org.apache.spark.MapOutputTracker$.validateStatus(MapOutputTracker.scala:1619)
   	at org.apache.spark.MapOutputTracker$.$anonfun$convertMapStatuses$7(MapOutputTracker.scala:1555)
   	at scala.collection.TraversableLike$WithFilter.$anonfun$foreach$1(TraversableLike.scala:985)
   	at scala.collection.Iterator.foreach(Iterator.scala:943)
   	at scala.collection.Iterator.foreach$(Iterator.scala:943)
   	at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
   	at scala.collection.IterableLike.foreach(IterableLike.scala:74)
   	at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
   	at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
   	at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:984)
   	at org.apache.spark.MapOutputTracker$.$anonfun$convertMapStatuses$4(MapOutputTracker.scala:1554)
   	at org.apache.spark.MapOutputTracker$.$anonfun$convertMapStatuses$4$adapted(MapOutputTracker.scala:1535)
   	at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
   	at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
   	at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:198)
   	at org.apache.spark.MapOutputTracker$.convertMapStatuses(MapOutputTracker.scala:1535)
   	at org.apache.spark.MapOutputTrackerWorker.getMapSizesByExecutorIdImpl(MapOutputTracker.scala:1230)
   	at org.apache.spark.MapOutputTrackerWorker.getPushBasedShuffleMapSizesByExecutorId(MapOutputTracker.scala:1204)
   	at org.apache.spark.shuffle.sort.SortShuffleManager.getReader(SortShuffleManager.scala:136)
   	at org.apache.spark.shuffle.ShuffleManager.getReader(ShuffleManager.scala:63)
   	at org.apache.spark.shuffle.ShuffleManager.getReader$(ShuffleManager.scala:57)
   	at org.apache.spark.shuffle.sort.SortShuffleManager.getReader(SortShuffleManager.scala:73)
   	at org.apache.spark.sql.execution.ShuffledRowRDD.compute(ShuffledRowRDD.scala:208)
   	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
   	at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
   	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
   	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
   	at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
   	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
   	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
   	at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
   	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
   	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
   	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
   	at org.apache.spark.scheduler.Task.run(Task.scala:136)
   	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:507)
   	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1475)
   	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:510)
   	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
   	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
   	at java.lang.Thread.run(Thread.java:748)
   ```



[GitHub] [spark] pan3793 commented on pull request #35076: [SPARK-37793][CORE][SHUFFLE] Fallback to fetch original blocks when noLocalMergedBlockDataError

Posted by GitBox <gi...@apache.org>.
pan3793 commented on pull request #35076:
URL: https://github.com/apache/spark/pull/35076#issuecomment-1008323283


   Sorry, I don't get your point @mridulm.
   
   The ESS log indicates that `partition.getLastChunkOffset()` is 157; how does your change solve the issue you pointed out?
   
   ```
   2022-01-07 19:40:46,795 INFO shuffle.RemoteBlockPushResolver: shuffle partition application_1640143179334_0148_-1 126 4877, chunk_size=1, meta_length=18, data_length=157
   ```



[GitHub] [spark] mridulm commented on pull request #35076: [SPARK-37793][CORE][SHUFFLE] Fallback to fetch original blocks when noLocalMergedBlockDataError

Posted by GitBox <gi...@apache.org>.
mridulm commented on pull request #35076:
URL: https://github.com/apache/spark/pull/35076#issuecomment-1008449953


   
   To respond to your queries @pan3793 (pls feel free to elaborate @otterc):
   
   > 
   > 1. On ESS side, there may be multiple streams request to write one shuffle partition, I saw some variables declared without transient, does netty ensure to handle them in the same thread?
   
   `transient` applies to serialization/deserialization - did you mean `volatile` in this context instead?
   Having said that, all state is modified with the `AppShufflePartitionInfo` locked - so it would be within that critical section.
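   
   As a rough illustration of that point (a minimal sketch, not the Spark code - the class and field names below are made up): the monitor release/acquire pair is what makes the non-`volatile` fields visible across threads, as long as every access goes through the same lock.
   
   ```java
   // Minimal sketch with hypothetical names: non-volatile fields are safe across
   // threads as long as every read and write happens inside synchronized on the
   // same monitor; the unlock/lock pair establishes the happens-before edge.
   public class PartitionInfoSketch {
     // hypothetical state, analogous to the committed positions kept per partition
     private long committedDataPos;
     private long committedIndexPos;
   
     public void commit(long dataPos, long indexPos) {
       synchronized (this) {        // writer thread holds the monitor
         committedDataPos = dataPos;
         committedIndexPos = indexPos;
       }                            // releasing the lock publishes the writes
     }
   
     public long[] readCommitted() {
       synchronized (this) {        // reader thread acquires the same monitor
         return new long[] {committedDataPos, committedIndexPos};
       }
     }
   }
   ```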
   
   > 2. The ESS writes 3 files for a merged partition, `data`, `index`, `meta`, and maintains each committed file position in-memory variables. When data arrives, locks `partitionInfo`, and writes files ordered by `data`, `index`, `meta` from the committed position, if all writing success, update the committed file position, if any `IOException` occurs, the committed file position will keep previous values, then release the `partitionInfo` lock. Thus, the committed status should always be consistent. Finally, truncate files in committed positions before reporting merged status to `DAGScheduler`. So if ESS reported a merged status to `DAGScheduler`, the final files should always be consistent with each other and the merged status. And we can trust the committed data of file in anytime. Do I understand it correctly?
   
   Yes. In addition, any exceptions during write, etc. would trigger a failure and reset back to the previous 'good' state.
   
   > 3. For performance, ESS does not call `flush` of each file writing, if `write` does not throw IOE, ESS treats the writing is succeeded, and finally call `partition.closeAllFilesAndDeleteIfNeeded(false)` in `#finalizeShuffleMerge`, but `#closeAllFilesAndDeleteIfNeeded` will swallow any IOE which may cause the file inconsistent with the merged status?
   
   The `IOException` being thrown in that method is when we are unable to close the stream - in this case, it is a close of the fd.
   While possible in theory, it would usually point to other, more severe issues outside of what Spark can deal with.
   But you are right, it does log and ignore failures if close fails.
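   
   To make that failure mode concrete, here is a minimal sketch (not the Spark code; the method and path are made up) of how a swallowed `IOException` on close can hide a short file, since a buffered stream may only flush its last bytes during `close()`:
   
   ```java
   // Minimal sketch with hypothetical names: if close() fails and the exception is
   // swallowed, the caller can report a length that the file on disk never reached.
   import java.io.BufferedOutputStream;
   import java.io.FileOutputStream;
   import java.io.IOException;
   
   public class SwallowedCloseSketch {
     public static long writeAndReportLength(String path, byte[] payload) {
       long reported = payload.length;      // what we would report as committed
       BufferedOutputStream out = null;
       try {
         out = new BufferedOutputStream(new FileOutputStream(path));
         out.write(payload);                // may still sit in the in-memory buffer
       } catch (IOException e) {
         reported = 0;                      // write failures are accounted for
       } finally {
         if (out != null) {
           try {
             out.close();                   // the final flush happens here
           } catch (IOException e) {
             // swallowed: the on-disk file can now be shorter than `reported`
           }
         }
       }
       return reported;
     }
   }
   ```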
   
   > 4. Does `file.e.getChannel().truncate(file.getPos())` always success if no IOE throw? I saw it will return `null` in some conditions(NOT familiar with file system)
   
   The truncate is actually a best-effort attempt to clean up excess disk space usage.
   If there is an ongoing write and we are finalizing, the excess data from that write is not relevant and won't be consumed - hence the truncate.
   It also makes things clearer when debugging (the file sizes should match the metadata we know).
   
   > 5. A basic question about the OS file system. If process A writes and closes a file without any IOE, and gets the file length is `len`, does OS ensure another process B always reads the latest file content and gets the same `len`?
   
   Yes, unless there are some other interleaving modifications to that file (or some OS/fs/driver bugs, but I am discounting them for the time being!).




[GitHub] [spark] AmplabJenkins commented on pull request #35076: [SPARK-37793][CORE][SHUFFLE] Fallback to fetch original blocks when noLocalMergedBlockDataError

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on pull request #35076:
URL: https://github.com/apache/spark/pull/35076#issuecomment-1003331733


   Can one of the admins verify this patch?




[GitHub] [spark] otterc commented on pull request #35076: [SPARK-37793][CORE][SHUFFLE] Fallback to fetch original blocks when noLocalMergedBlockDataError

Posted by GitBox <gi...@apache.org>.
otterc commented on pull request #35076:
URL: https://github.com/apache/spark/pull/35076#issuecomment-1006820470


   @pan3793 I will try to reproduce this issue and debug. It will take me a couple of days. Thanks for debugging and updating with your findings. I will let you know what I find.




[GitHub] [spark] otterc commented on pull request #35076: [SPARK-37793][CORE][SHUFFLE] Fallback to fetch original blocks when noLocalMergedBlockDataError

Posted by GitBox <gi...@apache.org>.
otterc commented on pull request #35076:
URL: https://github.com/apache/spark/pull/35076#issuecomment-1009606408


   > Does file.e.getChannel().truncate(file.getPos()) always success if no IOE throw? I saw it will return null in some conditions(NOT familiar with file system)
    
   We do depend on truncate to ensure that the data in the merged file is consistent with the metadata. If there is some partial shuffle block that has been written to the merged file but has not been committed yet (metadata not updated), then truncate will remove that data. If there is a failure during truncation, `FileChannel.truncate` throws an exception; otherwise it returns the FileChannel. The javadoc for `FileChannel.truncate` indicates the same, as does the implementation `FileChannelImpl.truncate()`.
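   
   To make the contract concrete, here is a minimal sketch (not the Spark code; the path and offset parameters are made up) of truncating a file back to a committed position - `truncate` either throws an `IOException` or returns the channel itself, so there is no `null` case to handle:
   
   ```java
   // Minimal sketch with hypothetical names: drop any uncommitted trailing bytes by
   // truncating the file back to the last committed offset.
   import java.io.IOException;
   import java.io.RandomAccessFile;
   import java.nio.channels.FileChannel;
   
   public class TruncateSketch {
     public static void truncateToCommitted(String path, long committedPos) throws IOException {
       try (RandomAccessFile raf = new RandomAccessFile(path, "rw")) {
         FileChannel ch = raf.getChannel();
         if (ch.size() > committedPos) {
           ch.truncate(committedPos);   // throws IOException if the file cannot be shrunk
         }
       }
     }
   }
   ```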
   
   




[GitHub] [spark] pan3793 commented on pull request #35076: [SPARK-37793][CORE][SHUFFLE] Fallback to fetch original blocks when noLocalMergedBlockDataError

Posted by GitBox <gi...@apache.org>.
pan3793 commented on pull request #35076:
URL: https://github.com/apache/spark/pull/35076#issuecomment-1008497632


   @mridulm thank you for your response!
   
   > did you mean `volatile` in this context instead?
   Having said that, all state is modified with the AppShufflePartitionInfo locked - so would be within that critical section.
   
   Oops, sorry, I meant `volatile`. Another question here: `synchronized(appInfo)` ensures the visibility of `appInfo` itself, but what about the non-`volatile` member variables of `appInfo`?
   
   > The `IOException` being thrown in that method is when we are unable to close the stream - in this case, it is a close of the `fd`.
   While possible in theory, usually it would point to other more severe issues outside of what spark can deal with.
   But you are right, it does log and ignore failures if close fails.
   
   Then I think if `#closeAllFilesAndDeleteIfNeeded` throws an IOE, we need to mark this partition merge as failed.
   
   > The truncate is actually a best case effort to clean up excess disk space usage.
   
   Then there is a chance that the actual file size is greater than what the metadata says, e.g. because of a hardware issue the disk becomes read-only while doing the truncate (in this case will it throw an IOE?). But since we can always trust the content before the 'good' position, can the case where the file length is greater than the 'good' position still be treated as good merged data?
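   
   For illustration, a minimal sketch of the reading pattern (not the Spark code; the index layout of `numChunks + 1` longs follows the `getMergedBlockData` snippet quoted elsewhere in this thread): the chunk ranges come only from the index file, so trailing bytes in the data file past the last committed offset would never be handed to the serializer.
   
   ```java
   // Minimal sketch with hypothetical names: derive chunk boundaries from the index
   // file only; trailing bytes in the data file beyond the last offset are ignored.
   import java.io.DataInputStream;
   import java.io.IOException;
   import java.nio.file.Files;
   import java.nio.file.Path;
   import java.util.ArrayList;
   import java.util.List;
   
   public class ChunkOffsetsSketch {
     /** Returns [start, end) offset pairs for each chunk recorded in the index file. */
     public static List<long[]> chunkRanges(Path indexFile) throws IOException {
       List<long[]> ranges = new ArrayList<>();
       try (DataInputStream in = new DataInputStream(Files.newInputStream(indexFile))) {
         long numOffsets = Files.size(indexFile) / Long.BYTES; // (numChunks + 1) longs
         long prev = in.readLong();
         for (long i = 1; i < numOffsets; i++) {
           long next = in.readLong();
           ranges.add(new long[] {prev, next});
           prev = next;
         }
       }
       return ranges;
     }
   }
   ```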




[GitHub] [spark] pan3793 commented on pull request #35076: [SPARK-37793][CORE][SHUFFLE] Fallback to fetch original blocks when noLocalMergedBlockDataError

Posted by GitBox <gi...@apache.org>.
pan3793 commented on pull request #35076:
URL: https://github.com/apache/spark/pull/35076#issuecomment-1005118384


   No particular query, but it is easy to reproduce when running all 1T TPC-DS queries.




[GitHub] [spark] pan3793 edited a comment on pull request #35076: [SPARK-37793][CORE][SHUFFLE] Fallback to fetch original blocks when noLocalMergedBlockDataError

Posted by GitBox <gi...@apache.org>.
pan3793 edited a comment on pull request #35076:
URL: https://github.com/apache/spark/pull/35076#issuecomment-1007354842


   Hi @otterc, I have more information on this issue.
   
   I added an assertion and a debug log in `RemoteBlockPushResolver` (ESS side):
   
   ```java
   public MergeStatuses finalizeShuffleMerge(FinalizeShuffleMerge msg) {
       ...
       for (AppShufflePartitionInfo partition: shuffleMergePartitions.values()) {
           synchronized (partition) {
             try {
               // This can throw IOException which will marks this shuffle partition as not merged.
               partition.finalizePartition();
               bitmaps.add(partition.mapTracker);
               reduceIds.add(partition.reduceId);
               sizes.add(partition.getLastChunkOffset());
             } catch (IOException ioe) {
               logger.warn("Exception while finalizing shuffle partition {}_{} {} {}", msg.appId,
                 msg.appAttemptId, msg.shuffleId, partition.reduceId, ioe);
             } finally {
               partition.closeAllFilesAndDeleteIfNeeded(false);
             }
           }
   +       assert partition.dataFile.length() == partition.lastChunkOffset;
   +       assert partition.indexFile.file.length() == partition.indexFile.getPos();
   +       assert partition.metaFile.file.length() == partition.metaFile.getPos();
   +       logger.info("shuffle partition {}_{} {} {}, chunk_size={}, meta_length={}, data_length={}",
   +              msg.appId, msg.appAttemptId, msg.shuffleId, partition.reduceId,
   +              partition.indexFile.getPos() / 8 - 1,
   +              partition.metaFile.getPos(),
   +              partition.lastChunkOffset);
        }
         mergeStatuses = new MergeStatuses(msg.shuffleId, msg.shuffleMergeId,
           bitmaps.toArray(new RoaringBitmap[0]), Ints.toArray(reduceIds),
           Longs.toArray(sizes));
       }
       ...
   }
   ```
   
   ```
   2022-01-07 19:40:46,795 INFO shuffle.RemoteBlockPushResolver: shuffle partition application_1640143179334_0148_-1 126 4877, chunk_size=1, meta_length=18, data_length=157
   ```
   
   I added an assertion and a debug log in `IndexShuffleBlockResolver` (reducer side):
   ```scala
   override def getMergedBlockData(
       blockId: ShuffleMergedBlockId,
       dirs: Option[Array[String]]): Seq[ManagedBuffer] = {
     val indexFile =
       getMergedBlockIndexFile(conf.getAppId, blockId.shuffleId, blockId.shuffleMergeId,
         blockId.reduceId, dirs)
     val dataFile = getMergedBlockDataFile(conf.getAppId, blockId.shuffleId,
       blockId.shuffleMergeId, blockId.reduceId, dirs)
     val metaFile = getMergedBlockMetaFile(conf.getAppId, blockId.shuffleId,
       blockId.shuffleMergeId, blockId.reduceId, dirs)
     // Load all the indexes in order to identify all chunks in the specified merged shuffle file.
     val size = indexFile.length.toInt
     val offsets = Utils.tryWithResource {
       new DataInputStream(Files.newInputStream(indexFile.toPath))
     } { dis =>
       val buffer = ByteBuffer.allocate(size)
       dis.readFully(buffer.array)
       buffer.asLongBuffer
     }
     // Number of chunks is number of indexes - 1
     val numChunks = size / 8 - 1
   + if (numChunks == 0) {
   +   val indexBackupPath = java.nio.file.Paths.get(s"/tmp/${indexFile.toPath.getFileName}")
   +   val dataBackupPath = java.nio.file.Paths.get(s"/tmp/${dataFile.toPath.getFileName}")
   +   val metaBackupPath = java.nio.file.Paths.get(s"/tmp/${metaFile.toPath.getFileName}")
   +   logError(s"$blockId chunk_size is 0, " +
   +      s"index_file is $indexFile, backup to $indexBackupPath" +
   +      s"data_file is $dataFile, backup to $dataBackupPath" +
   +      s"meta_file is $metaFile, backup to $metaBackupPath")
   +   Files.copy(indexFile.toPath, indexBackupPath)
   +   Files.copy(dataFile.toPath, dataBackupPath)
   +   Files.copy(metaFile.toPath, metaBackupPath)
   +   assert(false)
     }
     for (index <- 0 until numChunks) yield {
       new FileSegmentManagedBuffer(transportConf, dataFile,
         offsets.get(index),
         offsets.get(index + 1) - offsets.get(index))
     }
   }
   ```
   
   Then I ran TPC-DS for several rounds and reproduced the exception.
   
   ```log
   01-07 19:42:08 INFO [dag-scheduler-event-loop] scheduler.DAGScheduler@61: ShuffleMapStage 453 (save at QueryRunner.scala:98) failed in 1.811 s due to Job aborted due to stage failure: Task 122 in stage 453.0 failed 4 times, most recent failure: Lost task 122.3 in stage 453.0 (TID 278831) (beta-spark4 executor 562): java.lang.AssertionError: assertion failed
   	at scala.Predef$.assert(Predef.scala:208)
   	at org.apache.spark.shuffle.IndexShuffleBlockResolver.getMergedBlockData(IndexShuffleBlockResolver.scala:504)
   	at org.apache.spark.storage.BlockManager.getLocalMergedBlockData(BlockManager.scala:777)
   	at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:945)
   	at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:85)
   	at org.apache.spark.util.CompletionIterator.next(CompletionIterator.scala:29)
   ```
   
   ```
   root@beta-spark4:/tmp# ls -l shuffleMerged_application_1640143179334_0148_126_0_4877*
   -rw-r--r-- 1 root root 16036 Jan  7 19:41 shuffleMerged_application_1640143179334_0148_126_0_4877.data
   -rw-r--r-- 1 root root     8 Jan  7 19:41 shuffleMerged_application_1640143179334_0148_126_0_4877.index
   -rw-r--r-- 1 root root     0 Jan  7 19:41 shuffleMerged_application_1640143179334_0148_126_0_4877.meta
   ```
   
   So, the ESS and the reduce task are running on the same machine; the ESS closed the 'data', 'index', and 'meta' files and reported their sizes as `chunk_size=1, meta_length=18, data_length=157`. This metadata was also returned to the driver and passed to the reduce task, but when the reduce task read the files from disk, the data did not match!




[GitHub] [spark] pan3793 edited a comment on pull request #35076: [SPARK-37793][CORE][SHUFFLE] Fallback to fetch original blocks when noLocalMergedBlockDataError

Posted by GitBox <gi...@apache.org>.
pan3793 edited a comment on pull request #35076:
URL: https://github.com/apache/spark/pull/35076#issuecomment-1005368068


   @otterc I agree with you that `bufs` should not be empty by design, and #34934 makes the same assumption. 
   
   Besides those 2 issues, I also ran into shuffle data corruption issues frequently.
   ```
   Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 144 in stage 1921.0 failed 4 times, most recent failure: Lost task 144.3 in stage 1921.0 (TID 139025) (beta-spark4 executor 85): java.io.EOFException: reached end of stream after reading 46 bytes; 48 bytes expected
   	at org.sparkproject.guava.io.ByteStreams.readFully(ByteStreams.java:735)
   	at org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$2$$anon$3.next(UnsafeRowSerializer.scala:127)
   	at org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$2$$anon$3.next(UnsafeRowSerializer.scala:110)
           ...
   ```
   Both hardware (disk) issues and network issues may cause shuffle data corruption, and due to the lack of a checksum mechanism in push-based shuffle, there is a chance we pass corrupt data to the `xxSerializer` layer, which then causes an exception and fails the task.
   
   So I think, aside from code bugs, there is still a chance to read corrupt metadata from disk/network, even though the possibility is lower than for shuffle data because metadata is usually smaller, and when that happens, falling back to fetching the original blocks should be safe.
   
   With this patch and #34934, data corruption is the only issue in our dozen rounds of 1T TPC-DS tests, and I think adding the checksum should solve that issue.
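   
   To sketch the idea (this is only an illustration, not the actual checksum work in Spark; the class and method names are made up): a per-block checksum recorded at write time would let the reducer detect corrupt bytes before they reach the serializer and fall back to fetching the original blocks instead.
   
   ```java
   // Illustrative sketch with hypothetical names: compare a fetched block against a
   // checksum computed by the writer; on mismatch the caller can refetch or fall back.
   import java.util.zip.CRC32;
   
   public class BlockChecksumSketch {
     public static long checksum(byte[] block) {
       CRC32 crc = new CRC32();
       crc.update(block, 0, block.length);
       return crc.getValue();
     }
   
     /** Returns true if the fetched bytes match the checksum recorded by the writer. */
     public static boolean verify(byte[] fetchedBlock, long expectedChecksum) {
       return checksum(fetchedBlock) == expectedChecksum;
     }
   }
   ```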




[GitHub] [spark] pan3793 edited a comment on pull request #35076: [SPARK-37793][CORE][SHUFFLE] Fallback to fetch original blocks when noLocalMergedBlockDataError

Posted by GitBox <gi...@apache.org>.
pan3793 edited a comment on pull request #35076:
URL: https://github.com/apache/spark/pull/35076#issuecomment-1005881779


   Update:
   
   After https://github.com/apache/spark/commit/f6128a6f4215dc45a19209d799dd9bf98fab6d8a, I ran 3 rounds of 1T TPC-DS using the master branch's code and saw neither this issue nor the issue mentioned in #34934, but the following issue occurs frequently; it causes a stage retry rather than failing the job, and usually the stage succeeds on retry.
   
   ```
   FetchFailed(null, shuffleId=15, mapIndex=-1, mapId=-1, reduceId=838, message=
   org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle 15 partition 838
   	at org.apache.spark.MapOutputTracker$.validateStatus(MapOutputTracker.scala:1619)
   	at org.apache.spark.MapOutputTracker$.$anonfun$convertMapStatuses$7(MapOutputTracker.scala:1555)
   	at scala.collection.TraversableLike$WithFilter.$anonfun$foreach$1(TraversableLike.scala:985)
   	at scala.collection.Iterator.foreach(Iterator.scala:943)
   	at scala.collection.Iterator.foreach$(Iterator.scala:943)
   	at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
   	at scala.collection.IterableLike.foreach(IterableLike.scala:74)
   	at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
   	at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
   	at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:984)
   	at org.apache.spark.MapOutputTracker$.$anonfun$convertMapStatuses$4(MapOutputTracker.scala:1554)
   	at org.apache.spark.MapOutputTracker$.$anonfun$convertMapStatuses$4$adapted(MapOutputTracker.scala:1535)
   	at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
   	at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
   	at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:198)
   	at org.apache.spark.MapOutputTracker$.convertMapStatuses(MapOutputTracker.scala:1535)
   	at org.apache.spark.MapOutputTrackerWorker.getMapSizesByExecutorIdImpl(MapOutputTracker.scala:1230)
   	at org.apache.spark.MapOutputTrackerWorker.getPushBasedShuffleMapSizesByExecutorId(MapOutputTracker.scala:1204)
   	at org.apache.spark.shuffle.sort.SortShuffleManager.getReader(SortShuffleManager.scala:136)
   	at org.apache.spark.shuffle.ShuffleManager.getReader(ShuffleManager.scala:63)
   	at org.apache.spark.shuffle.ShuffleManager.getReader$(ShuffleManager.scala:57)
   	at org.apache.spark.shuffle.sort.SortShuffleManager.getReader(SortShuffleManager.scala:73)
   	at org.apache.spark.sql.execution.ShuffledRowRDD.compute(ShuffledRowRDD.scala:208)
   	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
   	at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
   	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
   	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
   	at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
   	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
   	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
   	at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
   	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
   	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
   	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
   	at org.apache.spark.scheduler.Task.run(Task.scala:136)
   	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:507)
   	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1475)
   	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:510)
   	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
   	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
   	at java.lang.Thread.run(Thread.java:748)
   ```




[GitHub] [spark] mridulm commented on pull request #35076: [SPARK-37793][CORE][SHUFFLE] Fallback to fetch original blocks when noLocalMergedBlockDataError

Posted by GitBox <gi...@apache.org>.
mridulm commented on pull request #35076:
URL: https://github.com/apache/spark/pull/35076#issuecomment-1003452997


   +CC @otterc 




[GitHub] [spark] pan3793 edited a comment on pull request #35076: [SPARK-37793][CORE][SHUFFLE] Fallback to fetch original blocks when noLocalMergedBlockDataError

Posted by GitBox <gi...@apache.org>.
pan3793 edited a comment on pull request #35076:
URL: https://github.com/apache/spark/pull/35076#issuecomment-1007354842








[GitHub] [spark] pan3793 edited a comment on pull request #35076: [SPARK-37793][CORE][SHUFFLE] Fallback to fetch original blocks when noLocalMergedBlockDataError

Posted by GitBox <gi...@apache.org>.
pan3793 edited a comment on pull request #35076:
URL: https://github.com/apache/spark/pull/35076#issuecomment-1008332334


   After reading and debugging the push-based shuffle code, I am not sure I understand it correctly and have some questions; I would appreciate any feedback @mridulm @otterc 
   
   1. On the ESS side, there may be multiple stream requests to write one shuffle partition. I saw some variables declared without transient; does Netty ensure they are handled on the same thread?
   2. The ESS writes 3 files for a merged partition, `data`, `index`, `meta`, and maintains each committed file position in in-memory variables. When data arrives, it locks `partitionInfo` and writes the files in the order `data`, `index`, `meta`, starting from the committed positions; if all writes succeed, it updates the committed file positions, and if any `IOException` occurs, the committed file positions keep their previous values; then it releases the `partitionInfo` lock. Thus, the committed state should always be consistent. Finally, the files are truncated to the committed positions before the merged status is reported to the `DAGScheduler`. So if the ESS reported a merged status to the `DAGScheduler`, the final files should always be consistent with each other and with the merged status, and we can trust the committed data of the files at any time. Do I understand it correctly?
   3. For performance, the ESS does not call `flush` on each file write; if `write` does not throw an IOE, the ESS treats the write as succeeded, and it finally calls `partition.closeAllFilesAndDeleteIfNeeded(false)` in `#finalizeShuffleMerge`. But `#closeAllFilesAndDeleteIfNeeded` swallows any IOE, which may leave the files inconsistent with the merged status?
   4. Does `file.e.getChannel().truncate(file.getPos())` always succeed if no IOE is thrown? I saw it will return `null` in some conditions (I am NOT familiar with the file system).
   5. A basic question about the OS file system: if process A writes and closes a file without any IOE and sees the file length as `len`, does the OS ensure that another process B always reads the latest file content and sees the same `len`? 




[GitHub] [spark] pan3793 edited a comment on pull request #35076: [SPARK-37793][CORE][SHUFFLE] Fallback to fetch original blocks when noLocalMergedBlockDataError

Posted by GitBox <gi...@apache.org>.
pan3793 edited a comment on pull request #35076:
URL: https://github.com/apache/spark/pull/35076#issuecomment-1008332334


   After reading and debugging the push-based shuffle code, I am not sure I understand it correctly and have some questions; I would appreciate any feedback @mridulm @otterc 
   
   1. On the ESS side, there may be multiple stream requests to write one shuffle partition. I saw some variables declared without transient; does Netty ensure they are handled on the same thread?
   2. The ESS writes 3 files for a merged partition, `data`, `index`, `meta`, and maintains each committed file position in in-memory variables. When data arrives, it locks `partitionInfo` and writes the files in the order `data`, `index`, `meta`; if all writes succeed, it updates the committed file positions, and if any `IOException` occurs, the committed file positions keep their previous values; then it releases the `partitionInfo` lock. Thus, the committed state should always be consistent. Finally, the files are truncated to the committed positions before the merged status is reported to the `DAGScheduler`. So if the ESS reported a merged status to the `DAGScheduler`, the final files should always be consistent with each other and with the merged status, and we can trust the committed data of the files at any time. Do I understand it correctly?
   3. For performance, the ESS does not call `flush` on each file write; if `write` does not throw an IOE, the ESS treats the write as succeeded, and it finally calls `partition.closeAllFilesAndDeleteIfNeeded(false)` in `#finalizeShuffleMerge`. But `#closeAllFilesAndDeleteIfNeeded` swallows any IOE, which may leave the files inconsistent with the merged status?
   4. Does `file.e.getChannel().truncate(file.getPos())` always succeed if no IOE is thrown? I saw it will return `null` in some conditions (I am NOT familiar with the file system).
   5. A basic question about the OS file system: if process A writes and closes a file without any IOE and sees the file length as `len`, does the OS ensure that another process B always reads the latest file content and sees the same `len`? 




[GitHub] [spark] pan3793 edited a comment on pull request #35076: [SPARK-37793][CORE][SHUFFLE] Fallback to fetch original blocks when noLocalMergedBlockDataError

Posted by GitBox <gi...@apache.org>.
pan3793 edited a comment on pull request #35076:
URL: https://github.com/apache/spark/pull/35076#issuecomment-1008332334








[GitHub] [spark] pan3793 edited a comment on pull request #35076: [SPARK-37793][CORE][SHUFFLE] Fallback to fetch original blocks when noLocalMergedBlockDataError

Posted by GitBox <gi...@apache.org>.
pan3793 edited a comment on pull request #35076:
URL: https://github.com/apache/spark/pull/35076#issuecomment-1008323283


   Sorry, I don't get your point @mridulm 
   
   The ESS log indicates that `partition.getLastChunkOffset()` is 157, so how does your change solve the issue you pointed out?
   
   ```
   2022-01-07 19:40:46,795 INFO shuffle.RemoteBlockPushResolver: shuffle partition application_1640143179334_0148_-1 126 4877, chunk_size=1, meta_length=18, data_length=157
   ```




[GitHub] [spark] pan3793 edited a comment on pull request #35076: [SPARK-37793][CORE][SHUFFLE] Fallback to fetch original blocks when noLocalMergedBlockDataError

Posted by GitBox <gi...@apache.org>.
pan3793 edited a comment on pull request #35076:
URL: https://github.com/apache/spark/pull/35076#issuecomment-1005881779


   Update:
   
   After https://github.com/apache/spark/commit/f6128a6f4215dc45a19209d799dd9bf98fab6d8a, I ran 3 rounds of 1T TPC-DS using the master branch's code and saw neither this issue nor the issue mentioned in #34934, but the following issue occurs frequently; it causes a stage retry rather than failing the job, and usually the stage succeeds on retry.
   
   ```
   FetchFailed(null, shuffleId=15, mapIndex=-1, mapId=-1, reduceId=838, message=
   org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle 15 partition 838
   	at org.apache.spark.MapOutputTracker$.validateStatus(MapOutputTracker.scala:1619)
   	at org.apache.spark.MapOutputTracker$.$anonfun$convertMapStatuses$7(MapOutputTracker.scala:1555)
   	at scala.collection.TraversableLike$WithFilter.$anonfun$foreach$1(TraversableLike.scala:985)
   	at scala.collection.Iterator.foreach(Iterator.scala:943)
   	at scala.collection.Iterator.foreach$(Iterator.scala:943)
   	at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
   	at scala.collection.IterableLike.foreach(IterableLike.scala:74)
   	at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
   	at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
   	at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:984)
   	at org.apache.spark.MapOutputTracker$.$anonfun$convertMapStatuses$4(MapOutputTracker.scala:1554)
   	at org.apache.spark.MapOutputTracker$.$anonfun$convertMapStatuses$4$adapted(MapOutputTracker.scala:1535)
   	at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
   	at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
   	at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:198)
   	at org.apache.spark.MapOutputTracker$.convertMapStatuses(MapOutputTracker.scala:1535)
   	at org.apache.spark.MapOutputTrackerWorker.getMapSizesByExecutorIdImpl(MapOutputTracker.scala:1230)
   	at org.apache.spark.MapOutputTrackerWorker.getPushBasedShuffleMapSizesByExecutorId(MapOutputTracker.scala:1204)
   	at org.apache.spark.shuffle.sort.SortShuffleManager.getReader(SortShuffleManager.scala:136)
   	at org.apache.spark.shuffle.ShuffleManager.getReader(ShuffleManager.scala:63)
   	at org.apache.spark.shuffle.ShuffleManager.getReader$(ShuffleManager.scala:57)
   	at org.apache.spark.shuffle.sort.SortShuffleManager.getReader(SortShuffleManager.scala:73)
   	at org.apache.spark.sql.execution.ShuffledRowRDD.compute(ShuffledRowRDD.scala:208)
   	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
   	at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
   	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
   	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
   	at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
   	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
   	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
   	at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
   	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
   	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
   	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
   	at org.apache.spark.scheduler.Task.run(Task.scala:136)
   	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:507)
   	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1475)
   	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:510)
   	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
   	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
   	at java.lang.Thread.run(Thread.java:748)
   ```




[GitHub] [spark] pan3793 commented on pull request #35076: [SPARK-37793][CORE][SHUFFLE] Fallback to fetch original blocks when noLocalMergedBlockDataError

Posted by GitBox <gi...@apache.org>.
pan3793 commented on pull request #35076:
URL: https://github.com/apache/spark/pull/35076#issuecomment-1005368068


   @otterc I agree with you that `bufs` should not be empty by design, and #34934 makes the same assumption. 
   
   Besides those 2 issues, I also ran into shuffle data corruption issues frequently.
   ```
   Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 144 in stage 1921.0 failed 4 times, most recent failure: Lost task 144.3 in stage 1921.0 (TID 139025) (beta-spark4 executor 85): java.io.EOFException: reached end of stream after reading 46 bytes; 48 bytes expected
   	at org.sparkproject.guava.io.ByteStreams.readFully(ByteStreams.java:735)
   	at org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$2$$anon$3.next(UnsafeRowSerializer.scala:127)
   	at org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$2$$anon$3.next(UnsafeRowSerializer.scala:110)
           ...
   ```
   Both hardware (disk) issues and network issues may cause shuffle data corruption, and due to the lack of a checksum mechanism in push-based shuffle, there is a chance we pass corrupt data to the `xxSerializer` layer, which then causes an exception and fails the task.
   
   So I think, aside from code bugs, there is still a chance to read corrupt metadata from disk/network, even though the possibility is lower than for shuffle data because metadata is usually smaller, and when that happens, falling back to fetching the original blocks should be safe.
   
   With this patch and #34934, data corruption is the only issue in our dozen rounds of 1T TPC-DS tests, and I think adding the checksum should solve that issue.




[GitHub] [spark] pan3793 edited a comment on pull request #35076: [SPARK-37793][CORE][SHUFFLE] Fallback to fetch original blocks when noLocalMergedBlockDataError

Posted by GitBox <gi...@apache.org>.
pan3793 edited a comment on pull request #35076:
URL: https://github.com/apache/spark/pull/35076#issuecomment-1006295600


   Update:
   After f6128a6, this issue and the issue mentioned in #34934 are still there; when I turn `spark.shuffle.push.minShuffleSizeToWait` to zero (the default is 500m), the issues happen frequently again.




[GitHub] [spark] otterc commented on pull request #35076: [SPARK-37793][CORE][SHUFFLE] Fallback to fetch original blocks when noLocalMergedBlockDataError

Posted by GitBox <gi...@apache.org>.
otterc commented on pull request #35076:
URL: https://github.com/apache/spark/pull/35076#issuecomment-1010458523


   @pan3793 I have been trying to reproduce this issue by running different TPC-H queries but wasn't able to reproduce it. I am running the code in the master branch. Is it possible for you to share the entire application log and the shuffle service logs with me? Some other things that would be useful for me to reproduce this problem:
   - Values of all the other Spark configurations?
   - Do you see any other exceptions in the shuffle service logs before this happens?
   - Do you see any stage re-attempts?




[GitHub] [spark] pan3793 commented on pull request #35076: [SPARK-37793][CORE][SHUFFLE] Fallback to fetch original blocks when noLocalMergedBlockDataError

Posted by GitBox <gi...@apache.org>.
pan3793 commented on pull request #35076:
URL: https://github.com/apache/spark/pull/35076#issuecomment-1006295600


   Update:
   After f6128a6, this issue and the issue mentioned in #34934 are still there; when I turn `spark.shuffle.push.minShuffleSizeToWait` to zero, the issues happen frequently again.




[GitHub] [spark] pan3793 edited a comment on pull request #35076: [SPARK-37793][CORE][SHUFFLE] Fallback to fetch original blocks when noLocalMergedBlockDataError

Posted by GitBox <gi...@apache.org>.
pan3793 edited a comment on pull request #35076:
URL: https://github.com/apache/spark/pull/35076#issuecomment-1005368068


   @otterc I agree with you that `bufs` should not be empty by design, and #34934 makes the same assumption. 
   
   I also suspected there were some bugs or concurrency issues in the code and added some assertions, but unfortunately nothing was found. 
   
   Besides those 2 issues, I also ran into shuffle data corruption issues frequently.
   ```
   Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 144 in stage 1921.0 failed 4 times, most recent failure: Lost task 144.3 in stage 1921.0 (TID 139025) (beta-spark4 executor 85): java.io.EOFException: reached end of stream after reading 46 bytes; 48 bytes expected
   	at org.sparkproject.guava.io.ByteStreams.readFully(ByteStreams.java:735)
   	at org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$2$$anon$3.next(UnsafeRowSerializer.scala:127)
   	at org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$2$$anon$3.next(UnsafeRowSerializer.scala:110)
           ...
   ```
   Both hardware (disk) issues and network issues may cause shuffle data corruption, and due to the lack of a checksum mechanism in push-based shuffle, there is a chance we pass corrupt data to the `xxSerializer` layer, which then causes an exception and fails the task.
   
   So I think, aside from code bugs, there is still a chance to read corrupt metadata from disk/network, even though the possibility is lower than for shuffle data because metadata is usually smaller, and when that happens, falling back to fetching the original blocks should be safe.
   
   With this patch and #34934, data corruption is the only critical issue (I mean, one that can fail the job) in our dozen rounds of 1T TPC-DS tests, and I think adding the checksum should solve that issue.




[GitHub] [spark] mridulm commented on pull request #35076: [SPARK-37793][CORE][SHUFFLE] Fallback to fetch original blocks when noLocalMergedBlockDataError

Posted by GitBox <gi...@apache.org>.
mridulm commented on pull request #35076:
URL: https://github.com/apache/spark/pull/35076#issuecomment-1008181443


   One possibility I was thinking of was whether we are calling `finalizePartition` before any chunks have been fully written out for a reducer.
   
   Can you test with this change?
   ```
   diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
   index d0eb4aed65..0d3a3c7448 100644
   --- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
   +++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
   @@ -575,17 +575,23 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager {
          List<Long> sizes = new ArrayList<>(shuffleMergePartitions.size());
          for (AppShufflePartitionInfo partition: shuffleMergePartitions.values()) {
            synchronized (partition) {
   +          boolean shouldDelete = false;
              try {
                // This can throw IOException which will marks this shuffle partition as not merged.
                partition.finalizePartition();
   -            bitmaps.add(partition.mapTracker);
   -            reduceIds.add(partition.reduceId);
   -            sizes.add(partition.getLastChunkOffset());
   +            long size = partition.getLastChunkOffset();
   +            if (size > 0) {
   +              bitmaps.add(partition.mapTracker);
   +              reduceIds.add(partition.reduceId);
   +              sizes.add(size);
   +            } else {
   +              shouldDelete = true;
   +            }
              } catch (IOException ioe) {
                logger.warn("Exception while finalizing shuffle partition {}_{} {} {}", msg.appId,
                  msg.appAttemptId, msg.shuffleId, partition.reduceId, ioe);
              } finally {
   -            partition.closeAllFilesAndDeleteIfNeeded(false);
   +            partition.closeAllFilesAndDeleteIfNeeded(shouldDelete);
              }
            }
          }
   ```
   
   +CC @otterc 
   




[GitHub] [spark] pan3793 edited a comment on pull request #35076: [SPARK-37793][CORE][SHUFFLE] Fallback to fetch original blocks when noLocalMergedBlockDataError

Posted by GitBox <gi...@apache.org>.
pan3793 edited a comment on pull request #35076:
URL: https://github.com/apache/spark/pull/35076#issuecomment-1007354842


   Hi @otterc, I have more information on this issue.
   
   I added an assertion and a debug log in `RemoteBlockPushResolver` (ESS side):
   
   ```java
   public MergeStatuses finalizeShuffleMerge(FinalizeShuffleMerge msg) {
       ...
       for (AppShufflePartitionInfo partition: shuffleMergePartitions.values()) {
           synchronized (partition) {
             try {
               // This can throw IOException which will marks this shuffle partition as not merged.
               partition.finalizePartition();
               bitmaps.add(partition.mapTracker);
               reduceIds.add(partition.reduceId);
               sizes.add(partition.getLastChunkOffset());
             } catch (IOException ioe) {
               logger.warn("Exception while finalizing shuffle partition {}_{} {} {}", msg.appId,
                 msg.appAttemptId, msg.shuffleId, partition.reduceId, ioe);
             } finally {
               partition.closeAllFilesAndDeleteIfNeeded(false);
             }
           }
   +       assert partition.dataFile.length() == partition.lastChunkOffset;
   +       assert partition.indexFile.file.length() == partition.indexFile.getPos();
   +       assert partition.metaFile.file.length() == partition.metaFile.getPos();
   +       logger.info("shuffle partition {}_{} {} {}, chunk_size={}, meta_length={}, data_length={}",
   +              msg.appId, msg.appAttemptId, msg.shuffleId, partition.reduceId,
   +              partition.indexFile.getPos() / 8 - 1,
   +              partition.metaFile.getPos(),
   +              partition.lastChunkOffset);
   +    }
         mergeStatuses = new MergeStatuses(msg.shuffleId, msg.shuffleMergeId,
           bitmaps.toArray(new RoaringBitmap[0]), Ints.toArray(reduceIds),
           Longs.toArray(sizes));
       }
       ...
   }
   ```
   
   ```
   2022-01-07 19:40:46,795 INFO shuffle.RemoteBlockPushResolver: shuffle partition application_1640143179334_0148_-1 126 4877, chunk_size=1, meta_length=18, data_length=157
   ```
   
   I added an assertion and a debug log in `IndexShuffleBlockResolver` (reducer side):
   ```scala
   override def getMergedBlockData(
       blockId: ShuffleMergedBlockId,
       dirs: Option[Array[String]]): Seq[ManagedBuffer] = {
     val indexFile =
       getMergedBlockIndexFile(conf.getAppId, blockId.shuffleId, blockId.shuffleMergeId,
         blockId.reduceId, dirs)
     val dataFile = getMergedBlockDataFile(conf.getAppId, blockId.shuffleId,
       blockId.shuffleMergeId, blockId.reduceId, dirs)
     val metaFile = getMergedBlockMetaFile(conf.getAppId, blockId.shuffleId,
       blockId.shuffleMergeId, blockId.reduceId, dirs)
     // Load all the indexes in order to identify all chunks in the specified merged shuffle file.
     val size = indexFile.length.toInt
     val offsets = Utils.tryWithResource {
       new DataInputStream(Files.newInputStream(indexFile.toPath))
     } { dis =>
       val buffer = ByteBuffer.allocate(size)
       dis.readFully(buffer.array)
       buffer.asLongBuffer
     }
     // Number of chunks is number of indexes - 1
     val numChunks = size / 8 - 1
   + if (numChunks == 0) {
   +   val indexBackupPath = java.nio.file.Paths.get(s"/tmp/${indexFile.toPath.getFileName}")
   +   val dataBackupPath = java.nio.file.Paths.get(s"/tmp/${dataFile.toPath.getFileName}")
   +   val metaBackupPath = java.nio.file.Paths.get(s"/tmp/${metaFile.toPath.getFileName}")
   +   logError(s"$blockId chunk_size is 0, " +
   +      s"index_file is $indexFile, backup to $indexBackupPath" +
   +      s"data_file is $dataFile, backup to $dataBackupPath" +
   +      s"meta_file is $metaFile, backup to $metaBackupPath")
   +   Files.copy(indexFile.toPath, indexBackupPath)
   +   Files.copy(dataFile.toPath, dataBackupPath)
   +   Files.copy(metaFile.toPath, metaBackupPath)
   +   assert(false)
     }
     for (index <- 0 until numChunks) yield {
       new FileSegmentManagedBuffer(transportConf, dataFile,
         offsets.get(index),
         offsets.get(index + 1) - offsets.get(index))
     }
   }
   ```
   
   Then I ran TPC-DS for several rounds and reproduced the exception.
   
   ```log
   01-07 19:42:08 INFO [dag-scheduler-event-loop] scheduler.DAGScheduler@61: ShuffleMapStage 453 (save at QueryRunner.scala:98) failed in 1.811 s due to Job aborted due to stage failure: Task 122 in stage 453.0 failed 4 times, most recent failure: Lost task 122.3 in stage 453.0 (TID 278831) (beta-spark4 executor 562): java.lang.AssertionError: assertion failed
   	at scala.Predef$.assert(Predef.scala:208)
   	at org.apache.spark.shuffle.IndexShuffleBlockResolver.getMergedBlockData(IndexShuffleBlockResolver.scala:504)
   	at org.apache.spark.storage.BlockManager.getLocalMergedBlockData(BlockManager.scala:777)
   	at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:945)
   	at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:85)
   	at org.apache.spark.util.CompletionIterator.next(CompletionIterator.scala:29)
   ```
   
   ```
   root@beta-spark4:/tmp# ls -l shuffleMerged_application_1640143179334_0148_126_0_4877*
   -rw-r--r-- 1 root root 16036 Jan  7 19:41 shuffleMerged_application_1640143179334_0148_126_0_4877.data
   -rw-r--r-- 1 root root     8 Jan  7 19:41 shuffleMerged_application_1640143179334_0148_126_0_4877.index
   -rw-r--r-- 1 root root     0 Jan  7 19:41 shuffleMerged_application_1640143179334_0148_126_0_4877.meta
   ```
   
   So, the ESS and the reduce task are running on the same machine; the ESS closed the 'data', 'index', and 'meta' files and reported their sizes as `chunk_size=1, meta_length=18, data_length=157`. This metadata was also returned to the driver and passed to the reduce task, but when the reduce task read the files from disk, the data did not match!




[GitHub] [spark] mridulm commented on pull request #35076: [SPARK-37793][CORE][SHUFFLE] Fallback to fetch original blocks when noLocalMergedBlockDataError

Posted by GitBox <gi...@apache.org>.
mridulm commented on pull request #35076:
URL: https://github.com/apache/spark/pull/35076#issuecomment-1008446824


   I had written up my comment before your log messages @pan3793.
   What I had detailed is a potential bug - whether that is impacting this specific issue remains to be seen, though it looks unlikely.
   
   The discrepancy between the data file size and the reported data length seems to indicate some other issue here - though I can't make much out from the details provided, unfortunately.
   I will let @otterc reproduce.




[GitHub] [spark] pan3793 edited a comment on pull request #35076: [SPARK-37793][CORE][SHUFFLE] Fallback to fetch original blocks when noLocalMergedBlockDataError

Posted by GitBox <gi...@apache.org>.
pan3793 edited a comment on pull request #35076:
URL: https://github.com/apache/spark/pull/35076#issuecomment-1008332334


   After reading and debugging the push-based shuffle code, I'm not sure I understand it correctly and have some questions; I would appreciate any feedback @mridulm @otterc 
   
   1. On the ESS side, there may be multiple streams requesting to write to one shuffle partition. I saw some variables declared without `transient`; does Netty ensure they are handled in the same thread?
   2. The ESS writes 3 files for a merged partition, `data`, `index`, `meta`, and keeps each committed file position in in-memory variables. When data arrives, it locks `partitionInfo` and writes the files in the order `data`, `index`, `meta`. If all writes succeed, it updates the committed file positions; if any `IOException` occurs, the committed positions keep their previous values. It then releases the `partitionInfo` lock, so the committed state should always be consistent. Finally, it truncates the files to the committed positions before reporting the merged status to the `DAGScheduler`. So if the ESS reported a merged status to the `DAGScheduler`, the final files should always be consistent with each other and with the merged status. Do I understand it correctly? (A simplified sketch of this protocol follows this list.)
   3. For performance, the ESS does not call `flush` on each file write; if `write` does not throw an IOE, the ESS treats the write as successful, and finally calls `partition.closeAllFilesAndDeleteIfNeeded(false)` in `#finalizeShuffleMerge`. But `#closeAllFilesAndDeleteIfNeeded` will swallow any IOE, which may leave the files inconsistent with the merged status?
   4. Does `file.e.getChannel().truncate(file.getPos())` always succeed if no IOE is thrown? I saw it can return `null` under some conditions (I am NOT familiar with the file system behavior).
   5. A basic question about the OS file system: if process A writes and closes a file without any IOE and observes the file length as `len`, does the OS ensure that another process B always reads the latest file content and sees the same `len`?
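   
   Not the actual `RemoteBlockPushResolver` code - just a minimal sketch of the protocol described in questions 2-4, with hypothetical class and method names:
   
   ```scala
   import java.io.{File, IOException, RandomAccessFile}
   
   // Sketch of the commit-then-truncate protocol: committed positions advance only
   // after all three writes succeed, and finalization truncates each file back to
   // its last committed position before the merged status is reported.
   class MergedPartitionSketch(dataFile: File, indexFile: File, metaFile: File) {
     private val data  = new RandomAccessFile(dataFile, "rw")
     private val index = new RandomAccessFile(indexFile, "rw")
     private val meta  = new RandomAccessFile(metaFile, "rw")
     // Last known-good positions; only updated when a whole block commits.
     private var committedDataPos  = 0L
     private var committedIndexPos = 0L
     private var committedMetaPos  = 0L
   
     def appendBlock(block: Array[Byte], indexEntry: Array[Byte], metaEntry: Array[Byte]): Unit =
       synchronized {
         try {
           data.write(block)
           index.write(indexEntry)
           meta.write(metaEntry)
           // All three writes succeeded: advance the committed positions.
           committedDataPos  = data.getFilePointer
           committedIndexPos = index.getFilePointer
           committedMetaPos  = meta.getFilePointer
         } catch {
           case _: IOException =>
             // Keep the previous committed positions; the partial block is dropped
             // at finalization by truncating back to them.
         }
       }
   
     def finalizeAndClose(): Unit = synchronized {
       // Drop any partially written trailing bytes, then close the files.
       data.getChannel.truncate(committedDataPos)
       index.getChannel.truncate(committedIndexPos)
       meta.getChannel.truncate(committedMetaPos)
       data.close(); index.close(); meta.close()
     }
   }
   ```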


