You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uniffle.apache.org by ro...@apache.org on 2023/10/08 02:10:51 UTC
[incubator-uniffle] branch master updated: [#1201] improvement: only invoking LOG.debug when LOG.isDebugEnabled() is true (#1217)
This is an automated email from the ASF dual-hosted git repository.
roryqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new 05aba7068 [#1201] improvement: only invoking LOG.debug when LOG.isDebugEnabled() is true (#1217)
05aba7068 is described below
commit 05aba7068e05b97280f8e0a308ab8d6ef20dcbdc
Author: o'Laoxu <x_...@yeah.net>
AuthorDate: Sun Oct 8 10:10:44 2023 +0800
[#1201] improvement: only invoking LOG.debug when LOG.isDebugEnabled() is true (#1217)
### What changes were proposed in this pull request?
Guard LOG.debug calls so that they are only invoked when LOG.isDebugEnabled() is true.
### Why are the changes needed?
Fix: #1201
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
No.
---
.../org/apache/uniffle/client/RestClientImpl.java | 8 +++-
.../org/apache/hadoop/mapred/SortWriteBuffer.java | 16 ++++---
.../hadoop/mapred/SortWriteBufferManager.java | 8 +++-
.../mapreduce/task/reduce/RssEventFetcher.java | 4 +-
.../task/reduce/RssRemoteMergeManagerImpl.java | 36 +++++++++-------
.../spark/shuffle/writer/WriteBufferManager.java | 50 ++++++++++++----------
.../java/org/apache/tez/common/TezClassLoader.java | 12 +++---
.../org/apache/tez/dag/app/RssDAGAppMaster.java | 4 +-
.../common/sort/buffer/WriteBufferManager.java | 4 +-
.../client/impl/ShuffleWriteClientImpl.java | 4 +-
.../filesystem/HadoopFilesystemProvider.java | 4 +-
.../netty/client/TransportClientFactory.java | 4 +-
.../netty/handle/TransportRequestHandler.java | 8 +++-
.../coordinator/CoordinatorGrpcService.java | 12 ++++--
.../access/checker/AccessCandidatesChecker.java | 12 ++++--
.../client/impl/grpc/ShuffleServerGrpcClient.java | 36 +++++++++-------
.../impl/grpc/ShuffleServerGrpcNettyClient.java | 22 +++++-----
.../apache/uniffle/server/ShuffleFlushManager.java | 6 ++-
.../uniffle/server/ShuffleServerGrpcService.java | 44 ++++++++++---------
.../server/buffer/ShuffleBufferManager.java | 22 +++++-----
.../server/netty/ShuffleServerNettyHandler.java | 26 +++++------
.../uniffle/storage/common/LocalStorageMeta.java | 4 +-
.../handler/impl/HadoopShuffleWriteHandler.java | 22 ++++++----
.../handler/impl/LocalFileServerReadHandler.java | 22 +++++-----
.../handler/impl/LocalFileWriteHandler.java | 16 ++++---
25 files changed, 242 insertions(+), 164 deletions(-)
diff --git a/cli/src/main/java/org/apache/uniffle/client/RestClientImpl.java b/cli/src/main/java/org/apache/uniffle/client/RestClientImpl.java
index 81f17ad25..46ae30766 100644
--- a/cli/src/main/java/org/apache/uniffle/client/RestClientImpl.java
+++ b/cli/src/main/java/org/apache/uniffle/client/RestClientImpl.java
@@ -97,7 +97,9 @@ public class RestClientImpl implements RestClient {
}
HttpUriRequest httpRequest = requestBuilder.setUri(uri).build();
- LOG.debug("Executing {} request: {}", httpRequest.getMethod(), uri);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Executing {} request: {}", httpRequest.getMethod(), uri);
+ }
ResponseHandler<String> responseHandler =
resp -> {
@@ -112,7 +114,9 @@ public class RestClientImpl implements RestClient {
};
response = httpclient.execute(httpRequest, responseHandler);
- LOG.debug("Response: {}", response);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Response: {}", response);
+ }
} catch (ConnectException | ConnectTimeoutException | NoHttpResponseException e) {
throw new UniffleRestException("Api request failed for " + uri.toString(), e);
} catch (UniffleRestException rethrow) {
diff --git a/client-mr/core/src/main/java/org/apache/hadoop/mapred/SortWriteBuffer.java b/client-mr/core/src/main/java/org/apache/hadoop/mapred/SortWriteBuffer.java
index 4425dc790..d6fc3bbeb 100644
--- a/client-mr/core/src/main/java/org/apache/hadoop/mapred/SortWriteBuffer.java
+++ b/client-mr/core/src/main/java/org/apache/hadoop/mapred/SortWriteBuffer.java
@@ -140,13 +140,15 @@ public class SortWriteBuffer<K, V> extends OutputStream {
private boolean compact(int lastIndex, int lastOffset, int dataLength) {
if (lastIndex != currentIndex) {
- LOG.debug(
- "compact lastIndex {}, currentIndex {}, lastOffset {} currentOffset {} dataLength {}",
- lastIndex,
- currentIndex,
- lastOffset,
- currentOffset,
- dataLength);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(
+ "compact lastIndex {}, currentIndex {}, lastOffset {} currentOffset {} dataLength {}",
+ lastIndex,
+ currentIndex,
+ lastOffset,
+ currentOffset,
+ dataLength);
+ }
WrappedBuffer buffer = new WrappedBuffer(lastOffset + dataLength);
// copy data
int offset = 0;
diff --git a/client-mr/core/src/main/java/org/apache/hadoop/mapred/SortWriteBufferManager.java b/client-mr/core/src/main/java/org/apache/hadoop/mapred/SortWriteBufferManager.java
index 46e94f23c..afa80df75 100644
--- a/client-mr/core/src/main/java/org/apache/hadoop/mapred/SortWriteBufferManager.java
+++ b/client-mr/core/src/main/java/org/apache/hadoop/mapred/SortWriteBufferManager.java
@@ -181,7 +181,9 @@ public class SortWriteBufferManager<K, V> {
if (length > maxMemSize) {
throw new RssException("record is too big");
}
- LOG.debug("memoryUsedSize {} increase {}", memoryUsedSize, length);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("memoryUsedSize {} increase {}", memoryUsedSize, length);
+ }
memoryUsedSize.addAndGet(length);
if (buffer.getDataLength() > maxBufferSize) {
if (waitSendBuffers.remove(buffer)) {
@@ -255,7 +257,9 @@ public class SortWriteBufferManager<K, V> {
} finally {
try {
memoryLock.lock();
- LOG.debug("memoryUsedSize {} decrease {}", memoryUsedSize, size);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("memoryUsedSize {} decrease {}", memoryUsedSize, size);
+ }
memoryUsedSize.addAndGet(-size);
inSendListBytes.addAndGet(-size);
full.signalAll();
diff --git a/client-mr/core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/RssEventFetcher.java b/client-mr/core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/RssEventFetcher.java
index 83987c810..397d45fb2 100644
--- a/client-mr/core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/RssEventFetcher.java
+++ b/client-mr/core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/RssEventFetcher.java
@@ -159,7 +159,9 @@ public class RssEventFetcher<K, V> {
maxEventsToFetch,
(org.apache.hadoop.mapred.TaskAttemptID) reduce);
events = update.getMapTaskCompletionEvents();
- LOG.debug("Got " + events.length + " map completion events from " + fromEventIdx);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Got " + events.length + " map completion events from " + fromEventIdx);
+ }
assert !update.shouldReset() : "Unexpected legacy state";
diff --git a/client-mr/core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/RssRemoteMergeManagerImpl.java b/client-mr/core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/RssRemoteMergeManagerImpl.java
index b1770007a..c9f0d45f0 100644
--- a/client-mr/core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/RssRemoteMergeManagerImpl.java
+++ b/client-mr/core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/RssRemoteMergeManagerImpl.java
@@ -207,30 +207,34 @@ public class RssRemoteMergeManagerImpl<K, V> extends MergeManagerImpl<K, V> {
throws IOException {
// we disable OnDisk MapOutput to avoid merging disk immediate data
if (usedMemory > memoryLimit) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(
+ mapId
+ + ": Stalling shuffle since usedMemory ("
+ + usedMemory
+ + ") is greater than memoryLimit ("
+ + memoryLimit
+ + ")."
+ + " CommitMemory is ("
+ + commitMemory
+ + ")");
+ }
+ return null;
+ }
+
+ // Allow the in-memory shuffle to progress
+ if (LOG.isDebugEnabled()) {
LOG.debug(
mapId
- + ": Stalling shuffle since usedMemory ("
+ + ": Proceeding with shuffle since usedMemory ("
+ usedMemory
- + ") is greater than memoryLimit ("
+ + ") is lesser than memoryLimit ("
+ memoryLimit
+ ")."
- + " CommitMemory is ("
+ + "CommitMemory is ("
+ commitMemory
+ ")");
- return null;
}
-
- // Allow the in-memory shuffle to progress
- LOG.debug(
- mapId
- + ": Proceeding with shuffle since usedMemory ("
- + usedMemory
- + ") is lesser than memoryLimit ("
- + memoryLimit
- + ")."
- + "CommitMemory is ("
- + commitMemory
- + ")");
usedMemory += requestedSize;
// use this rss merger as the callback
return new InMemoryMapOutput<K, V>(jobConf, mapId, this, (int) requestedSize, codec, true);
diff --git a/client-spark/common/src/main/java/org/apache/spark/shuffle/writer/WriteBufferManager.java b/client-spark/common/src/main/java/org/apache/spark/shuffle/writer/WriteBufferManager.java
index 27fb40a21..eaa3ff916 100644
--- a/client-spark/common/src/main/java/org/apache/spark/shuffle/writer/WriteBufferManager.java
+++ b/client-spark/common/src/main/java/org/apache/spark/shuffle/writer/WriteBufferManager.java
@@ -216,16 +216,18 @@ public class WriteBufferManager extends MemoryConsumer {
sentBlocks.add(createShuffleBlock(partitionId, wb));
copyTime += wb.getCopyTime();
buffers.remove(partitionId);
- LOG.debug(
- "Single buffer is full for shuffleId["
- + shuffleId
- + "] partition["
- + partitionId
- + "] with memoryUsed["
- + wb.getMemoryUsed()
- + "], dataLength["
- + wb.getDataLength()
- + "]");
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(
+ "Single buffer is full for shuffleId["
+ + shuffleId
+ + "] partition["
+ + partitionId
+ + "] with memoryUsed["
+ + wb.getMemoryUsed()
+ + "], dataLength["
+ + wb.getDataLength()
+ + "]");
+ }
}
} else {
// The true of hasRequested means the former partitioned buffer has been flushed, that is
@@ -401,12 +403,14 @@ public class WriteBufferManager extends MemoryConsumer {
shuffleBlockInfosPerEvent.add(sbi);
// split shuffle data according to the size
if (totalSize > sendSizeLimit) {
- LOG.debug(
- "Build event with "
- + shuffleBlockInfosPerEvent.size()
- + " blocks and "
- + totalSize
- + " bytes");
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(
+ "Build event with "
+ + shuffleBlockInfosPerEvent.size()
+ + " blocks and "
+ + totalSize
+ + " bytes");
+ }
// Use final temporary variables for closures
final long memoryUsedTemp = memoryUsed;
final List<ShuffleBlockInfo> shuffleBlocksTemp = shuffleBlockInfosPerEvent;
@@ -424,12 +428,14 @@ public class WriteBufferManager extends MemoryConsumer {
}
}
if (!shuffleBlockInfosPerEvent.isEmpty()) {
- LOG.debug(
- "Build event with "
- + shuffleBlockInfosPerEvent.size()
- + " blocks and "
- + totalSize
- + " bytes");
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(
+ "Build event with "
+ + shuffleBlockInfosPerEvent.size()
+ + " blocks and "
+ + totalSize
+ + " bytes");
+ }
// Use final temporary variables for closures
final long memoryUsedTemp = memoryUsed;
final List<ShuffleBlockInfo> shuffleBlocksTemp = shuffleBlockInfosPerEvent;
diff --git a/client-tez/src/main/java/org/apache/tez/common/TezClassLoader.java b/client-tez/src/main/java/org/apache/tez/common/TezClassLoader.java
index 1b2375d87..bc6c2c6c4 100644
--- a/client-tez/src/main/java/org/apache/tez/common/TezClassLoader.java
+++ b/client-tez/src/main/java/org/apache/tez/common/TezClassLoader.java
@@ -70,11 +70,13 @@ public class TezClassLoader extends URLClassLoader {
}
public static void setupTezClassLoader() {
- LOG.debug(
- "Setting up TezClassLoader: thread: {}, current thread classloader: {} system classloader: {}",
- Thread.currentThread().getId(),
- Thread.currentThread().getContextClassLoader(),
- ClassLoader.getSystemClassLoader());
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(
+ "Setting up TezClassLoader: thread: {}, current thread classloader: {} system classloader: {}",
+ Thread.currentThread().getId(),
+ Thread.currentThread().getContextClassLoader(),
+ ClassLoader.getSystemClassLoader());
+ }
Thread.currentThread().setContextClassLoader(INSTANCE);
}
diff --git a/client-tez/src/main/java/org/apache/tez/dag/app/RssDAGAppMaster.java b/client-tez/src/main/java/org/apache/tez/dag/app/RssDAGAppMaster.java
index 742fe6616..55ee72069 100644
--- a/client-tez/src/main/java/org/apache/tez/dag/app/RssDAGAppMaster.java
+++ b/client-tez/src/main/java/org/apache/tez/dag/app/RssDAGAppMaster.java
@@ -204,7 +204,9 @@ public class RssDAGAppMaster extends DAGAppMaster {
() -> {
try {
appMaster.getShuffleWriteClient().sendAppHeartbeat(strAppAttemptId, heartbeatTimeout);
- LOG.debug("Finish send heartbeat to coordinator and servers");
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Finish send heartbeat to coordinator and servers");
+ }
} catch (Exception e) {
LOG.warn("Fail to send heartbeat to coordinator and servers", e);
}
diff --git a/client-tez/src/main/java/org/apache/tez/runtime/library/common/sort/buffer/WriteBufferManager.java b/client-tez/src/main/java/org/apache/tez/runtime/library/common/sort/buffer/WriteBufferManager.java
index b88beddaf..ad0686137 100644
--- a/client-tez/src/main/java/org/apache/tez/runtime/library/common/sort/buffer/WriteBufferManager.java
+++ b/client-tez/src/main/java/org/apache/tez/runtime/library/common/sort/buffer/WriteBufferManager.java
@@ -270,7 +270,9 @@ public class WriteBufferManager<K, V> {
} finally {
try {
memoryLock.lock();
- LOG.debug("memoryUsedSize {} decrease {}", memoryUsedSize, size);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("memoryUsedSize {} decrease {}", memoryUsedSize, size);
+ }
memoryUsedSize.addAndGet(-size);
inSendListBytes.addAndGet(-size);
full.signalAll();
diff --git a/client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java b/client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java
index d261b35b5..71ee284ab 100644
--- a/client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java
+++ b/client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java
@@ -228,7 +228,9 @@ public class ShuffleWriteClientImpl implements ShuffleWriteClient {
if (defectiveServers != null) {
defectiveServers.remove(ssi);
}
- LOG.debug("{} successfully.", logMsg);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("{} successfully.", logMsg);
+ }
} else {
if (defectiveServers != null) {
defectiveServers.add(ssi);
diff --git a/common/src/main/java/org/apache/uniffle/common/filesystem/HadoopFilesystemProvider.java b/common/src/main/java/org/apache/uniffle/common/filesystem/HadoopFilesystemProvider.java
index 7d9618cf8..963f36b2f 100644
--- a/common/src/main/java/org/apache/uniffle/common/filesystem/HadoopFilesystemProvider.java
+++ b/common/src/main/java/org/apache/uniffle/common/filesystem/HadoopFilesystemProvider.java
@@ -60,7 +60,9 @@ public class HadoopFilesystemProvider {
}
if (fileSystem instanceof LocalFileSystem) {
- LOGGER.debug("{} is local file system", path);
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug("{} is local file system", path);
+ }
return ((LocalFileSystem) fileSystem).getRawFileSystem();
}
diff --git a/common/src/main/java/org/apache/uniffle/common/netty/client/TransportClientFactory.java b/common/src/main/java/org/apache/uniffle/common/netty/client/TransportClientFactory.java
index f6ff57b60..2ede795f4 100644
--- a/common/src/main/java/org/apache/uniffle/common/netty/client/TransportClientFactory.java
+++ b/common/src/main/java/org/apache/uniffle/common/netty/client/TransportClientFactory.java
@@ -215,7 +215,9 @@ public class TransportClientFactory implements Closeable {
TransportClient client = clientRef.get();
assert client != null : "Channel future completed successfully with null client";
- logger.debug("Connection to {} successful", address);
+ if (logger.isDebugEnabled()) {
+ logger.debug("Connection to {} successful", address);
+ }
return client;
}
diff --git a/common/src/main/java/org/apache/uniffle/common/netty/handle/TransportRequestHandler.java b/common/src/main/java/org/apache/uniffle/common/netty/handle/TransportRequestHandler.java
index dc32aa9e2..2a48d3a7f 100644
--- a/common/src/main/java/org/apache/uniffle/common/netty/handle/TransportRequestHandler.java
+++ b/common/src/main/java/org/apache/uniffle/common/netty/handle/TransportRequestHandler.java
@@ -45,12 +45,16 @@ public class TransportRequestHandler extends MessageHandler<RequestMessage> {
@Override
public void channelActive() {
- logger.debug("channelActive: {}", reverseClient.getSocketAddress());
+ if (logger.isDebugEnabled()) {
+ logger.debug("channelActive: {}", reverseClient.getSocketAddress());
+ }
}
@Override
public void channelInactive() {
- logger.debug("channelInactive: {}", reverseClient.getSocketAddress());
+ if (logger.isDebugEnabled()) {
+ logger.debug("channelInactive: {}", reverseClient.getSocketAddress());
+ }
}
@Override
diff --git a/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorGrpcService.java b/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorGrpcService.java
index f64f32304..b658c329b 100644
--- a/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorGrpcService.java
+++ b/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorGrpcService.java
@@ -175,7 +175,9 @@ public class CoordinatorGrpcService extends CoordinatorServerGrpc.CoordinatorSer
.setRetMsg("")
.setStatus(StatusCode.SUCCESS)
.build();
- LOG.debug("Got heartbeat from {}", serverNode);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Got heartbeat from {}", serverNode);
+ }
responseObserver.onNext(response);
responseObserver.onCompleted();
}
@@ -214,7 +216,9 @@ public class CoordinatorGrpcService extends CoordinatorServerGrpc.CoordinatorSer
AppHeartBeatRequest request, StreamObserver<AppHeartBeatResponse> responseObserver) {
String appId = request.getAppId();
coordinatorServer.getApplicationManager().refreshAppId(appId);
- LOG.debug("Got heartbeat from application: {}", appId);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Got heartbeat from application: {}", appId);
+ }
AppHeartBeatResponse response =
AppHeartBeatResponse.newBuilder().setRetMsg("").setStatus(StatusCode.SUCCESS).build();
@@ -235,7 +239,9 @@ public class CoordinatorGrpcService extends CoordinatorServerGrpc.CoordinatorSer
String appId = request.getAppId();
String user = request.getUser();
coordinatorServer.getApplicationManager().registerApplicationInfo(appId, user);
- LOG.debug("Got a registered application info: {}", appId);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Got a registered application info: {}", appId);
+ }
ApplicationInfoResponse response =
ApplicationInfoResponse.newBuilder().setRetMsg("").setStatus(StatusCode.SUCCESS).build();
diff --git a/coordinator/src/main/java/org/apache/uniffle/coordinator/access/checker/AccessCandidatesChecker.java b/coordinator/src/main/java/org/apache/uniffle/coordinator/access/checker/AccessCandidatesChecker.java
index 1a39c7a2d..9556c367c 100644
--- a/coordinator/src/main/java/org/apache/uniffle/coordinator/access/checker/AccessCandidatesChecker.java
+++ b/coordinator/src/main/java/org/apache/uniffle/coordinator/access/checker/AccessCandidatesChecker.java
@@ -81,7 +81,9 @@ public class AccessCandidatesChecker extends AbstractAccessChecker {
LOG.error(msg);
throw new RssException(msg);
}
- LOG.debug("Load candidates: {}", String.join(";", candidates.get()));
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Load candidates: {}", String.join(";", candidates.get()));
+ }
int updateIntervalS =
conf.getInteger(CoordinatorConf.COORDINATOR_ACCESS_CANDIDATES_UPDATE_INTERVAL_SEC);
@@ -96,7 +98,9 @@ public class AccessCandidatesChecker extends AbstractAccessChecker {
String accessId = accessInfo.getAccessId().trim();
if (!candidates.get().contains(accessId)) {
String msg = String.format("Denied by AccessCandidatesChecker, accessInfo[%s].", accessInfo);
- LOG.debug("Candidates is {}, {}", candidates.get(), msg);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Candidates is {}, {}", candidates.get(), msg);
+ }
CoordinatorMetrics.counterTotalCandidatesDeniedRequest.inc();
return new AccessCheckResult(false, msg);
}
@@ -119,7 +123,9 @@ public class AccessCandidatesChecker extends AbstractAccessChecker {
if (lastCandidatesUpdateMS.get() != lastModifiedMS) {
updateAccessCandidatesInternal();
lastCandidatesUpdateMS.set(lastModifiedMS);
- LOG.debug("Load candidates: {}", String.join(";", candidates.get()));
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Load candidates: {}", String.join(";", candidates.get()));
+ }
}
} else {
LOG.warn("Candidates file not found.");
diff --git a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java
index b7f976ead..fa9a0520b 100644
--- a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java
+++ b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java
@@ -258,12 +258,14 @@ public class ShuffleServerGrpcClient extends GrpcClient implements ShuffleServer
retry++;
}
if (rpcResponse.getStatus() == RssProtos.StatusCode.SUCCESS) {
- LOG.debug(
- "Require preAllocated size of {} from {}:{}, cost: {}(ms)",
- requireSize,
- host,
- port,
- System.currentTimeMillis() - start);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(
+ "Require preAllocated size of {} from {}:{}, cost: {}(ms)",
+ requireSize,
+ host,
+ port,
+ System.currentTimeMillis() - start);
+ }
result = rpcResponse.getRequireBufferId();
} else if (rpcResponse.getStatus() == RssProtos.StatusCode.NO_REGISTER) {
String msg =
@@ -420,16 +422,18 @@ public class ShuffleServerGrpcClient extends GrpcClient implements ShuffleServer
.setTimestamp(start)
.build();
SendShuffleDataResponse response = getBlockingStub().sendShuffleData(rpcRequest);
- LOG.debug(
- "Do sendShuffleData to {}:{} rpc cost:"
- + (System.currentTimeMillis() - start)
- + " ms for "
- + allocateSize
- + " bytes with "
- + finalBlockNum
- + " blocks",
- host,
- port);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(
+ "Do sendShuffleData to {}:{} rpc cost:"
+ + (System.currentTimeMillis() - start)
+ + " ms for "
+ + allocateSize
+ + " bytes with "
+ + finalBlockNum
+ + " blocks",
+ host,
+ port);
+ }
if (response.getStatus() != RssProtos.StatusCode.SUCCESS) {
String msg =
"Can't send shuffle data with "
diff --git a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcNettyClient.java b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcNettyClient.java
index 2fe15aae7..cb8e1e623 100644
--- a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcNettyClient.java
+++ b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcNettyClient.java
@@ -117,16 +117,18 @@ public class ShuffleServerGrpcNettyClient extends ShuffleServerGrpcClient {
long start = System.currentTimeMillis();
RpcResponse rpcResponse =
transportClient.sendRpcSync(sendShuffleDataRequest, RPC_TIMEOUT_DEFAULT_MS);
- LOG.debug(
- "Do sendShuffleData to {}:{} rpc cost:"
- + (System.currentTimeMillis() - start)
- + " ms for "
- + allocateSize
- + " bytes with "
- + finalBlockNum
- + " blocks",
- host,
- port);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(
+ "Do sendShuffleData to {}:{} rpc cost:"
+ + (System.currentTimeMillis() - start)
+ + " ms for "
+ + allocateSize
+ + " bytes with "
+ + finalBlockNum
+ + " blocks",
+ host,
+ port);
+ }
if (rpcResponse.getStatusCode() != StatusCode.SUCCESS) {
String msg =
"Can't send shuffle data with "
diff --git a/server/src/main/java/org/apache/uniffle/server/ShuffleFlushManager.java b/server/src/main/java/org/apache/uniffle/server/ShuffleFlushManager.java
index ba46dfab0..036ad69d0 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleFlushManager.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleFlushManager.java
@@ -102,8 +102,10 @@ public class ShuffleFlushManager {
if (shuffleServer != null) {
long duration = System.currentTimeMillis() - start;
if (writeSuccess) {
- LOG.debug(
- "Flush to file success in {} ms and release {} bytes", duration, event.getSize());
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(
+ "Flush to file success in {} ms and release {} bytes", duration, event.getSize());
+ }
} else {
ShuffleServerMetrics.counterTotalFailedWrittenEventNum.inc();
LOG.error(
diff --git a/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java b/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java
index 279180edb..7f0792b16 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java
@@ -287,18 +287,20 @@ public class ShuffleServerGrpcService extends ShuffleServerImplBase {
shuffleServer
.getGrpcMetrics()
.recordProcessTime(ShuffleServerGrpcMetrics.SEND_SHUFFLE_DATA_METHOD, costTime);
- LOG.debug(
- "Cache Shuffle Data for appId["
- + appId
- + "], shuffleId["
- + shuffleId
- + "], cost "
- + costTime
- + " ms with "
- + shufflePartitionedData.size()
- + " blocks and "
- + requireSize
- + " bytes");
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(
+ "Cache Shuffle Data for appId["
+ + appId
+ + "], shuffleId["
+ + shuffleId
+ + "], cost "
+ + costTime
+ + " ms with "
+ + shufflePartitionedData.size()
+ + " blocks and "
+ + requireSize
+ + " bytes");
+ }
} else {
reply =
SendShuffleDataResponse.newBuilder()
@@ -328,14 +330,16 @@ public class ShuffleServerGrpcService extends ShuffleServerImplBase {
throw new IllegalStateException("AppId " + appId + " was removed already");
}
commitCount = shuffleServer.getShuffleTaskManager().updateAndGetCommitCount(appId, shuffleId);
- LOG.debug(
- "Get commitShuffleTask request for appId["
- + appId
- + "], shuffleId["
- + shuffleId
- + "], currentCommitted["
- + commitCount
- + "]");
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(
+ "Get commitShuffleTask request for appId["
+ + appId
+ + "], shuffleId["
+ + shuffleId
+ + "], currentCommitted["
+ + commitCount
+ + "]");
+ }
} catch (Exception e) {
status = StatusCode.INTERNAL_ERROR;
msg = "Error happened when commit for appId[" + appId + "], shuffleId[" + shuffleId + "]";
diff --git a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
index 2076f6705..9f23f0fda 100644
--- a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
+++ b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
@@ -323,16 +323,18 @@ public class ShuffleBufferManager {
}
return true;
}
- LOG.debug(
- "Require memory failed with "
- + size
- + " bytes, usedMemory["
- + usedMemory.get()
- + "] include preAllocation["
- + preAllocatedSize.get()
- + "], inFlushSize["
- + inFlushSize.get()
- + "]");
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(
+ "Require memory failed with "
+ + size
+ + " bytes, usedMemory["
+ + usedMemory.get()
+ + "] include preAllocation["
+ + preAllocatedSize.get()
+ + "], inFlushSize["
+ + inFlushSize.get()
+ + "]");
+ }
return false;
}
diff --git a/server/src/main/java/org/apache/uniffle/server/netty/ShuffleServerNettyHandler.java b/server/src/main/java/org/apache/uniffle/server/netty/ShuffleServerNettyHandler.java
index 30a90623b..fd1c99e62 100644
--- a/server/src/main/java/org/apache/uniffle/server/netty/ShuffleServerNettyHandler.java
+++ b/server/src/main/java/org/apache/uniffle/server/netty/ShuffleServerNettyHandler.java
@@ -193,18 +193,20 @@ public class ShuffleServerNettyHandler implements BaseMessageHandler {
shuffleServer
.getNettyMetrics()
.recordProcessTime(SendShuffleDataRequest.class.getName(), costTime);
- LOG.debug(
- "Cache Shuffle Data for appId["
- + appId
- + "], shuffleId["
- + shuffleId
- + "], cost "
- + costTime
- + " ms with "
- + shufflePartitionedData.size()
- + " blocks and "
- + requireSize
- + " bytes");
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(
+ "Cache Shuffle Data for appId["
+ + appId
+ + "], shuffleId["
+ + shuffleId
+ + "], cost "
+ + costTime
+ + " ms with "
+ + shufflePartitionedData.size()
+ + " blocks and "
+ + requireSize
+ + " bytes");
+ }
} else {
rpcResponse =
new RpcResponse(req.getRequestId(), StatusCode.INTERNAL_ERROR, "No data in request");
diff --git a/storage/src/main/java/org/apache/uniffle/storage/common/LocalStorageMeta.java b/storage/src/main/java/org/apache/uniffle/storage/common/LocalStorageMeta.java
index c23e08a9a..056ca4f6b 100644
--- a/storage/src/main/java/org/apache/uniffle/storage/common/LocalStorageMeta.java
+++ b/storage/src/main/java/org/apache/uniffle/storage/common/LocalStorageMeta.java
@@ -114,7 +114,9 @@ public class LocalStorageMeta {
private ShuffleMeta getShuffleMeta(String shuffleKey) {
ShuffleMeta shuffleMeta = shuffleMetaMap.get(shuffleKey);
if (shuffleMeta == null) {
- LOG.debug("Shuffle {} metadata has been removed!", shuffleKey);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Shuffle {} metadata has been removed!", shuffleKey);
+ }
}
return shuffleMeta;
}
diff --git a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HadoopShuffleWriteHandler.java b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HadoopShuffleWriteHandler.java
index 0d254d2b9..eb81c4089 100644
--- a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HadoopShuffleWriteHandler.java
+++ b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HadoopShuffleWriteHandler.java
@@ -138,10 +138,12 @@ public class HadoopShuffleWriteHandler implements ShuffleWriteHandler {
block.getTaskAttemptId());
indexWriter.writeIndex(segment);
}
- LOG.debug(
- "Write handler inside cost {} ms for {}",
- (System.currentTimeMillis() - ss),
- fileNamePrefix);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(
+ "Write handler inside cost {} ms for {}",
+ (System.currentTimeMillis() - ss),
+ fileNamePrefix);
+ }
} catch (IOException e) {
LOG.warn(
"Write failed with "
@@ -157,11 +159,13 @@ public class HadoopShuffleWriteHandler implements ShuffleWriteHandler {
} finally {
writeLock.unlock();
}
- LOG.debug(
- "Write handler outside write {} blocks cost {} ms for {}",
- shuffleBlocks.size(),
- (System.currentTimeMillis() - start),
- fileNamePrefix);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(
+ "Write handler outside write {} blocks cost {} ms for {}",
+ shuffleBlocks.size(),
+ (System.currentTimeMillis() - start),
+ fileNamePrefix);
+ }
}
@VisibleForTesting
diff --git a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/LocalFileServerReadHandler.java b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/LocalFileServerReadHandler.java
index b211fb4ec..f688a18bc 100644
--- a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/LocalFileServerReadHandler.java
+++ b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/LocalFileServerReadHandler.java
@@ -65,16 +65,18 @@ public class LocalFileServerReadHandler implements ServerReadHandler {
long start = System.currentTimeMillis();
prepareFilePath(appId, shuffleId, partitionId, partitionNumPerRange, partitionNum, path);
- LOG.debug(
- "Prepare for appId["
- + appId
- + "], shuffleId["
- + shuffleId
- + "], partitionId["
- + partitionId
- + "] cost "
- + (System.currentTimeMillis() - start)
- + " ms");
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(
+ "Prepare for appId["
+ + appId
+ + "], shuffleId["
+ + shuffleId
+ + "], partitionId["
+ + partitionId
+ + "] cost "
+ + (System.currentTimeMillis() - start)
+ + " ms");
+ }
}
private void prepareFilePath(
diff --git a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/LocalFileWriteHandler.java b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/LocalFileWriteHandler.java
index aaab64707..4b06e5aa9 100644
--- a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/LocalFileWriteHandler.java
+++ b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/LocalFileWriteHandler.java
@@ -115,15 +115,19 @@ public class LocalFileWriteHandler implements ShuffleWriteHandler {
block.getTaskAttemptId());
indexWriter.writeIndex(segment);
}
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(
+ "Write handler write {} blocks cost {} ms without file open close",
+ shuffleBlocks.size(),
+ (System.currentTimeMillis() - startTime));
+ }
+ }
+ if (LOG.isDebugEnabled()) {
LOG.debug(
- "Write handler write {} blocks cost {} ms without file open close",
+ "Write handler write {} blocks cost {} ms with file open close",
shuffleBlocks.size(),
- (System.currentTimeMillis() - startTime));
+ (System.currentTimeMillis() - accessTime));
}
- LOG.debug(
- "Write handler write {} blocks cost {} ms with file open close",
- shuffleBlocks.size(),
- (System.currentTimeMillis() - accessTime));
}
private LocalFileWriter createWriter(String fileName) throws IOException, IllegalStateException {