You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by wu...@apache.org on 2021/08/10 01:54:39 UTC

[spark] branch master updated: [SPARK-36332][SHUFFLE] Cleanup RemoteBlockPushResolver log messages

This is an automated email from the ASF dual-hosted git repository.

wuyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new ab89710  [SPARK-36332][SHUFFLE] Cleanup RemoteBlockPushResolver log messages
ab89710 is described below

commit ab897109a3fa9f83a20857a292dd68fe97e447a8
Author: Venkata krishnan Sowrirajan <vs...@linkedin.com>
AuthorDate: Tue Aug 10 09:53:53 2021 +0800

    [SPARK-36332][SHUFFLE] Cleanup RemoteBlockPushResolver log messages
    
    ### What changes were proposed in this pull request?
    Clean up `RemoteBlockPushResolver` log messages by using `AppShufflePartitionInfo#toString()` to avoid duplication. This is currently based on https://github.com/apache/spark/pull/33034; those changes will be removed once that PR is merged, and the WIP tag will be removed at that time.
    
    ### Why are the changes needed?
    Minor cleanup to make code more readable.
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    No tests were added; this patch only changes log messages.
    
    Closes #33561 from venkata91/SPARK-36332.
    
    Authored-by: Venkata krishnan Sowrirajan <vs...@linkedin.com>
    Signed-off-by: yi.wu <yi...@databricks.com>
---
 .../network/shuffle/RemoteBlockPushResolver.java   | 45 ++++++++--------------
 1 file changed, 17 insertions(+), 28 deletions(-)

diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
index 8578843..84ecf3d 100644
--- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
+++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
@@ -678,9 +678,7 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager {
     private void writeBuf(ByteBuffer buf) throws IOException {
       while (buf.hasRemaining()) {
         long updatedPos = partitionInfo.getDataFilePos() + length;
-        logger.debug("{} shuffleId {} shuffleMergeId {} reduceId {} current pos"
-          + " {} updated pos {}", partitionInfo.appId, partitionInfo.shuffleId,
-          partitionInfo.shuffleMergeId, partitionInfo.reduceId,
+        logger.debug("{} current pos {} updated pos {}", partitionInfo,
           partitionInfo.getDataFilePos(), updatedPos);
         length += partitionInfo.dataChannel.write(buf, updatedPos);
       }
@@ -795,9 +793,7 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager {
             return;
           }
           abortIfNecessary();
-          logger.trace("{} shuffleId {} shuffleMergeId {} reduceId {} onData writable",
-            partitionInfo.appId, partitionInfo.shuffleId, partitionInfo.shuffleMergeId,
-            partitionInfo.reduceId);
+          logger.trace("{} onData writable", partitionInfo);
           if (partitionInfo.getCurrentMapIndex() < 0) {
             partitionInfo.setCurrentMapIndex(mapIndex);
           }
@@ -817,9 +813,7 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager {
             throw ioe;
           }
         } else {
-          logger.trace("{} shuffleId {} shuffleMergeId {} reduceId {} onData deferred",
-            partitionInfo.appId, partitionInfo.shuffleId, partitionInfo.shuffleMergeId,
-            partitionInfo.reduceId);
+          logger.trace("{} onData deferred", partitionInfo);
           // If we cannot write to disk, we buffer the current block chunk in memory so it could
           // potentially be written to disk later. We take our best effort without guarantee
           // that the block will be written to disk. If the block data is divided into multiple
@@ -852,9 +846,7 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager {
     @Override
     public void onComplete(String streamId) throws IOException {
       synchronized (partitionInfo) {
-        logger.trace("{} shuffleId {} shuffleMergeId {} reduceId {} onComplete invoked",
-          partitionInfo.appId, partitionInfo.shuffleId, partitionInfo.shuffleMergeId,
-          partitionInfo.reduceId);
+        logger.trace("{} onComplete invoked", partitionInfo);
         // Initially when this request got to the server, the shuffle merge finalize request
         // was not received yet or this was the latest stage attempt (or latest shuffleMergeId)
         // generating shuffle output for the shuffle ID. By the time we finish reading this
@@ -936,9 +928,7 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager {
         synchronized (partitionInfo) {
           if (!isStaleOrTooLate(appShuffleInfo.shuffles.get(partitionInfo.shuffleId),
               partitionInfo.shuffleMergeId, partitionInfo.reduceId)) {
-              logger.debug("{} shuffleId {} shuffleMergeId {} reduceId {}"
-                + " encountered failure", partitionInfo.appId, partitionInfo.shuffleId,
-                partitionInfo.shuffleMergeId, partitionInfo.reduceId);
+              logger.debug("{} encountered failure", partitionInfo);
               partitionInfo.setCurrentMapIndex(-1);
             }
           }
@@ -1032,9 +1022,7 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager {
     }
 
     public void setDataFilePos(long dataFilePos) {
-      logger.trace("{} shuffleId {} shuffleMergeId {} reduceId {} current pos {}"
-        + " update pos {}", appId, shuffleId, shuffleMergeId, reduceId, this.dataFilePos,
-        dataFilePos);
+      logger.trace("{} current pos {} update pos {}", this, this.dataFilePos, dataFilePos);
       this.dataFilePos = dataFilePos;
     }
 
@@ -1043,9 +1031,7 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager {
     }
 
     void setCurrentMapIndex(int mapIndex) {
-      logger.trace("{} shuffleId {} shuffleMergeId {} reduceId {} updated mapIndex {}"
-        + " current mapIndex {}", appId, shuffleId, shuffleMergeId, reduceId,
-        currentMapIndex, mapIndex);
+      logger.trace("{} mapIndex {} current mapIndex {}", this, currentMapIndex, mapIndex);
       this.currentMapIndex = mapIndex;
     }
 
@@ -1054,8 +1040,7 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager {
     }
 
     void blockMerged(int mapIndex) {
-      logger.debug("{} shuffleId {} shuffleMergeId {} reduceId {} updated merging mapIndex {}",
-        appId, shuffleId, shuffleMergeId, reduceId, mapIndex);
+      logger.debug("{} updated merging mapIndex {}", this, mapIndex);
       mapTracker.add(mapIndex);
       chunkTracker.add(mapIndex);
       lastMergedMapIndex = mapIndex;
@@ -1073,9 +1058,8 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager {
      */
     void updateChunkInfo(long chunkOffset, int mapIndex) throws IOException {
       try {
-        logger.trace("{} shuffleId {} shuffleMergeId {} reduceId {} index current {}"
-          + " updated {}", appId, shuffleId, shuffleMergeId, reduceId,
-          this.lastChunkOffset, chunkOffset);
+        logger.trace("{} index current {} updated {}", this, this.lastChunkOffset,
+          chunkOffset);
         if (indexMetaUpdateFailed) {
           indexFile.getChannel().position(indexFile.getPos());
         }
@@ -1103,8 +1087,7 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager {
         return;
       }
       chunkTracker.add(mapIndex);
-      logger.trace("{} shuffleId {} shuffleMergeId {} reduceId {} mapIndex {}"
-        + " write chunk to meta file", appId, shuffleId, shuffleMergeId, reduceId, mapIndex);
+      logger.trace("{} mapIndex {} write chunk to meta file", this, mapIndex);
       if (indexMetaUpdateFailed) {
         metaFile.getChannel().position(metaFile.getPos());
       }
@@ -1170,6 +1153,12 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager {
     }
 
     @Override
+    public String toString() {
+      return String.format("Application %s shuffleId %s shuffleMergeId %s reduceId %s",
+        appId, shuffleId, shuffleMergeId, reduceId);
+    }
+
+    @Override
     protected void finalize() throws Throwable {
       closeAllFilesAndDeleteIfNeeded(false);
     }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org