You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@celeborn.apache.org by "AngersZhuuuu (via GitHub)" <gi...@apache.org> on 2023/02/20 08:58:28 UTC

[GitHub] [incubator-celeborn] AngersZhuuuu commented on a diff in pull request #1249: [CELEBORN-308] Fix flusher will exit unexpectedly if flush task write failed.

AngersZhuuuu commented on code in PR #1249:
URL: https://github.com/apache/incubator-celeborn/pull/1249#discussion_r1111627213


##########
worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/Flusher.scala:
##########
@@ -38,17 +39,19 @@ import org.apache.celeborn.service.deploy.worker.congestcontrol.CongestionContro
 abstract private[worker] class Flusher(
     val workerSource: AbstractSource,
     val threadCount: Int,
-    val avgFlushTimeSlidingWindowSize: Int,
-    val avgFlushTimeSlidingWindowMinCount: Int) extends Logging {
+    val conf: CelebornConf) extends Logging {
   protected lazy val flusherId = System.identityHashCode(this)
   protected val workingQueues = new Array[LinkedBlockingQueue[FlushTask]](threadCount)
   protected val bufferQueue = new LinkedBlockingQueue[CompositeByteBuf]()
   protected val workers = new Array[Thread](threadCount)
   protected var nextWorkerIndex: Int = 0
   protected val flushCount = new LongAdder
   protected val flushTotalTime = new LongAdder
+  protected val avgFlushTimeSlidingWindowSize = conf.avgFlushTimeSlidingWindowSize
+  protected val avgFlushTimeSlidingWindowMinCount = conf.avgFlushTimeSlidingWindowMinCount
   protected val avgTimeWindow = new Array[(Long, Long)](avgFlushTimeSlidingWindowSize)
   protected var avgTimeWindowCurrentIndex = 0
+  protected val failFlushBuffer = conf.testFailFlushBufferWithNonCriticalError

Review Comment:
   `failFlushBuffer`?



##########
worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/Flusher.scala:
##########
@@ -220,15 +228,13 @@ private[worker] class LocalFlusher(
 
 final private[worker] class HdfsFlusher(
     workerSource: AbstractSource,
-    hdfsFlusherThreads: Int,
-    flushAvgTimeWindowSize: Int,
-    avgFlushTimeSlidingWindowMinCount: Int) extends Flusher(
+    conf: CelebornConf) extends Flusher(
     workerSource,
-    hdfsFlusherThreads,
-    flushAvgTimeWindowSize,
-    avgFlushTimeSlidingWindowMinCount) with Logging {
+    conf.hdfsFlusherThreads,
+    conf) with Logging {
   override def toString: String = s"HdfsFlusher@$flusherId"
 
+  // In HDFSFlusher, flush buffers failed will stop flusher.
   override def processIOException(e: IOException, deviceErrorType: DiskStatus): Unit = {
     stopAndCleanFlusher()

Review Comment:
   Should we remove this too?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@celeborn.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org