You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by wu...@apache.org on 2021/08/10 01:54:39 UTC
[spark] branch master updated: [SPARK-36332][SHUFFLE] Cleanup
RemoteBlockPushResolver log messages
This is an automated email from the ASF dual-hosted git repository.
wuyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new ab89710 [SPARK-36332][SHUFFLE] Cleanup RemoteBlockPushResolver log messages
ab89710 is described below
commit ab897109a3fa9f83a20857a292dd68fe97e447a8
Author: Venkata krishnan Sowrirajan <vs...@linkedin.com>
AuthorDate: Tue Aug 10 09:53:53 2021 +0800
[SPARK-36332][SHUFFLE] Cleanup RemoteBlockPushResolver log messages
### What changes were proposed in this pull request?
Clean up `RemoteBlockPushResolver` log messages by using `AppShufflePartitionInfo#toString()` to avoid duplication. This PR is currently based on https://github.com/apache/spark/pull/33034; those changes will be removed once that PR is merged, and the WIP tag will be removed at that time.
### Why are the changes needed?
Minor cleanup to make code more readable.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
No tests were added; this patch only changes log messages.
Closes #33561 from venkata91/SPARK-36332.
Authored-by: Venkata krishnan Sowrirajan <vs...@linkedin.com>
Signed-off-by: yi.wu <yi...@databricks.com>
---
.../network/shuffle/RemoteBlockPushResolver.java | 45 ++++++++--------------
1 file changed, 17 insertions(+), 28 deletions(-)
diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
index 8578843..84ecf3d 100644
--- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
+++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
@@ -678,9 +678,7 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager {
private void writeBuf(ByteBuffer buf) throws IOException {
while (buf.hasRemaining()) {
long updatedPos = partitionInfo.getDataFilePos() + length;
- logger.debug("{} shuffleId {} shuffleMergeId {} reduceId {} current pos"
- + " {} updated pos {}", partitionInfo.appId, partitionInfo.shuffleId,
- partitionInfo.shuffleMergeId, partitionInfo.reduceId,
+ logger.debug("{} current pos {} updated pos {}", partitionInfo,
partitionInfo.getDataFilePos(), updatedPos);
length += partitionInfo.dataChannel.write(buf, updatedPos);
}
@@ -795,9 +793,7 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager {
return;
}
abortIfNecessary();
- logger.trace("{} shuffleId {} shuffleMergeId {} reduceId {} onData writable",
- partitionInfo.appId, partitionInfo.shuffleId, partitionInfo.shuffleMergeId,
- partitionInfo.reduceId);
+ logger.trace("{} onData writable", partitionInfo);
if (partitionInfo.getCurrentMapIndex() < 0) {
partitionInfo.setCurrentMapIndex(mapIndex);
}
@@ -817,9 +813,7 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager {
throw ioe;
}
} else {
- logger.trace("{} shuffleId {} shuffleMergeId {} reduceId {} onData deferred",
- partitionInfo.appId, partitionInfo.shuffleId, partitionInfo.shuffleMergeId,
- partitionInfo.reduceId);
+ logger.trace("{} onData deferred", partitionInfo);
// If we cannot write to disk, we buffer the current block chunk in memory so it could
// potentially be written to disk later. We take our best effort without guarantee
// that the block will be written to disk. If the block data is divided into multiple
@@ -852,9 +846,7 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager {
@Override
public void onComplete(String streamId) throws IOException {
synchronized (partitionInfo) {
- logger.trace("{} shuffleId {} shuffleMergeId {} reduceId {} onComplete invoked",
- partitionInfo.appId, partitionInfo.shuffleId, partitionInfo.shuffleMergeId,
- partitionInfo.reduceId);
+ logger.trace("{} onComplete invoked", partitionInfo);
// Initially when this request got to the server, the shuffle merge finalize request
// was not received yet or this was the latest stage attempt (or latest shuffleMergeId)
// generating shuffle output for the shuffle ID. By the time we finish reading this
@@ -936,9 +928,7 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager {
synchronized (partitionInfo) {
if (!isStaleOrTooLate(appShuffleInfo.shuffles.get(partitionInfo.shuffleId),
partitionInfo.shuffleMergeId, partitionInfo.reduceId)) {
- logger.debug("{} shuffleId {} shuffleMergeId {} reduceId {}"
- + " encountered failure", partitionInfo.appId, partitionInfo.shuffleId,
- partitionInfo.shuffleMergeId, partitionInfo.reduceId);
+ logger.debug("{} encountered failure", partitionInfo);
partitionInfo.setCurrentMapIndex(-1);
}
}
@@ -1032,9 +1022,7 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager {
}
public void setDataFilePos(long dataFilePos) {
- logger.trace("{} shuffleId {} shuffleMergeId {} reduceId {} current pos {}"
- + " update pos {}", appId, shuffleId, shuffleMergeId, reduceId, this.dataFilePos,
- dataFilePos);
+ logger.trace("{} current pos {} update pos {}", this, this.dataFilePos, dataFilePos);
this.dataFilePos = dataFilePos;
}
@@ -1043,9 +1031,7 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager {
}
void setCurrentMapIndex(int mapIndex) {
- logger.trace("{} shuffleId {} shuffleMergeId {} reduceId {} updated mapIndex {}"
- + " current mapIndex {}", appId, shuffleId, shuffleMergeId, reduceId,
- currentMapIndex, mapIndex);
+ logger.trace("{} mapIndex {} current mapIndex {}", this, currentMapIndex, mapIndex);
this.currentMapIndex = mapIndex;
}
@@ -1054,8 +1040,7 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager {
}
void blockMerged(int mapIndex) {
- logger.debug("{} shuffleId {} shuffleMergeId {} reduceId {} updated merging mapIndex {}",
- appId, shuffleId, shuffleMergeId, reduceId, mapIndex);
+ logger.debug("{} updated merging mapIndex {}", this, mapIndex);
mapTracker.add(mapIndex);
chunkTracker.add(mapIndex);
lastMergedMapIndex = mapIndex;
@@ -1073,9 +1058,8 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager {
*/
void updateChunkInfo(long chunkOffset, int mapIndex) throws IOException {
try {
- logger.trace("{} shuffleId {} shuffleMergeId {} reduceId {} index current {}"
- + " updated {}", appId, shuffleId, shuffleMergeId, reduceId,
- this.lastChunkOffset, chunkOffset);
+ logger.trace("{} index current {} updated {}", this, this.lastChunkOffset,
+ chunkOffset);
if (indexMetaUpdateFailed) {
indexFile.getChannel().position(indexFile.getPos());
}
@@ -1103,8 +1087,7 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager {
return;
}
chunkTracker.add(mapIndex);
- logger.trace("{} shuffleId {} shuffleMergeId {} reduceId {} mapIndex {}"
- + " write chunk to meta file", appId, shuffleId, shuffleMergeId, reduceId, mapIndex);
+ logger.trace("{} mapIndex {} write chunk to meta file", this, mapIndex);
if (indexMetaUpdateFailed) {
metaFile.getChannel().position(metaFile.getPos());
}
@@ -1170,6 +1153,12 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager {
}
@Override
+ public String toString() {
+ return String.format("Application %s shuffleId %s shuffleMergeId %s reduceId %s",
+ appId, shuffleId, shuffleMergeId, reduceId);
+ }
+
+ @Override
protected void finalize() throws Throwable {
closeAllFilesAndDeleteIfNeeded(false);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org