You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@celeborn.apache.org by "RexXiong (via GitHub)" <gi...@apache.org> on 2023/02/13 08:15:52 UTC

[GitHub] [incubator-celeborn] RexXiong commented on a diff in pull request #1225: [CELEBORN-292] optimize mappartitionfilewriter flushing index and reading data header

RexXiong commented on code in PR #1225:
URL: https://github.com/apache/incubator-celeborn/pull/1225#discussion_r1104110342


##########
worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/MapPartitionFileWriter.java:
##########
@@ -197,17 +191,29 @@ public synchronized void destroy(IOException ioException) {
   }
 
   public void pushDataHandShake(int numReducePartitions, int bufferSize) {
+    logger.debug(
+        "FileWriter pushDataHandShake numReducePartitions:{} bufferSize:{}",
+        numReducePartitions,
+        bufferSize);
+
     this.numReducePartitions = numReducePartitions;
     fileInfo.setBufferSize(bufferSize);
+    fileInfo.setNumReducerPartitions(numReducePartitions);

Review Comment:
   Keep the naming consistent: `NumReducerPartitions` (in `setNumReducerPartitions`) vs. `numReducePartitions` — use one form.



##########
worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/MapPartitionFileWriter.java:
##########
@@ -221,9 +227,25 @@ public void regionFinish() throws IOException {
     for (int partitionIndex = 0; partitionIndex < numReducePartitions; ++partitionIndex) {
       indexBuffer.putLong(fileOffset);
       if (!isBroadcastRegion) {
+        logger.info(

Review Comment:
   Change this logger call's level from info to debug.



##########
worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/MapPartitionFileWriter.java:
##########
@@ -197,17 +191,29 @@ public synchronized void destroy(IOException ioException) {
   }
 
   public void pushDataHandShake(int numReducePartitions, int bufferSize) {
+    logger.debug(
+        "FileWriter pushDataHandShake numReducePartitions:{} bufferSize:{}",
+        numReducePartitions,
+        bufferSize);
+
     this.numReducePartitions = numReducePartitions;
     fileInfo.setBufferSize(bufferSize);
+    fileInfo.setNumReducerPartitions(numReducePartitions);
   }
 
   public void regionStart(int currentDataRegionIndex, boolean isBroadcastRegion) {
+    logger.debug(

Review Comment:
   It would be useful to include file info, such as the file path, in this log message.



##########
worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/MapPartitionFileWriter.java:
##########
@@ -197,17 +191,29 @@ public synchronized void destroy(IOException ioException) {
   }
 
   public void pushDataHandShake(int numReducePartitions, int bufferSize) {
+    logger.debug(
+        "FileWriter pushDataHandShake numReducePartitions:{} bufferSize:{}",
+        numReducePartitions,
+        bufferSize);
+
     this.numReducePartitions = numReducePartitions;
     fileInfo.setBufferSize(bufferSize);
+    fileInfo.setNumReducerPartitions(numReducePartitions);
   }
 
   public void regionStart(int currentDataRegionIndex, boolean isBroadcastRegion) {
+    logger.debug(
+        "FileWriter regionStart currentDataRegionIndex:{} isBroadcastRegion:{}",
+        currentDataRegionIndex,
+        isBroadcastRegion);
+
     this.currentReducePartition = 0;
     this.currentDataRegionIndex = currentDataRegionIndex;
     this.isBroadcastRegion = isBroadcastRegion;
   }
 
   public void regionFinish() throws IOException {
+    logger.debug("FileWriter regionFinish");

Review Comment:
   Ditto: include file info, such as the file path, in this log message as well.



##########
common/src/main/scala/org/apache/celeborn/common/util/Utils.scala:
##########
@@ -994,4 +994,11 @@ object Utils extends Logging {
     }
   }
 
+  def getShortFormattedFileName(fileInfo: FileInfo): String = {
+    val parentFile = fileInfo.getFile.getParent

Review Comment:
   Move this method to FileInfo.



##########
worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/MapPartitionFileWriter.java:
##########
@@ -260,21 +274,37 @@ private synchronized void destroyIndex() {
   }
 
   private void flushIndex() throws IOException {
-    indexBuffer.flip();
-    notifier.checkException();
-    notifier.numPendingFlushes.incrementAndGet();
-    if (indexBuffer.hasRemaining()) {
-      FlushTask task = null;
-      if (channelIndex != null) {
-        Unpooled.wrappedBuffer(indexBuffer);
-        task = new LocalFlushTask(flushBufferIndex, channelIndex, notifier);
-      } else if (fileInfo.isHdfs()) {
-        task = new HdfsFlushTask(flushBufferIndex, fileInfo.getHdfsIndexPath(), notifier);
+    if (indexBuffer != null) {
+      logger.debug("flushIndex start:" + fileInfo.getIndexPath());

Review Comment:
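   Use a parameterized log format (e.g. `logger.debug("flushIndex start: {}", ...)`) instead of string concatenation.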



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@celeborn.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org