You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@uniffle.apache.org by GitBox <gi...@apache.org> on 2022/08/05 11:26:04 UTC

[GitHub] [incubator-uniffle] frankliee commented on a diff in pull request #129: [Improvement] try read from backup shuffle servers when fetched data is inconsistent

frankliee commented on code in PR #129:
URL: https://github.com/apache/incubator-uniffle/pull/129#discussion_r938719586


##########
storage/src/main/java/org/apache/uniffle/storage/handler/impl/MemoryQuorumClientReadHandler.java:
##########
@@ -20,54 +20,58 @@
 import java.util.List;
 
 import com.google.common.collect.Lists;
+import org.roaringbitmap.longlong.Roaring64NavigableMap;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.uniffle.client.api.ShuffleServerClient;
 import org.apache.uniffle.common.ShuffleDataResult;
-import org.apache.uniffle.common.exception.RssException;
 import org.apache.uniffle.common.util.Constants;
+import org.apache.uniffle.common.util.RetryUtils;
 
 public class MemoryQuorumClientReadHandler extends AbstractClientReadHandler {
 
   private static final Logger LOG = LoggerFactory.getLogger(MemoryQuorumClientReadHandler.class);
   private long lastBlockId = Constants.INVALID_BLOCK_ID;
   private List<MemoryClientReadHandler> handlers = Lists.newLinkedList();
+  private int currentHandlerIdx = 0;
 
   public MemoryQuorumClientReadHandler(
       String appId,
       int shuffleId,
       int partitionId,
       int readBufferSize,
-      List<ShuffleServerClient> shuffleServerClients) {
+      List<ShuffleServerClient> shuffleServerClients, Roaring64NavigableMap processBlockIds) {
     this.appId = appId;
     this.shuffleId = shuffleId;
     this.partitionId = partitionId;
     this.readBufferSize = readBufferSize;
     shuffleServerClients.forEach(client ->
         handlers.add(new MemoryClientReadHandler(
-            appId, shuffleId, partitionId, readBufferSize, client))
+            appId, shuffleId, partitionId, readBufferSize, client, processBlockIds))
     );
   }
 
   @Override
   public ShuffleDataResult readShuffleData() {
-    boolean readSuccessful = false;
     ShuffleDataResult result = null;
-
-    for (MemoryClientReadHandler handler: handlers) {
+    while (currentHandlerIdx < handlers.size()) {
       try {
-        result = handler.readShuffleData();
-        readSuccessful = true;
-        break;
-      } catch (Exception e) {
-        LOG.warn("Failed to read a replica due to ", e);
+        result = RetryUtils.retry(() -> {
+          MemoryClientReadHandler handler = handlers.get(currentHandlerIdx);
+          ShuffleDataResult shuffleDataResult = handler.readShuffleData();
+          return shuffleDataResult;
+        }, 1000, 3);
+      } catch (Throwable e) {
+        LOG.warn("Failed to read a replica for appId[" + appId + "], shuffleId["
+            + shuffleId + "], partitionId[" + partitionId + "] due to ", e);
       }
-    }
 
-    if (!readSuccessful) {
-      throw new RssException("Failed to read in memory shuffle data for appId[" + appId
-          + "], shuffleId[" + shuffleId + "], partitionId[" + partitionId + "]");
+      if (result == null || result.isEmpty()) {
+        currentHandlerIdx++;
+        continue;
+      }
+      return result;

Review Comment:
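   Why remove the `throw RssException`?
   What about the case when none of the servers are OK? The removed `throw` was what reported that every replica had failed. With this change, each failure is only logged in the `catch` block before moving on to the next handler, so a total read failure would no longer be surfaced to the caller.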



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@uniffle.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@uniffle.apache.org
For additional commands, e-mail: issues-help@uniffle.apache.org