You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@uniffle.apache.org by GitBox <gi...@apache.org> on 2023/01/04 08:50:18 UTC

[GitHub] [incubator-uniffle] jerqi commented on a diff in pull request #383: [ISSUE-380] Refactor the flush process to fix fallback fail

jerqi commented on code in PR #383:
URL: https://github.com/apache/incubator-uniffle/pull/383#discussion_r1061252293


##########
server/src/main/java/org/apache/uniffle/server/ShuffleFlushManager.java:
##########
@@ -142,87 +143,96 @@ public void addToFlushQueue(ShuffleDataFlushEvent event) {
     }
   }
 
-  private void flushToFile(ShuffleDataFlushEvent event) {
-
-    Storage storage = storageManager.selectStorage(event);
-    if (storage != null && !storage.canWrite()) {
-      addPendingEvents(event);
-      return;
-    }
-
+  private void flushToFileImpl(ShuffleDataFlushEvent event) {
     long start = System.currentTimeMillis();
-    List<ShufflePartitionedBlock> blocks = event.getShuffleBlocks();
     boolean writeSuccess = false;
-    try {
-      // storage info maybe null if the application cache was cleared already
-      if (storage != null) {
-        if (blocks == null || blocks.isEmpty()) {
-          LOG.info("There is no block to be flushed: " + event);
-        } else if (!event.isValid()) {
-          //  avoid printing error log
+
+    while (true) {
+      try {
+        if (!event.isValid()) {
           writeSuccess = true;
           LOG.warn("AppId {} was removed already, event {} should be dropped", event.getAppId(), event);
-        } else {
-          String user = StringUtils.defaultString(
-              shuffleServer.getShuffleTaskManager().getUserByAppId(event.getAppId()),
-              StringUtils.EMPTY
-          );
-          CreateShuffleWriteHandlerRequest request = new CreateShuffleWriteHandlerRequest(
-                  storageType,
-                  event.getAppId(),
-                  event.getShuffleId(),
-                  event.getStartPartition(),
-                  event.getEndPartition(),
-                  storageBasePaths.toArray(new String[storageBasePaths.size()]),
-                  shuffleServerId,
-                  hadoopConf,
-                  storageDataReplica,
-                  user);
-          ShuffleWriteHandler handler = storage.getOrCreateWriteHandler(request);
-          do {
-            if (event.getRetryTimes() > retryMax) {
-              LOG.error("Failed to write data for " + event + " in " + retryMax + " times, shuffle data will be lost");
-              ShuffleServerMetrics.incStorageFailedCounter(storage.getStorageHost());
-              break;
-            }
-            if (!event.isValid()) {
-              LOG.warn("AppId {} was removed already, event {} should be dropped, may leak one handler",
-                  event.getAppId(), event);
-              //  avoid printing error log
-              writeSuccess = true;
-              break;
-            }
+          break;
+        }
+
+        if (event.getRetryTimes() > retryMax) {
+          LOG.error("Failed to write data for " + event + " in " + retryMax + " times, shuffle data will be lost");
+          ShuffleServerMetrics.incStorageFailedCounter(event.getUnderStorage().getStorageHost());
+          break;
+        }
 
-            writeSuccess = storageManager.write(storage, handler, event);
+        List<ShufflePartitionedBlock> blocks = event.getShuffleBlocks();
+        if (blocks == null || blocks.isEmpty()) {
+          LOG.info("There is no block to be flushed: " + event);
+          break;
+        }
 
-            if (writeSuccess) {
-              updateCommittedBlockIds(event.getAppId(), event.getShuffleId(), blocks);
-              ShuffleServerMetrics.incStorageSuccessCounter(storage.getStorageHost());
+        Storage storage = storageManager.selectStorage(event);
+        if (storage == null) {
+          break;
+        }
+
+        if (!storage.canWrite()) {
+          if (storageManager instanceof MultiStorageManager) {

Review Comment:
   Is there any progress on this optimization?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@uniffle.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@uniffle.apache.org
For additional commands, e-mail: issues-help@uniffle.apache.org