Posted to issues@celeborn.apache.org by "zhongqiangczq (via GitHub)" <gi...@apache.org> on 2023/02/13 03:36:32 UTC

[GitHub] [incubator-celeborn] zhongqiangczq opened a new pull request, #1225: [CELEBORN-292] improve code about mappartition filewriter

zhongqiangczq opened a new pull request, #1225:
URL: https://github.com/apache/incubator-celeborn/pull/1225

   
   ### What changes were proposed in this pull request?
   
   
   
   ### Why are the changes needed?
   
   - Add debug logs to `MapPartitionFileWriter`.
   - Change `flushIndex` from asynchronous to synchronous.
   - Read the data header with `ByteBuf` reads instead of `Platform.getInt`.
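   The header change can be sketched as follows. This is a minimal sketch, not the Celeborn code: `java.nio.ByteBuffer` stands in for Netty's `ByteBuf` (`mark()`/`reset()` play the role of `markReaderIndex()`/`resetReaderIndex()`), and the `BatchHeader` class is hypothetical. The four fields follow the order read in the `write` diff in this thread. Like `ByteBuf.readInt`, `ByteBuffer.getInt` is big-endian by default, whereas `Platform.getInt` reads in the platform's native byte order, so the sketch assumes the sender writes the header big-endian.

   ```java
   import java.nio.ByteBuffer;

   // Hypothetical holder for the four 4-byte header fields (16 bytes total).
   class BatchHeader {
     final int partitionId;
     final int attemptId;
     final int batchId;
     final int size;

     private BatchHeader(int partitionId, int attemptId, int batchId, int size) {
       this.partitionId = partitionId;
       this.attemptId = attemptId;
       this.batchId = batchId;
       this.size = size;
     }

     // Reads the header without consuming it, so the whole batch can still be written.
     static BatchHeader peek(ByteBuffer data) {
       data.mark(); // like ByteBuf.markReaderIndex()
       try {
         int partitionId = data.getInt();
         int attemptId = data.getInt();
         int batchId = data.getInt();
         int size = data.getInt();
         return new BatchHeader(partitionId, attemptId, batchId, size);
       } finally {
         data.reset(); // like ByteBuf.resetReaderIndex()
       }
     }

     public static void main(String[] args) {
       ByteBuffer buf = ByteBuffer.allocate(20);
       buf.putInt(3).putInt(0).putInt(7).putInt(4).putInt(42); // header + 4-byte payload
       buf.flip();
       BatchHeader h = peek(buf);
       System.out.println(h.partitionId + " " + h.attemptId + " " + h.batchId + " " + h.size); // prints "3 0 7 4"
       System.out.println(buf.position()); // prints 0: the header was not consumed
     }
   }
   ```

   Reading four ints directly avoids allocating a 16-byte array per batch, which is the main practical difference from the `Platform.getInt` version.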
   
   ### Does this PR introduce _any_ user-facing change?
   
   
   
   ### How was this patch tested?
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@celeborn.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-celeborn] zhongqiangczq commented on a diff in pull request #1225: [CELEBORN-292] optimize mappartitionfilewriter flushing index and reading data header

Posted by "zhongqiangczq (via GitHub)" <gi...@apache.org>.
zhongqiangczq commented on code in PR #1225:
URL: https://github.com/apache/incubator-celeborn/pull/1225#discussion_r1106841850


##########
worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/MapPartitionFileWriter.java:
##########
@@ -197,17 +191,29 @@ public synchronized void destroy(IOException ioException) {
   }
 
   public void pushDataHandShake(int numReducePartitions, int bufferSize) {
+    logger.debug(
+        "FileWriter pushDataHandShake numReducePartitions:{} bufferSize:{}",
+        numReducePartitions,
+        bufferSize);
+
     this.numReducePartitions = numReducePartitions;
     fileInfo.setBufferSize(bufferSize);
+    fileInfo.setNumReducerPartitions(numReducePartitions);

Review Comment:
   fixed
   





[GitHub] [incubator-celeborn] FMX commented on a diff in pull request #1225: [CELEBORN-292] optimize mappartitionfilewriter flushing index and reading data header

Posted by "FMX (via GitHub)" <gi...@apache.org>.
FMX commented on code in PR #1225:
URL: https://github.com/apache/incubator-celeborn/pull/1225#discussion_r1109306801


##########
worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/MapPartitionFileWriter.java:
##########
@@ -106,36 +103,36 @@ private void takeBufferIndex() {
       source.startTimer(metricsName, fileAbsPath);
     }
 
-    // real action
-    flushBufferIndex = flusher.takeBuffer();
-
     // metrics end
     if (source.metricsCollectCriticalEnabled()) {
       source.stopTimer(metricsName, fileAbsPath);
     }
-
-    if (flushBufferIndex == null) {
-      IOException e =
-          new IOException(
-              "Take buffer index encounter error from Flusher: " + flusher.bufferQueueInfo());
-      notifier.setException(e);
-    }
   }
 
   public void write(ByteBuf data) throws IOException {
-    byte[] header = new byte[16];
     data.markReaderIndex();
-    data.readBytes(header);
+    int partitionId = data.readInt();
+    int attemptId = data.readInt();
+    int batchId = data.readInt();
+    int size = data.readInt();
+
     data.resetReaderIndex();
-    int partitionId = Platform.getInt(header, Platform.BYTE_ARRAY_OFFSET);
+    logger.debug(
+        "mappartition filename:{} write partition:{} attemptId:{} batchId:{} size:{}",
+        fileInfo.getFilePath(),
+        partitionId,
+        attemptId,
+        batchId,
+        size);
+
     collectPartitionDataLength(partitionId, data);
 
     super.write(data);
   }
 
   private void collectPartitionDataLength(int partitionId, ByteBuf data) throws IOException {

Review Comment:
   Concurrency issue here, because `write` will be invoked by multiple threads.
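   A minimal sketch of one way to address this, assuming the per-partition counters updated by `collectPartitionDataLength` are plain fields shared by writer threads. `PartitionLengthCollector` and its methods are hypothetical; the point is that each read-modify-write of the counters happens under one lock. If the bookkeeping and `super.write(data)` must also stay in the same order across threads, making the whole `write` method `synchronized` is the coarser alternative.

   ```java
   import java.util.concurrent.ExecutorService;
   import java.util.concurrent.Executors;
   import java.util.concurrent.TimeUnit;

   // Hypothetical stand-in for per-partition length bookkeeping; not the Celeborn class.
   class PartitionLengthCollector {
     private final long[] partitionBytes;
     private long totalBytes;

     PartitionLengthCollector(int numPartitions) {
       this.partitionBytes = new long[numPartitions];
     }

     // synchronized makes both "+=" updates atomic with respect to other writers.
     synchronized void collect(int partitionId, int length) {
       partitionBytes[partitionId] += length;
       totalBytes += length;
     }

     synchronized long bytesOf(int partitionId) {
       return partitionBytes[partitionId];
     }

     synchronized long totalBytes() {
       return totalBytes;
     }

     public static void main(String[] args) throws InterruptedException {
       PartitionLengthCollector c = new PartitionLengthCollector(4);
       ExecutorService pool = Executors.newFixedThreadPool(8);
       for (int t = 0; t < 8; t++) {
         pool.submit(() -> {
           for (int i = 0; i < 10_000; i++) {
             c.collect(i % 4, 1);
           }
         });
       }
       pool.shutdown();
       pool.awaitTermination(1, TimeUnit.MINUTES);
       System.out.println(c.totalBytes()); // prints 80000
       System.out.println(c.bytesOf(0)); // prints 20000
     }
   }
   ```

   Without `synchronized`, concurrent `+=` updates can be lost and the totals would come out lower than expected.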





[GitHub] [incubator-celeborn] codecov[bot] commented on pull request #1225: [CELEBORN-292] optimize mappartitionfilewriter flushing index and reading data header

Posted by "codecov[bot] (via GitHub)" <gi...@apache.org>.
codecov[bot] commented on PR #1225:
URL: https://github.com/apache/incubator-celeborn/pull/1225#issuecomment-1427397535

   # [Codecov](https://codecov.io/gh/apache/incubator-celeborn/pull/1225?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
   > Merging [#1225](https://codecov.io/gh/apache/incubator-celeborn/pull/1225?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (b12895d) into [main](https://codecov.io/gh/apache/incubator-celeborn/commit/adb6592d316b28fdf26cc0364779b64de96c9e6a?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (adb6592) will **increase** coverage by `0.19%`.
   > The diff coverage is `65.52%`.
   
   ```diff
   @@             Coverage Diff              @@
   ##               main    #1225      +/-   ##
   ============================================
   + Coverage     27.18%   27.37%   +0.19%     
   - Complexity      806      810       +4     
   ============================================
     Files           212      212              
     Lines         17974    18025      +51     
     Branches       1964     1963       -1     
   ============================================
   + Hits           4885     4933      +48     
   - Misses        12766    12770       +4     
   + Partials        323      322       -1     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/incubator-celeborn/pull/1225?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | |
   |---|---|---|
   | [...java/org/apache/celeborn/common/meta/FileInfo.java](https://codecov.io/gh/apache/incubator-celeborn/pull/1225?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Y29tbW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9jZWxlYm9ybi9jb21tb24vbWV0YS9GaWxlSW5mby5qYXZh) | `0.00% <0.00%> (ø)` | |
   | [.../scala/org/apache/celeborn/common/util/Utils.scala](https://codecov.io/gh/apache/incubator-celeborn/pull/1225?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Y29tbW9uL3NyYy9tYWluL3NjYWxhL29yZy9hcGFjaGUvY2VsZWJvcm4vY29tbW9uL3V0aWwvVXRpbHMuc2NhbGE=) | `6.86% <0.00%> (-0.05%)` | :arrow_down: |
   | [.../deploy/worker/storage/MapPartitionFileWriter.java](https://codecov.io/gh/apache/incubator-celeborn/pull/1225?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-d29ya2VyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9jZWxlYm9ybi9zZXJ2aWNlL2RlcGxveS93b3JrZXIvc3RvcmFnZS9NYXBQYXJ0aXRpb25GaWxlV3JpdGVyLmphdmE=) | `54.78% <74.00%> (+13.61%)` | :arrow_up: |
   | [...cala/org/apache/celeborn/common/CelebornConf.scala](https://codecov.io/gh/apache/incubator-celeborn/pull/1225?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Y29tbW9uL3NyYy9tYWluL3NjYWxhL29yZy9hcGFjaGUvY2VsZWJvcm4vY29tbW9uL0NlbGVib3JuQ29uZi5zY2FsYQ==) | `81.01% <100.00%> (ø)` | |
   | [...apache/celeborn/service/deploy/worker/Worker.scala](https://codecov.io/gh/apache/incubator-celeborn/pull/1225?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-d29ya2VyL3NyYy9tYWluL3NjYWxhL29yZy9hcGFjaGUvY2VsZWJvcm4vc2VydmljZS9kZXBsb3kvd29ya2VyL1dvcmtlci5zY2FsYQ==) | `0.00% <0.00%> (ø)` | |
   | [.../celeborn/service/deploy/worker/WorkerSource.scala](https://codecov.io/gh/apache/incubator-celeborn/pull/1225?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-d29ya2VyL3NyYy9tYWluL3NjYWxhL29yZy9hcGFjaGUvY2VsZWJvcm4vc2VydmljZS9kZXBsb3kvd29ya2VyL1dvcmtlclNvdXJjZS5zY2FsYQ==) | `100.00% <0.00%> (ø)` | |
   | [...eleborn/common/metrics/source/AbstractSource.scala](https://codecov.io/gh/apache/incubator-celeborn/pull/1225?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Y29tbW9uL3NyYy9tYWluL3NjYWxhL29yZy9hcGFjaGUvY2VsZWJvcm4vY29tbW9uL21ldHJpY3Mvc291cmNlL0Fic3RyYWN0U291cmNlLnNjYWxh) | `0.00% <0.00%> (ø)` | |
   | [...oy/worker/congestcontrol/CongestionController.java](https://codecov.io/gh/apache/incubator-celeborn/pull/1225?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-d29ya2VyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9jZWxlYm9ybi9zZXJ2aWNlL2RlcGxveS93b3JrZXIvY29uZ2VzdGNvbnRyb2wvQ29uZ2VzdGlvbkNvbnRyb2xsZXIuamF2YQ==) | `77.78% <0.00%> (+10.11%)` | :arrow_up: |
   
   





[GitHub] [incubator-celeborn] zhongqiangczq commented on a diff in pull request #1225: [CELEBORN-292] optimize mappartitionfilewriter flushing index and reading data header

Posted by "zhongqiangczq (via GitHub)" <gi...@apache.org>.
zhongqiangczq commented on code in PR #1225:
URL: https://github.com/apache/incubator-celeborn/pull/1225#discussion_r1105324733


##########
worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/MapPartitionFileWriter.java:
##########
@@ -197,17 +191,29 @@ public synchronized void destroy(IOException ioException) {
   }
 
   public void pushDataHandShake(int numReducePartitions, int bufferSize) {
+    logger.debug(
+        "FileWriter pushDataHandShake numReducePartitions:{} bufferSize:{}",
+        numReducePartitions,
+        bufferSize);
+
     this.numReducePartitions = numReducePartitions;
     fileInfo.setBufferSize(bufferSize);
+    fileInfo.setNumReducerPartitions(numReducePartitions);
   }
 
   public void regionStart(int currentDataRegionIndex, boolean isBroadcastRegion) {
+    logger.debug(

Review Comment:
   OK, it's useful to mark which file it writes to.





[GitHub] [incubator-celeborn] RexXiong commented on a diff in pull request #1225: [CELEBORN-292] optimize mappartitionfilewriter flushing index and reading data header

Posted by "RexXiong (via GitHub)" <gi...@apache.org>.
RexXiong commented on code in PR #1225:
URL: https://github.com/apache/incubator-celeborn/pull/1225#discussion_r1106765968


##########
worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/MapPartitionFileWriter.java:
##########
@@ -260,21 +276,37 @@ private synchronized void destroyIndex() {
   }
 
   private void flushIndex() throws IOException {

Review Comment:
   It would be better to flush the index asynchronously; we can improve this point in the future.
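   To illustrate the trade-off, here is a sketch contrasting a synchronous index flush with an asynchronous one. It uses plain `FileChannel` and `CompletableFuture` rather than Celeborn's `Flusher`/`FlushTask` classes, and `IndexFlushSketch` is a hypothetical name. The asynchronous variant copies the pending bytes so the caller can keep filling the index buffer, and relies on a single-threaded executor so flushes land in submission order; the caller must still join the returned future before closing the channel or reading the index.

   ```java
   import java.io.IOException;
   import java.io.UncheckedIOException;
   import java.nio.ByteBuffer;
   import java.nio.channels.FileChannel;
   import java.nio.file.Files;
   import java.nio.file.Path;
   import java.nio.file.StandardOpenOption;
   import java.util.concurrent.CompletableFuture;
   import java.util.concurrent.ExecutorService;
   import java.util.concurrent.Executors;

   class IndexFlushSketch {
     // Synchronous: returns only after the whole index buffer is written.
     static void flushIndexSync(FileChannel channel, ByteBuffer indexBuffer) throws IOException {
       indexBuffer.flip(); // switch from filling to draining
       while (indexBuffer.hasRemaining()) {
         channel.write(indexBuffer);
       }
       indexBuffer.clear(); // ready to collect the next entries
     }

     // Asynchronous: schedules the write on the executor and returns immediately.
     static CompletableFuture<Void> flushIndexAsync(
         FileChannel channel, ByteBuffer indexBuffer, ExecutorService flusher) {
       indexBuffer.flip();
       // Copy the pending bytes so indexBuffer can be reused while the flush runs.
       ByteBuffer snapshot = ByteBuffer.allocate(indexBuffer.remaining());
       snapshot.put(indexBuffer);
       snapshot.flip();
       indexBuffer.clear();
       return CompletableFuture.runAsync(
           () -> {
             try {
               while (snapshot.hasRemaining()) {
                 channel.write(snapshot);
               }
             } catch (IOException e) {
               throw new UncheckedIOException(e);
             }
           },
           flusher);
     }

     public static void main(String[] args) throws Exception {
       Path path = Files.createTempFile("index", ".bin");
       // A single thread keeps asynchronous flushes in submission order.
       ExecutorService flusher = Executors.newSingleThreadExecutor();
       try (FileChannel channel = FileChannel.open(path, StandardOpenOption.WRITE)) {
         ByteBuffer index = ByteBuffer.allocate(64);
         index.putLong(0L).putLong(128L);
         flushIndexSync(channel, index);
         index.putLong(128L).putLong(256L);
         // join() must complete before the channel is closed.
         flushIndexAsync(channel, index, flusher).join();
       } finally {
         flusher.shutdown();
       }
       System.out.println(Files.size(path)); // prints 32
       Files.delete(path);
     }
   }
   ```

   The synchronous form is simpler to reason about, since the index is on disk when `flushIndexSync` returns; the asynchronous form moves that wait to whoever joins the future.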





[GitHub] [incubator-celeborn] RexXiong commented on a diff in pull request #1225: [CELEBORN-292] optimize mappartitionfilewriter flushing index and reading data header

Posted by "RexXiong (via GitHub)" <gi...@apache.org>.
RexXiong commented on code in PR #1225:
URL: https://github.com/apache/incubator-celeborn/pull/1225#discussion_r1104110342


##########
worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/MapPartitionFileWriter.java:
##########
@@ -197,17 +191,29 @@ public synchronized void destroy(IOException ioException) {
   }
 
   public void pushDataHandShake(int numReducePartitions, int bufferSize) {
+    logger.debug(
+        "FileWriter pushDataHandShake numReducePartitions:{} bufferSize:{}",
+        numReducePartitions,
+        bufferSize);
+
     this.numReducePartitions = numReducePartitions;
     fileInfo.setBufferSize(bufferSize);
+    fileInfo.setNumReducerPartitions(numReducePartitions);

Review Comment:
   keep NumReducerPartitions/numReducePartitions consistent.



##########
worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/MapPartitionFileWriter.java:
##########
@@ -221,9 +227,25 @@ public void regionFinish() throws IOException {
     for (int partitionIndex = 0; partitionIndex < numReducePartitions; ++partitionIndex) {
       indexBuffer.putLong(fileOffset);
       if (!isBroadcastRegion) {
+        logger.info(

Review Comment:
   change logger level to debug



##########
worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/MapPartitionFileWriter.java:
##########
@@ -197,17 +191,29 @@ public synchronized void destroy(IOException ioException) {
   }
 
   public void pushDataHandShake(int numReducePartitions, int bufferSize) {
+    logger.debug(
+        "FileWriter pushDataHandShake numReducePartitions:{} bufferSize:{}",
+        numReducePartitions,
+        bufferSize);
+
     this.numReducePartitions = numReducePartitions;
     fileInfo.setBufferSize(bufferSize);
+    fileInfo.setNumReducerPartitions(numReducePartitions);
   }
 
   public void regionStart(int currentDataRegionIndex, boolean isBroadcastRegion) {
+    logger.debug(

Review Comment:
   It would be useful to include file info, such as the file path, in this log.



##########
worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/MapPartitionFileWriter.java:
##########
@@ -197,17 +191,29 @@ public synchronized void destroy(IOException ioException) {
   }
 
   public void pushDataHandShake(int numReducePartitions, int bufferSize) {
+    logger.debug(
+        "FileWriter pushDataHandShake numReducePartitions:{} bufferSize:{}",
+        numReducePartitions,
+        bufferSize);
+
     this.numReducePartitions = numReducePartitions;
     fileInfo.setBufferSize(bufferSize);
+    fileInfo.setNumReducerPartitions(numReducePartitions);
   }
 
   public void regionStart(int currentDataRegionIndex, boolean isBroadcastRegion) {
+    logger.debug(
+        "FileWriter regionStart currentDataRegionIndex:{} isBroadcastRegion:{}",
+        currentDataRegionIndex,
+        isBroadcastRegion);
+
     this.currentReducePartition = 0;
     this.currentDataRegionIndex = currentDataRegionIndex;
     this.isBroadcastRegion = isBroadcastRegion;
   }
 
   public void regionFinish() throws IOException {
+    logger.debug("FileWriter regionFinish");

Review Comment:
   ditto



##########
common/src/main/scala/org/apache/celeborn/common/util/Utils.scala:
##########
@@ -994,4 +994,11 @@ object Utils extends Logging {
     }
   }
 
+  def getShortFormattedFileName(fileInfo: FileInfo): String = {
+    val parentFile = fileInfo.getFile.getParent

Review Comment:
   Move this method to `FileInfo`.



##########
worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/MapPartitionFileWriter.java:
##########
@@ -260,21 +274,37 @@ private synchronized void destroyIndex() {
   }
 
   private void flushIndex() throws IOException {
-    indexBuffer.flip();
-    notifier.checkException();
-    notifier.numPendingFlushes.incrementAndGet();
-    if (indexBuffer.hasRemaining()) {
-      FlushTask task = null;
-      if (channelIndex != null) {
-        Unpooled.wrappedBuffer(indexBuffer);
-        task = new LocalFlushTask(flushBufferIndex, channelIndex, notifier);
-      } else if (fileInfo.isHdfs()) {
-        task = new HdfsFlushTask(flushBufferIndex, fileInfo.getHdfsIndexPath(), notifier);
+    if (indexBuffer != null) {
+      logger.debug("flushIndex start:" + fileInfo.getIndexPath());

Review Comment:
   Use a format placeholder instead of string concatenation.
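   The Java code in these diffs uses SLF4J-style `{}` placeholders, so the suggested form would be along the lines of `logger.debug("flushIndex start: {}", fileInfo.getIndexPath())`. To keep the example self-contained, this sketch uses `java.util.logging` as a stand-in (`FINE` roughly corresponds to SLF4J's debug level, and `{0}` to `{}`). It counts how often a message string is actually built while `FINE` is disabled; `LogFormatSketch`, `describe`, and the path are hypothetical. Argument expressions are still evaluated in the parameterized form; only the string formatting is deferred.

   ```java
   import java.util.concurrent.atomic.AtomicInteger;
   import java.util.logging.Level;
   import java.util.logging.Logger;

   class LogFormatSketch {
     private static final Logger logger = Logger.getLogger(LogFormatSketch.class.getName());

     // Counts how many times a log message string is actually built.
     static final AtomicInteger messagesBuilt = new AtomicInteger();

     static String describe(String indexPath) {
       messagesBuilt.incrementAndGet();
       return "flushIndex start: " + indexPath;
     }

     static void logFlushStart(String indexPath) {
       // Concatenation: describe() runs even though FINE is disabled.
       logger.fine(describe(indexPath));
       // Parameterized: {0} is filled in only if the record is actually logged.
       logger.log(Level.FINE, "flushIndex start: {0}", indexPath);
       // Supplier: the lambda runs only if FINE is enabled.
       logger.fine(() -> describe(indexPath));
     }

     public static void main(String[] args) {
       logger.setLevel(Level.INFO); // FINE, roughly SLF4J's debug, is disabled
       logFlushStart("/tmp/hypothetical/data.index");
       System.out.println(messagesBuilt.get()); // prints 1
     }
   }
   ```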





[GitHub] [incubator-celeborn] zhongqiangczq commented on a diff in pull request #1225: [CELEBORN-292] optimize mappartitionfilewriter flushing index and reading data header

Posted by "zhongqiangczq (via GitHub)" <gi...@apache.org>.
zhongqiangczq commented on code in PR #1225:
URL: https://github.com/apache/incubator-celeborn/pull/1225#discussion_r1105323991


##########
common/src/main/scala/org/apache/celeborn/common/util/Utils.scala:
##########
@@ -994,4 +994,11 @@ object Utils extends Logging {
     }
   }
 
+  def getShortFormattedFileName(fileInfo: FileInfo): String = {
+    val parentFile = fileInfo.getFile.getParent

Review Comment:
   Delete this function and use `fileInfo.getFilePath()` instead.





[GitHub] [incubator-celeborn] FMX merged pull request #1225: [CELEBORN-292] optimize mappartitionfilewriter flushing index and reading data header

Posted by "FMX (via GitHub)" <gi...@apache.org>.
FMX merged PR #1225:
URL: https://github.com/apache/incubator-celeborn/pull/1225




[GitHub] [incubator-celeborn] zhongqiangczq commented on a diff in pull request #1225: [CELEBORN-292] optimize mappartitionfilewriter flushing index and reading data header

Posted by "zhongqiangczq (via GitHub)" <gi...@apache.org>.
zhongqiangczq commented on code in PR #1225:
URL: https://github.com/apache/incubator-celeborn/pull/1225#discussion_r1105325647


##########
worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/MapPartitionFileWriter.java:
##########
@@ -260,21 +274,37 @@ private synchronized void destroyIndex() {
   }
 
   private void flushIndex() throws IOException {
-    indexBuffer.flip();
-    notifier.checkException();
-    notifier.numPendingFlushes.incrementAndGet();
-    if (indexBuffer.hasRemaining()) {
-      FlushTask task = null;
-      if (channelIndex != null) {
-        Unpooled.wrappedBuffer(indexBuffer);
-        task = new LocalFlushTask(flushBufferIndex, channelIndex, notifier);
-      } else if (fileInfo.isHdfs()) {
-        task = new HdfsFlushTask(flushBufferIndex, fileInfo.getHdfsIndexPath(), notifier);
+    if (indexBuffer != null) {
+      logger.debug("flushIndex start:" + fileInfo.getIndexPath());

Review Comment:
   ok





[GitHub] [incubator-celeborn] zhongqiangczq commented on a diff in pull request #1225: [CELEBORN-292] optimize mappartitionfilewriter flushing index and reading data header

Posted by "zhongqiangczq (via GitHub)" <gi...@apache.org>.
zhongqiangczq commented on code in PR #1225:
URL: https://github.com/apache/incubator-celeborn/pull/1225#discussion_r1106842156


##########
worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/MapPartitionFileWriter.java:
##########
@@ -197,17 +191,29 @@ public synchronized void destroy(IOException ioException) {
   }
 
   public void pushDataHandShake(int numReducePartitions, int bufferSize) {
+    logger.debug(
+        "FileWriter pushDataHandShake numReducePartitions:{} bufferSize:{}",
+        numReducePartitions,
+        bufferSize);
+
     this.numReducePartitions = numReducePartitions;
     fileInfo.setBufferSize(bufferSize);
+    fileInfo.setNumReducerPartitions(numReducePartitions);
   }
 
   public void regionStart(int currentDataRegionIndex, boolean isBroadcastRegion) {
+    logger.debug(
+        "FileWriter regionStart currentDataRegionIndex:{} isBroadcastRegion:{}",
+        currentDataRegionIndex,
+        isBroadcastRegion);
+
     this.currentReducePartition = 0;
     this.currentDataRegionIndex = currentDataRegionIndex;
     this.isBroadcastRegion = isBroadcastRegion;
   }
 
   public void regionFinish() throws IOException {
+    logger.debug("FileWriter regionFinish");

Review Comment:
   Add `fileInfo.getFilePath()` to this log.


