Posted to issues@uniffle.apache.org by GitBox <gi...@apache.org> on 2022/08/05 10:39:03 UTC

[GitHub] [incubator-uniffle] xianjingfeng opened a new pull request, #129: try read from all shuffle servers when read shuffle data

xianjingfeng opened a new pull request, #129:
URL: https://github.com/apache/incubator-uniffle/pull/129

   
   ### What changes were proposed in this pull request?
   1. On the client side, try reading from all shuffle servers when reading shuffle data. When reading shuffle data from memory, pass `processBlockIds` to the server side so that blocks already processed (duplicates) are excluded.
   2. On the server side, if the index file is not found, return an empty result.
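A minimal sketch of the two changes described above, under stated assumptions: `FakeShuffleServer` and `QuorumReadSketch` are hypothetical names, not Uniffle classes, and a plain `Set<Long>` stands in for the `Roaring64NavigableMap` of processed block IDs used in the actual diff. The client asks each replica in turn and passes the IDs it has already processed. A server with no index file returns an empty result instead of failing.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical stand-in for one shuffle server holding a single partition.
class FakeShuffleServer {
  // null models "index file not found" on this server
  private final Set<Long> blockIds;

  FakeShuffleServer(Set<Long> blockIds) {
    this.blockIds = blockIds;
  }

  // Server side: return only blocks the client has not processed yet, and an
  // empty result (not an error) when there is no index file.
  List<Long> read(Set<Long> processBlockIds) {
    List<Long> result = new ArrayList<>();
    if (blockIds == null) {
      return result;
    }
    for (Long id : blockIds) {
      if (!processBlockIds.contains(id)) {
        result.add(id);
      }
    }
    return result;
  }
}

class QuorumReadSketch {
  // Client side: read from every replica in turn, sharing one set of
  // processed block IDs so duplicates are skipped.
  static List<Long> readAll(List<FakeShuffleServer> servers) {
    Set<Long> processBlockIds = new HashSet<>();
    List<Long> blocksRead = new ArrayList<>();
    for (FakeShuffleServer server : servers) {
      // read() returns a fresh list, so updating processBlockIds here is safe
      for (Long id : server.read(processBlockIds)) {
        processBlockIds.add(id);
        blocksRead.add(id);
      }
    }
    return blocksRead;
  }
}
```

Every replica sees the same processed set. A later server therefore contributes only the blocks that the earlier servers were missing.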
   
   ### Why are the changes needed?
   When the data on the first shuffle server is damaged, the application fails (see #124).
   
   ### Does this PR introduce _any_ user-facing change?
   No
   
   ### How was this patch tested?
   Tests have already been added.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@uniffle.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@uniffle.apache.org
For additional commands, e-mail: issues-help@uniffle.apache.org


[GitHub] [incubator-uniffle] frankliee commented on a diff in pull request #129: [Improvement] try read from backup shuffle servers when fetched data is inconsistent

Posted by GitBox <gi...@apache.org>.
frankliee commented on code in PR #129:
URL: https://github.com/apache/incubator-uniffle/pull/129#discussion_r938911308


##########
storage/src/main/java/org/apache/uniffle/storage/handler/impl/MemoryQuorumClientReadHandler.java:
##########
@@ -20,54 +20,58 @@
 import java.util.List;
 
 import com.google.common.collect.Lists;
+import org.roaringbitmap.longlong.Roaring64NavigableMap;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.uniffle.client.api.ShuffleServerClient;
 import org.apache.uniffle.common.ShuffleDataResult;
-import org.apache.uniffle.common.exception.RssException;
 import org.apache.uniffle.common.util.Constants;
+import org.apache.uniffle.common.util.RetryUtils;
 
 public class MemoryQuorumClientReadHandler extends AbstractClientReadHandler {
 
   private static final Logger LOG = LoggerFactory.getLogger(MemoryQuorumClientReadHandler.class);
   private long lastBlockId = Constants.INVALID_BLOCK_ID;
   private List<MemoryClientReadHandler> handlers = Lists.newLinkedList();
+  private int currentHandlerIdx = 0;
 
   public MemoryQuorumClientReadHandler(
       String appId,
       int shuffleId,
       int partitionId,
       int readBufferSize,
-      List<ShuffleServerClient> shuffleServerClients) {
+      List<ShuffleServerClient> shuffleServerClients, Roaring64NavigableMap processBlockIds) {
     this.appId = appId;
     this.shuffleId = shuffleId;
     this.partitionId = partitionId;
     this.readBufferSize = readBufferSize;
     shuffleServerClients.forEach(client ->
         handlers.add(new MemoryClientReadHandler(
-            appId, shuffleId, partitionId, readBufferSize, client))
+            appId, shuffleId, partitionId, readBufferSize, client, processBlockIds))
     );
   }
 
   @Override
   public ShuffleDataResult readShuffleData() {
-    boolean readSuccessful = false;
     ShuffleDataResult result = null;
-
-    for (MemoryClientReadHandler handler: handlers) {
+    while (currentHandlerIdx < handlers.size()) {
       try {
-        result = handler.readShuffleData();
-        readSuccessful = true;
-        break;
-      } catch (Exception e) {
-        LOG.warn("Failed to read a replica due to ", e);
+        result = RetryUtils.retry(() -> {
+          MemoryClientReadHandler handler = handlers.get(currentHandlerIdx);
+          ShuffleDataResult shuffleDataResult = handler.readShuffleData();
+          return shuffleDataResult;

Review Comment:
   I think using retry and timeout here is unnecessary, because the client already fetches from multiple servers (which is itself a form of retry), and each gRPC client has its own timeout.





[GitHub] [incubator-uniffle] frankliee commented on a diff in pull request #129: [Improvement] try read from backup shuffle servers when fetched data is inconsistent

Posted by GitBox <gi...@apache.org>.
frankliee commented on code in PR #129:
URL: https://github.com/apache/incubator-uniffle/pull/129#discussion_r939021730


##########
storage/src/main/java/org/apache/uniffle/storage/handler/impl/MemoryQuorumClientReadHandler.java:
##########

Review Comment:
   If this optimization is necessary, it should be placed where gRPC is first used.
   The QuorumClient should only deal with the interaction between servers.
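To illustrate the suggested layering, here is a sketch of a bounded retry around a single per-server call. With the retry at this layer, the quorum handler only decides which server to try next. `RpcRetry` is a hypothetical helper, not Uniffle's `RetryUtils`, whose signature does not appear in the diff. The fixed sleep between attempts is an arbitrary choice.

```java
import java.util.concurrent.Callable;

// Hypothetical helper: retries one RPC-style call a bounded number of times,
// sleeping a fixed interval between attempts, then rethrows the last failure.
final class RpcRetry {
  private RpcRetry() {}

  static <T> T call(Callable<T> rpc, int maxAttempts, long sleepMillis) throws Exception {
    if (maxAttempts < 1) {
      throw new IllegalArgumentException("maxAttempts must be >= 1");
    }
    Exception last = null;
    for (int attempt = 1; attempt <= maxAttempts; attempt++) {
      try {
        return rpc.call();
      } catch (Exception e) {
        last = e;
        if (attempt < maxAttempts) {
          Thread.sleep(sleepMillis); // no sleep after the final attempt
        }
      }
    }
    throw last;
  }
}
```

A per-server client would wrap each gRPC request in `RpcRetry.call(...)`. The quorum handler catches only the final exception and then moves on to the next replica.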
   





[GitHub] [incubator-uniffle] xianjingfeng commented on a diff in pull request #129: [Improvement] try read from backup shuffle servers when fetched data is inconsistent

Posted by GitBox <gi...@apache.org>.
xianjingfeng commented on code in PR #129:
URL: https://github.com/apache/incubator-uniffle/pull/129#discussion_r939502319


##########
storage/src/main/java/org/apache/uniffle/storage/handler/impl/MemoryQuorumClientReadHandler.java:
##########

Review Comment:
   If replica=3, replicaWrite=2, and replicaRead=2, then in most cases no shuffle data will be sent to the third shuffle server. In this case, no redundant data will be read, because we only read from the first and the third server.
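This argument depends on read and write quorums overlapping. The sketch below shows that arithmetic. It assumes the standard quorum rule `replicaWrite + replicaRead > replica`; whether Uniffle enforces exactly this rule is an assumption here. `QuorumMath` and its methods are hypothetical names.

```java
// Hypothetical quorum arithmetic for the replica / replicaWrite / replicaRead values.
final class QuorumMath {
  private QuorumMath() {}

  // Any read quorum intersects any write quorum iff replicaWrite + replicaRead > replica.
  static boolean quorumsOverlap(int replica, int replicaWrite, int replicaRead) {
    return replicaWrite + replicaRead > replica;
  }

  // Minimum number of servers shared by any write quorum and any read quorum.
  static int minOverlap(int replica, int replicaWrite, int replicaRead) {
    return Math.max(0, replicaWrite + replicaRead - replica);
  }
}
```

With replica=3, replicaWrite=2, and replicaRead=2, any two servers that are read share at least one server with any two servers that were written.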





[GitHub] [incubator-uniffle] xianjingfeng closed pull request #129: [Improvement] try read from backup shuffle servers when fetched data is inconsistent

Posted by GitBox <gi...@apache.org>.
xianjingfeng closed pull request #129: [Improvement] try read from backup shuffle servers when fetched data is inconsistent
URL: https://github.com/apache/incubator-uniffle/pull/129




[GitHub] [incubator-uniffle] xianjingfeng commented on a diff in pull request #129: [Improvement] try read from backup shuffle servers when fetched data is inconsistent

Posted by GitBox <gi...@apache.org>.
xianjingfeng commented on code in PR #129:
URL: https://github.com/apache/incubator-uniffle/pull/129#discussion_r938923704


##########
storage/src/main/java/org/apache/uniffle/storage/handler/impl/MemoryQuorumClientReadHandler.java:
##########

Review Comment:
   I think it is necessary, because I found that when a shuffle server is under high load, RPC requests fail easily, and I think this is also one reason why blocks become inconsistent.





[GitHub] [incubator-uniffle] xianjingfeng commented on pull request #129: [Improvement] try read from backup shuffle servers when fetched data is inconsistent

Posted by GitBox <gi...@apache.org>.
xianjingfeng commented on PR #129:
URL: https://github.com/apache/incubator-uniffle/pull/129#issuecomment-1206577909

   > > > I suggest to focus on the client side fallback in this PR. Skipping memory data may need careful design on the server side, which could be put in an another PR along with design doc.
   > > 
   > > 
   > > I think this change in server side is not so big. Should we create a design doc?
   > 
   > But each PR should do one thing as much as we can.
   
   Agreed, but this PR will not work without the server-side change.




[GitHub] [incubator-uniffle] xianjingfeng commented on pull request #129: [Improvement] try read from backup shuffle servers when fetched data is inconsistent

Posted by GitBox <gi...@apache.org>.
xianjingfeng commented on PR #129:
URL: https://github.com/apache/incubator-uniffle/pull/129#issuecomment-1206367630

   > I suggest to focus on the client side fallback in this PR. Skipping memory data may need careful design on the server side, which could be put in an another PR along with design doc.
   
   I think the change on the server side is not that big. Should we create a design doc?




[GitHub] [incubator-uniffle] frankliee commented on pull request #129: [Improvement] try read from backup shuffle servers when fetched data is inconsistent

Posted by GitBox <gi...@apache.org>.
frankliee commented on PR #129:
URL: https://github.com/apache/incubator-uniffle/pull/129#issuecomment-1206563765

   > > I suggest to focus on the client side fallback in this PR. Skipping memory data may need careful design on the server side, which could be put in an another PR along with design doc.
   > 
   > I think this change in server side is not so big. Should we create a design doc?
   
   But each PR should focus on one thing as much as possible.




[GitHub] [incubator-uniffle] frankliee commented on a diff in pull request #129: [Improvement] try read from backup shuffle servers when fetched data is inconsistent

Posted by GitBox <gi...@apache.org>.
frankliee commented on code in PR #129:
URL: https://github.com/apache/incubator-uniffle/pull/129#discussion_r939504721


##########
storage/src/main/java/org/apache/uniffle/storage/handler/impl/MemoryQuorumClientReadHandler.java:
##########

Review Comment:
   If a server's disk is damaged or the server has other bugs, the RPC may succeed while some blocks are missing, and you cannot detect this until block checking. Replicas only guarantee integrity in the case of server crashes or network partitions.
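The sketch below shows the block check this comment refers to. It assumes the client knows the full set of expected block IDs for the partition. `BlockCheck` is a hypothetical name, and `Set<Long>` stands in for the `Roaring64NavigableMap` used in the diff. A replica can answer every RPC successfully and still fail this check.

```java
import java.util.Set;
import java.util.TreeSet;

// Hypothetical block check: compare expected block IDs with the ones actually processed.
final class BlockCheck {
  private BlockCheck() {}

  // Returns the expected block IDs that were never processed, in ascending order.
  static Set<Long> missingBlocks(Set<Long> expectedBlockIds, Set<Long> processedBlockIds) {
    Set<Long> missing = new TreeSet<>(expectedBlockIds);
    missing.removeAll(processedBlockIds);
    return missing;
  }
}
```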





[GitHub] [incubator-uniffle] xianjingfeng commented on a diff in pull request #129: [Improvement] try read from backup shuffle servers when fetched data is inconsistent

Posted by GitBox <gi...@apache.org>.
xianjingfeng commented on code in PR #129:
URL: https://github.com/apache/incubator-uniffle/pull/129#discussion_r938733377


##########
storage/src/main/java/org/apache/uniffle/storage/handler/impl/MemoryQuorumClientReadHandler.java:
##########

Review Comment:
   This is what I want to discuss with you. It's difficult to pass parameters through the client. Should we add a new configuration class to solve it?
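One possible shape for such a configuration class is sketched below. `ReadHandlerConfig`, its builder methods, and the 1 MiB default buffer size are all hypothetical. Bundling the read parameters into one object means that adding a field such as `processBlockIds` no longer changes every constructor in the call chain.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical parameter object for a client read handler (not Uniffle's API).
final class ReadHandlerConfig {
  private final String appId;
  private final int shuffleId;
  private final int partitionId;
  private final int readBufferSize;
  // Intentionally shared, not copied: every per-server handler must see the
  // block IDs that the others have already processed.
  private final Set<Long> processBlockIds;

  private ReadHandlerConfig(Builder b) {
    this.appId = b.appId;
    this.shuffleId = b.shuffleId;
    this.partitionId = b.partitionId;
    this.readBufferSize = b.readBufferSize;
    this.processBlockIds = b.processBlockIds;
  }

  String appId() { return appId; }
  int shuffleId() { return shuffleId; }
  int partitionId() { return partitionId; }
  int readBufferSize() { return readBufferSize; }
  Set<Long> processBlockIds() { return processBlockIds; }

  static Builder builder() { return new Builder(); }

  static final class Builder {
    private String appId;
    private int shuffleId;
    private int partitionId;
    private int readBufferSize = 1024 * 1024; // hypothetical default: 1 MiB
    private Set<Long> processBlockIds = new HashSet<>();

    Builder appId(String appId) { this.appId = appId; return this; }
    Builder shuffleId(int shuffleId) { this.shuffleId = shuffleId; return this; }
    Builder partitionId(int partitionId) { this.partitionId = partitionId; return this; }
    Builder readBufferSize(int readBufferSize) { this.readBufferSize = readBufferSize; return this; }
    Builder processBlockIds(Set<Long> ids) { this.processBlockIds = ids; return this; }

    ReadHandlerConfig build() {
      if (appId == null) {
        throw new IllegalStateException("appId is required");
      }
      return new ReadHandlerConfig(this);
    }
  }
}
```

Under this design, the quorum handler builds one config and hands it to each per-server handler, so all handlers share the same processed-block set.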





[GitHub] [incubator-uniffle] xianjingfeng commented on a diff in pull request #129: [Improvement] try read from backup shuffle servers when fetched data is inconsistent

Posted by GitBox <gi...@apache.org>.
xianjingfeng commented on code in PR #129:
URL: https://github.com/apache/incubator-uniffle/pull/129#discussion_r939506024


##########
storage/src/main/java/org/apache/uniffle/storage/handler/impl/MemoryQuorumClientReadHandler.java:
##########

Review Comment:
   I think the problems can be divided into three categories:
   1. Reading the index data fails. In this case, the client receives an exception, and the handling is the same as for an RPC failure.
   2. The index data is incorrect. In this case, I think `expectBlockIds` is also incorrect.
   3. The index data is correct, but blocks are missing. In this case, we can check whether all blocks recorded in the index file have been read successfully.
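The three categories map roughly to the client-side decisions sketched below. The names (`ReplicaCheck`, `Action`) are hypothetical, and a `null` index stands in for a failed index read. Category 2 is deliberately not handled: as noted in the list above, a wrong index would also make `expectBlockIds` wrong, so this local check cannot detect it.

```java
import java.util.Set;

// Hypothetical per-server decision after attempting to read one partition.
final class ReplicaCheck {
  enum Action { USE_THIS_SERVER, TRY_NEXT_SERVER }

  static Action decide(Set<Long> indexBlockIds, Set<Long> readBlockIds) {
    if (indexBlockIds == null) {
      // Category 1: the index read failed; treat it like an RPC failure.
      return Action.TRY_NEXT_SERVER;
    }
    if (!readBlockIds.containsAll(indexBlockIds)) {
      // Category 3: the index is fine, but some listed blocks were not read.
      return Action.TRY_NEXT_SERVER;
    }
    // Category 2 (wrong index) is not detectable by this check.
    return Action.USE_THIS_SERVER;
  }
}
```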





[GitHub] [incubator-uniffle] frankliee commented on a diff in pull request #129: [Improvement] try read from backup shuffle servers when fetched data is inconsistent

Posted by GitBox <gi...@apache.org>.
frankliee commented on code in PR #129:
URL: https://github.com/apache/incubator-uniffle/pull/129#discussion_r939504830


##########
storage/src/main/java/org/apache/uniffle/storage/handler/impl/MemoryQuorumClientReadHandler.java:
##########
@@ -20,54 +20,58 @@
 import java.util.List;
 
 import com.google.common.collect.Lists;
+import org.roaringbitmap.longlong.Roaring64NavigableMap;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.uniffle.client.api.ShuffleServerClient;
 import org.apache.uniffle.common.ShuffleDataResult;
-import org.apache.uniffle.common.exception.RssException;
 import org.apache.uniffle.common.util.Constants;
+import org.apache.uniffle.common.util.RetryUtils;
 
 public class MemoryQuorumClientReadHandler extends AbstractClientReadHandler {
 
   private static final Logger LOG = LoggerFactory.getLogger(MemoryQuorumClientReadHandler.class);
   private long lastBlockId = Constants.INVALID_BLOCK_ID;
   private List<MemoryClientReadHandler> handlers = Lists.newLinkedList();
+  private int currentHandlerIdx = 0;
 
   public MemoryQuorumClientReadHandler(
       String appId,
       int shuffleId,
       int partitionId,
       int readBufferSize,
-      List<ShuffleServerClient> shuffleServerClients) {
+      List<ShuffleServerClient> shuffleServerClients, Roaring64NavigableMap processBlockIds) {
     this.appId = appId;
     this.shuffleId = shuffleId;
     this.partitionId = partitionId;
     this.readBufferSize = readBufferSize;
     shuffleServerClients.forEach(client ->
         handlers.add(new MemoryClientReadHandler(
-            appId, shuffleId, partitionId, readBufferSize, client))
+            appId, shuffleId, partitionId, readBufferSize, client, processBlockIds))
     );
   }
 
   @Override
   public ShuffleDataResult readShuffleData() {
-    boolean readSuccessful = false;
     ShuffleDataResult result = null;
-
-    for (MemoryClientReadHandler handler: handlers) {
+    while (currentHandlerIdx < handlers.size()) {
       try {
-        result = handler.readShuffleData();
-        readSuccessful = true;
-        break;
-      } catch (Exception e) {
-        LOG.warn("Failed to read a replica due to ", e);
+        result = RetryUtils.retry(() -> {
+          MemoryClientReadHandler handler = handlers.get(currentHandlerIdx);
+          ShuffleDataResult shuffleDataResult = handler.readShuffleData();
+          return shuffleDataResult;

Review Comment:
   An OK RPC response does not mean the data is complete, so we introduced block checking.





[GitHub] [incubator-uniffle] xianjingfeng commented on a diff in pull request #129: [Improvement] try read from backup shuffle servers when fetched data is inconsistent

Posted by GitBox <gi...@apache.org>.
xianjingfeng commented on code in PR #129:
URL: https://github.com/apache/incubator-uniffle/pull/129#discussion_r938730713


##########
storage/src/main/java/org/apache/uniffle/storage/handler/impl/MemoryQuorumClientReadHandler.java:
##########
@@ -20,54 +20,58 @@
 import java.util.List;
 
 import com.google.common.collect.Lists;
+import org.roaringbitmap.longlong.Roaring64NavigableMap;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.uniffle.client.api.ShuffleServerClient;
 import org.apache.uniffle.common.ShuffleDataResult;
-import org.apache.uniffle.common.exception.RssException;
 import org.apache.uniffle.common.util.Constants;
+import org.apache.uniffle.common.util.RetryUtils;
 
 public class MemoryQuorumClientReadHandler extends AbstractClientReadHandler {
 
   private static final Logger LOG = LoggerFactory.getLogger(MemoryQuorumClientReadHandler.class);
   private long lastBlockId = Constants.INVALID_BLOCK_ID;
   private List<MemoryClientReadHandler> handlers = Lists.newLinkedList();
+  private int currentHandlerIdx = 0;
 
   public MemoryQuorumClientReadHandler(
       String appId,
       int shuffleId,
       int partitionId,
       int readBufferSize,
-      List<ShuffleServerClient> shuffleServerClients) {
+      List<ShuffleServerClient> shuffleServerClients, Roaring64NavigableMap processBlockIds) {
     this.appId = appId;
     this.shuffleId = shuffleId;
     this.partitionId = partitionId;
     this.readBufferSize = readBufferSize;
     shuffleServerClients.forEach(client ->
         handlers.add(new MemoryClientReadHandler(
-            appId, shuffleId, partitionId, readBufferSize, client))
+            appId, shuffleId, partitionId, readBufferSize, client, processBlockIds))
     );
   }
 
   @Override
   public ShuffleDataResult readShuffleData() {
-    boolean readSuccessful = false;
     ShuffleDataResult result = null;
-
-    for (MemoryClientReadHandler handler: handlers) {
+    while (currentHandlerIdx < handlers.size()) {
       try {
-        result = handler.readShuffleData();
-        readSuccessful = true;
-        break;
-      } catch (Exception e) {
-        LOG.warn("Failed to read a replica due to ", e);
+        result = RetryUtils.retry(() -> {
+          MemoryClientReadHandler handler = handlers.get(currentHandlerIdx);
+          ShuffleDataResult shuffleDataResult = handler.readShuffleData();
+          return shuffleDataResult;
+        }, 1000, 3);
+      } catch (Throwable e) {
+        LOG.warn("Failed to read a replica for appId[" + appId + "], shuffleId["
+            + shuffleId + "], partitionId[" + partitionId + "] due to ", e);
       }
-    }
 
-    if (!readSuccessful) {
-      throw new RssException("Failed to read in memory shuffle data for appId[" + appId
-          + "], shuffleId[" + shuffleId + "], partitionId[" + partitionId + "]");
+      if (result == null || result.isEmpty()) {
+        currentHandlerIdx++;
+        continue;
+      }
+      return result;

Review Comment:
   Maybe all data was flushed. We can add it back.





[GitHub] [incubator-uniffle] frankliee commented on a diff in pull request #129: [Improvement] try read from backup shuffle servers when fetched data is inconsistent

Posted by GitBox <gi...@apache.org>.
frankliee commented on code in PR #129:
URL: https://github.com/apache/incubator-uniffle/pull/129#discussion_r938722193


##########
storage/src/main/java/org/apache/uniffle/storage/handler/impl/MemoryQuorumClientReadHandler.java:
##########
@@ -20,54 +20,58 @@
 import java.util.List;
 
 import com.google.common.collect.Lists;
+import org.roaringbitmap.longlong.Roaring64NavigableMap;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.uniffle.client.api.ShuffleServerClient;
 import org.apache.uniffle.common.ShuffleDataResult;
-import org.apache.uniffle.common.exception.RssException;
 import org.apache.uniffle.common.util.Constants;
+import org.apache.uniffle.common.util.RetryUtils;
 
 public class MemoryQuorumClientReadHandler extends AbstractClientReadHandler {
 
   private static final Logger LOG = LoggerFactory.getLogger(MemoryQuorumClientReadHandler.class);
   private long lastBlockId = Constants.INVALID_BLOCK_ID;
   private List<MemoryClientReadHandler> handlers = Lists.newLinkedList();
+  private int currentHandlerIdx = 0;
 
   public MemoryQuorumClientReadHandler(
       String appId,
       int shuffleId,
       int partitionId,
       int readBufferSize,
-      List<ShuffleServerClient> shuffleServerClients) {
+      List<ShuffleServerClient> shuffleServerClients, Roaring64NavigableMap processBlockIds) {
     this.appId = appId;
     this.shuffleId = shuffleId;
     this.partitionId = partitionId;
     this.readBufferSize = readBufferSize;
     shuffleServerClients.forEach(client ->
         handlers.add(new MemoryClientReadHandler(
-            appId, shuffleId, partitionId, readBufferSize, client))
+            appId, shuffleId, partitionId, readBufferSize, client, processBlockIds))
     );
   }
 
   @Override
   public ShuffleDataResult readShuffleData() {
-    boolean readSuccessful = false;
     ShuffleDataResult result = null;
-
-    for (MemoryClientReadHandler handler: handlers) {
+    while (currentHandlerIdx < handlers.size()) {
       try {
-        result = handler.readShuffleData();
-        readSuccessful = true;
-        break;
-      } catch (Exception e) {
-        LOG.warn("Failed to read a replica due to ", e);
+        result = RetryUtils.retry(() -> {
+          MemoryClientReadHandler handler = handlers.get(currentHandlerIdx);
+          ShuffleDataResult shuffleDataResult = handler.readShuffleData();
+          return shuffleDataResult;

Review Comment:
   Avoid using hard-coded values.
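One way to address this, sketched under assumptions: read the retry interval and attempt count from configuration, falling back to the literals in the diff. The property keys and the `ReadRetryConfig` class are hypothetical, and treating `1000` as an interval in milliseconds and `3` as an attempt count is an assumption; the sketch does not reproduce the real `RetryUtils.retry` signature or semantics.

```java
import java.util.Properties;
import java.util.concurrent.Callable;

// Hypothetical retry settings loaded from configuration instead of literals.
final class ReadRetryConfig {
  static final String INTERVAL_KEY = "example.read.retry.interval.ms"; // hypothetical key
  static final String MAX_ATTEMPTS_KEY = "example.read.retry.max.attempts"; // hypothetical key

  final long intervalMs;
  final int maxAttempts;

  ReadRetryConfig(Properties props) {
    // Defaults reuse the literals from the diff (1000, 3).
    this.intervalMs = Long.parseLong(props.getProperty(INTERVAL_KEY, "1000"));
    this.maxAttempts = Integer.parseInt(props.getProperty(MAX_ATTEMPTS_KEY, "3"));
    if (intervalMs < 0 || maxAttempts < 1) {
      throw new IllegalArgumentException("invalid retry settings");
    }
  }

  // Runs the call up to maxAttempts times, sleeping intervalMs between
  // attempts, and rethrows the last failure if every attempt fails.
  <T> T retry(Callable<T> call) throws Exception {
    Exception last = null;
    for (int attempt = 1; attempt <= maxAttempts; attempt++) {
      try {
        return call.call();
      } catch (Exception e) {
        last = e;
        if (attempt < maxAttempts) {
          Thread.sleep(intervalMs);
        }
      }
    }
    throw last;
  }
}
```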





[GitHub] [incubator-uniffle] codecov-commenter commented on pull request #129: [Improvement] try read from backup shuffle servers when fetched data is inconsistent

Posted by GitBox <gi...@apache.org>.
codecov-commenter commented on PR #129:
URL: https://github.com/apache/incubator-uniffle/pull/129#issuecomment-1206521373

   # [Codecov](https://codecov.io/gh/apache/incubator-uniffle/pull/129?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
   > Merging [#129](https://codecov.io/gh/apache/incubator-uniffle/pull/129?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (aca97d2) into [master](https://codecov.io/gh/apache/incubator-uniffle/commit/fd8ccdd920296745b7f27e3f36ed06238b0f274f?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (fd8ccdd) will **decrease** coverage by `1.58%`.
   > The diff coverage is `14.28%`.
   
   ```diff
   @@             Coverage Diff              @@
   ##             master     #129      +/-   ##
   ============================================
   - Coverage     57.17%   55.58%   -1.59%     
   + Complexity     1201     1123      -78     
   ============================================
     Files           150      141       -9     
     Lines          8178     7703     -475     
     Branches        773      743      -30     
   ============================================
   - Hits           4676     4282     -394     
   + Misses         3256     3185      -71     
   + Partials        246      236      -10     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/incubator-uniffle/pull/129?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | |
   |---|---|---|
   | [...pache/uniffle/server/ShuffleServerGrpcService.java](https://codecov.io/gh/apache/incubator-uniffle/pull/129/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2VydmVyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS91bmlmZmxlL3NlcnZlci9TaHVmZmxlU2VydmVyR3JwY1NlcnZpY2UuamF2YQ==) | `1.00% <0.00%> (-0.01%)` | :arrow_down: |
   | [.../org/apache/uniffle/server/ShuffleTaskManager.java](https://codecov.io/gh/apache/incubator-uniffle/pull/129/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2VydmVyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS91bmlmZmxlL3NlcnZlci9TaHVmZmxlVGFza01hbmFnZXIuamF2YQ==) | `64.10% <0.00%> (ø)` | |
   | [...uniffle/storage/factory/ShuffleHandlerFactory.java](https://codecov.io/gh/apache/incubator-uniffle/pull/129/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c3RvcmFnZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvdW5pZmZsZS9zdG9yYWdlL2ZhY3RvcnkvU2h1ZmZsZUhhbmRsZXJGYWN0b3J5LmphdmE=) | `0.00% <0.00%> (ø)` | |
   | [...handler/impl/LocalFileQuorumClientReadHandler.java](https://codecov.io/gh/apache/incubator-uniffle/pull/129/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c3RvcmFnZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvdW5pZmZsZS9zdG9yYWdlL2hhbmRsZXIvaW1wbC9Mb2NhbEZpbGVRdW9ydW1DbGllbnRSZWFkSGFuZGxlci5qYXZh) | `0.00% <0.00%> (ø)` | |
   | [.../storage/handler/impl/MemoryClientReadHandler.java](https://codecov.io/gh/apache/incubator-uniffle/pull/129/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c3RvcmFnZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvdW5pZmZsZS9zdG9yYWdlL2hhbmRsZXIvaW1wbC9NZW1vcnlDbGllbnRSZWFkSGFuZGxlci5qYXZh) | `0.00% <0.00%> (ø)` | |
   | [...ge/handler/impl/MemoryQuorumClientReadHandler.java](https://codecov.io/gh/apache/incubator-uniffle/pull/129/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c3RvcmFnZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvdW5pZmZsZS9zdG9yYWdlL2hhbmRsZXIvaW1wbC9NZW1vcnlRdW9ydW1DbGllbnRSZWFkSGFuZGxlci5qYXZh) | `0.00% <0.00%> (ø)` | |
   | [...rg/apache/uniffle/server/buffer/ShuffleBuffer.java](https://codecov.io/gh/apache/incubator-uniffle/pull/129/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2VydmVyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS91bmlmZmxlL3NlcnZlci9idWZmZXIvU2h1ZmZsZUJ1ZmZlci5qYXZh) | `90.38% <71.42%> (-2.35%)` | :arrow_down: |
   | [...he/uniffle/server/buffer/ShuffleBufferManager.java](https://codecov.io/gh/apache/incubator-uniffle/pull/129/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2VydmVyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS91bmlmZmxlL3NlcnZlci9idWZmZXIvU2h1ZmZsZUJ1ZmZlck1hbmFnZXIuamF2YQ==) | `81.89% <100.00%> (+0.07%)` | :arrow_up: |
   | [...e/uniffle/server/storage/SingleStorageManager.java](https://codecov.io/gh/apache/incubator-uniffle/pull/129/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2VydmVyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS91bmlmZmxlL3NlcnZlci9zdG9yYWdlL1NpbmdsZVN0b3JhZ2VNYW5hZ2VyLmphdmE=) | `65.57% <0.00%> (-1.64%)` | :arrow_down: |
   | [...he/uniffle/server/storage/LocalStorageManager.java](https://codecov.io/gh/apache/incubator-uniffle/pull/129/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2VydmVyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS91bmlmZmxlL3NlcnZlci9zdG9yYWdlL0xvY2FsU3RvcmFnZU1hbmFnZXIuamF2YQ==) | `61.53% <0.00%> (-0.37%)` | :arrow_down: |
   | ... and [13 more](https://codecov.io/gh/apache/incubator-uniffle/pull/129/diff?src=pr&el=tree-more&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | |
   
   




[GitHub] [incubator-uniffle] frankliee commented on a diff in pull request #129: [Improvement] try read from backup shuffle servers when fetched data is inconsistent

Posted by GitBox <gi...@apache.org>.
frankliee commented on code in PR #129:
URL: https://github.com/apache/incubator-uniffle/pull/129#discussion_r939501018


##########
storage/src/main/java/org/apache/uniffle/storage/handler/impl/MemoryQuorumClientReadHandler.java:
##########
@@ -20,54 +20,58 @@
 import java.util.List;
 
 import com.google.common.collect.Lists;
+import org.roaringbitmap.longlong.Roaring64NavigableMap;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.uniffle.client.api.ShuffleServerClient;
 import org.apache.uniffle.common.ShuffleDataResult;
-import org.apache.uniffle.common.exception.RssException;
 import org.apache.uniffle.common.util.Constants;
+import org.apache.uniffle.common.util.RetryUtils;
 
 public class MemoryQuorumClientReadHandler extends AbstractClientReadHandler {
 
   private static final Logger LOG = LoggerFactory.getLogger(MemoryQuorumClientReadHandler.class);
   private long lastBlockId = Constants.INVALID_BLOCK_ID;
   private List<MemoryClientReadHandler> handlers = Lists.newLinkedList();
+  private int currentHandlerIdx = 0;
 
   public MemoryQuorumClientReadHandler(
       String appId,
       int shuffleId,
       int partitionId,
       int readBufferSize,
-      List<ShuffleServerClient> shuffleServerClients) {
+      List<ShuffleServerClient> shuffleServerClients, Roaring64NavigableMap processBlockIds) {
     this.appId = appId;
     this.shuffleId = shuffleId;
     this.partitionId = partitionId;
     this.readBufferSize = readBufferSize;
     shuffleServerClients.forEach(client ->
         handlers.add(new MemoryClientReadHandler(
-            appId, shuffleId, partitionId, readBufferSize, client))
+            appId, shuffleId, partitionId, readBufferSize, client, processBlockIds))
     );
   }
 
   @Override
   public ShuffleDataResult readShuffleData() {
-    boolean readSuccessful = false;
     ShuffleDataResult result = null;
-
-    for (MemoryClientReadHandler handler: handlers) {
+    while (currentHandlerIdx < handlers.size()) {
       try {
-        result = handler.readShuffleData();
-        readSuccessful = true;
-        break;
-      } catch (Exception e) {
-        LOG.warn("Failed to read a replica due to ", e);
+        result = RetryUtils.retry(() -> {
+          MemoryClientReadHandler handler = handlers.get(currentHandlerIdx);
+          ShuffleDataResult shuffleDataResult = handler.readShuffleData();
+          return shuffleDataResult;

Review Comment:
   Reading `rss.data.replica.read` replicas is not worthwhile unless there really are inconsistent blocks.
   When inconsistent blocks are frequent, we should dig into the cause rather than read multiple replicas in most cases. In our production environment, 20000 blocks per partition is still a stable setting.
   
   We could add an interface called `getFetchedReplicas` (returning `{}` by default) in `AbstractClientReadHandler`, so that these read clients know which server to start from when inconsistent blocks trigger a retry.
   
   Besides, improving the memory shuffle read logic is a large undertaking, which is out of the scope of this PR.
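A hypothetical sketch of the suggested hook. Only the name `getFetchedReplicas` comes from the comment; here it is modeled as an interface default method returning an empty set (the comment proposes adding it to `AbstractClientReadHandler`), and `ReplicaPicker` and `RecordingReadHandler` are illustrative helpers showing how a retry could pick the next server that has not been fetched yet.

```java
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical hook: which replicas (server IDs) a read handler has already
// fetched from. The default is an empty set, as suggested in the comment.
interface ReplicaAwareReadHandler {
  default Set<String> getFetchedReplicas() {
    return Collections.emptySet();
  }
}

// Illustrative handler that records each server it has read from.
final class RecordingReadHandler implements ReplicaAwareReadHandler {
  private final Set<String> fetched = new LinkedHashSet<>();

  void markFetched(String serverId) {
    fetched.add(serverId);
  }

  @Override
  public Set<String> getFetchedReplicas() {
    return Collections.unmodifiableSet(fetched);
  }
}

final class ReplicaPicker {
  private ReplicaPicker() {}

  // Returns the first server in replica order not yet fetched, or null if
  // every replica has already been tried.
  static String nextServer(List<String> replicaOrder, ReplicaAwareReadHandler handler) {
    Set<String> fetched = handler.getFetchedReplicas();
    for (String server : replicaOrder) {
      if (!fetched.contains(server)) {
        return server;
      }
    }
    return null;
  }
}
```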





[GitHub] [incubator-uniffle] frankliee commented on pull request #129: [Improvement] try read from backup shuffle servers when fetched data is inconsistent

Posted by GitBox <gi...@apache.org>.
frankliee commented on PR #129:
URL: https://github.com/apache/incubator-uniffle/pull/129#issuecomment-1206336024

   I suggest focusing on the client-side fallback in this PR.
   Skipping memory data may need a careful design on the server side, which could go in another PR along with a design doc.




[GitHub] [incubator-uniffle] xianjingfeng commented on a diff in pull request #129: [Improvement] try read from backup shuffle servers when fetched data is inconsistent

Posted by GitBox <gi...@apache.org>.
xianjingfeng commented on code in PR #129:
URL: https://github.com/apache/incubator-uniffle/pull/129#discussion_r939481463


##########
storage/src/main/java/org/apache/uniffle/storage/handler/impl/MemoryQuorumClientReadHandler.java:
##########
@@ -20,54 +20,58 @@
 import java.util.List;
 
 import com.google.common.collect.Lists;
+import org.roaringbitmap.longlong.Roaring64NavigableMap;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.uniffle.client.api.ShuffleServerClient;
 import org.apache.uniffle.common.ShuffleDataResult;
-import org.apache.uniffle.common.exception.RssException;
 import org.apache.uniffle.common.util.Constants;
+import org.apache.uniffle.common.util.RetryUtils;
 
 public class MemoryQuorumClientReadHandler extends AbstractClientReadHandler {
 
   private static final Logger LOG = LoggerFactory.getLogger(MemoryQuorumClientReadHandler.class);
   private long lastBlockId = Constants.INVALID_BLOCK_ID;
   private List<MemoryClientReadHandler> handlers = Lists.newLinkedList();
+  private int currentHandlerIdx = 0;
 
   public MemoryQuorumClientReadHandler(
       String appId,
       int shuffleId,
       int partitionId,
       int readBufferSize,
-      List<ShuffleServerClient> shuffleServerClients) {
+      List<ShuffleServerClient> shuffleServerClients, Roaring64NavigableMap processBlockIds) {
     this.appId = appId;
     this.shuffleId = shuffleId;
     this.partitionId = partitionId;
     this.readBufferSize = readBufferSize;
     shuffleServerClients.forEach(client ->
         handlers.add(new MemoryClientReadHandler(
-            appId, shuffleId, partitionId, readBufferSize, client))
+            appId, shuffleId, partitionId, readBufferSize, client, processBlockIds))
     );
   }
 
   @Override
   public ShuffleDataResult readShuffleData() {
-    boolean readSuccessful = false;
     ShuffleDataResult result = null;
-
-    for (MemoryClientReadHandler handler: handlers) {
+    while (currentHandlerIdx < handlers.size()) {
       try {
-        result = handler.readShuffleData();
-        readSuccessful = true;
-        break;
-      } catch (Exception e) {
-        LOG.warn("Failed to read a replica due to ", e);
+        result = RetryUtils.retry(() -> {
+          MemoryClientReadHandler handler = handlers.get(currentHandlerIdx);
+          ShuffleDataResult shuffleDataResult = handler.readShuffleData();
+          return shuffleDataResult;

Review Comment:
   > And I suggest to fallback to backup server after block check shows data is inconsistent
   
   1. I think if we do this, the logic may become very complicated and difficult to handle. If block inconsistencies appear, how do we retry? It is difficult for us to know which step needs to be retried.
   2. I think the main problem with this logic is that it reads redundant index data even if the blocks from the first shuffle server are complete. We should not read from all shuffle servers once data integrity is already ensured; I think the number of shuffle servers to read should equal `rss.data.replica.read`.
   3. I found that block inconsistency is easy to encounter when a partition has more than 5000 blocks, and it happens almost every time with more than 20000. So I think the logic of this PR will be more stable.
   
   > If this optimization is necessary, it should be put the place where gPRC is firstly used. The QuorumClient should only deal with the interaction of servers.
   
   Agreed. Should we add retry logic to all gRPC interfaces?
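A sketch of what retrying at the RPC client layer could look like, using a `java.lang.reflect.Proxy` so every method of a client interface shares one retry policy. `DataClient` is a hypothetical stand-in for a real gRPC client interface. Blindly retrying every method is only safe for idempotent calls such as reads, so a real change would probably need a per-method policy.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Proxy;

// Hypothetical client interface standing in for a real gRPC client.
interface DataClient {
  String read(int partitionId);
}

final class RetryingProxy {
  private RetryingProxy() {}

  // Wraps target so every interface method is retried up to maxAttempts times,
  // sleeping intervalMs between attempts; the last failure is rethrown.
  static <T> T wrap(Class<T> iface, T target, int maxAttempts, long intervalMs) {
    if (maxAttempts < 1) {
      throw new IllegalArgumentException("maxAttempts must be >= 1");
    }
    InvocationHandler handler = (proxy, method, args) -> {
      Throwable last = null;
      for (int attempt = 1; attempt <= maxAttempts; attempt++) {
        try {
          return method.invoke(target, args);
        } catch (InvocationTargetException e) {
          last = e.getCause(); // unwrap the exception thrown by the real call
          if (attempt < maxAttempts) {
            Thread.sleep(intervalMs);
          }
        }
      }
      throw last;
    };
    return iface.cast(
        Proxy.newProxyInstance(iface.getClassLoader(), new Class<?>[] {iface}, handler));
  }
}
```

As an alternative to hand-rolled retries, grpc-java can also retry transparently through a service-config `retryPolicy`, which keeps the policy out of application code.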
   





[GitHub] [incubator-uniffle] frankliee commented on a diff in pull request #129: [Improvement] try read from backup shuffle servers when fetched data is inconsistent

Posted by GitBox <gi...@apache.org>.
frankliee commented on code in PR #129:
URL: https://github.com/apache/incubator-uniffle/pull/129#discussion_r939472650


##########
storage/src/main/java/org/apache/uniffle/storage/handler/impl/MemoryQuorumClientReadHandler.java:
##########
@@ -20,54 +20,58 @@
 import java.util.List;
 
 import com.google.common.collect.Lists;
+import org.roaringbitmap.longlong.Roaring64NavigableMap;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.uniffle.client.api.ShuffleServerClient;
 import org.apache.uniffle.common.ShuffleDataResult;
-import org.apache.uniffle.common.exception.RssException;
 import org.apache.uniffle.common.util.Constants;
+import org.apache.uniffle.common.util.RetryUtils;
 
 public class MemoryQuorumClientReadHandler extends AbstractClientReadHandler {
 
   private static final Logger LOG = LoggerFactory.getLogger(MemoryQuorumClientReadHandler.class);
   private long lastBlockId = Constants.INVALID_BLOCK_ID;
   private List<MemoryClientReadHandler> handlers = Lists.newLinkedList();
+  private int currentHandlerIdx = 0;
 
   public MemoryQuorumClientReadHandler(
       String appId,
       int shuffleId,
       int partitionId,
       int readBufferSize,
-      List<ShuffleServerClient> shuffleServerClients) {
+      List<ShuffleServerClient> shuffleServerClients, Roaring64NavigableMap processBlockIds) {
     this.appId = appId;
     this.shuffleId = shuffleId;
     this.partitionId = partitionId;
     this.readBufferSize = readBufferSize;
     shuffleServerClients.forEach(client ->
         handlers.add(new MemoryClientReadHandler(
-            appId, shuffleId, partitionId, readBufferSize, client))
+            appId, shuffleId, partitionId, readBufferSize, client, processBlockIds))
     );
   }
 
   @Override
   public ShuffleDataResult readShuffleData() {
-    boolean readSuccessful = false;
     ShuffleDataResult result = null;
-
-    for (MemoryClientReadHandler handler: handlers) {
+    while (currentHandlerIdx < handlers.size()) {
       try {
-        result = handler.readShuffleData();
-        readSuccessful = true;
-        break;
-      } catch (Exception e) {
-        LOG.warn("Failed to read a replica due to ", e);
+        result = RetryUtils.retry(() -> {
+          MemoryClientReadHandler handler = handlers.get(currentHandlerIdx);
+          ShuffleDataResult shuffleDataResult = handler.readShuffleData();
+          return shuffleDataResult;

Review Comment:
   I also suggest falling back to a backup server after the block check shows the data is inconsistent. The current logic runs before the block check, so it cannot avoid inconsistencies caused by the server itself (i.e., the RPC is OK but the data is incomplete).
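A sketch of the suggested ordering, in which a backup replica is read only after the block check fails, with the number of replicas capped at a value such as `rss.data.replica.read`. `CheckedFallbackReader` and the `readReplica` function are hypothetical: each call stands in for reading one server's data and returning the block IDs it delivered, and the union of those IDs plays the role of `processBlockIds`.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Function;

final class CheckedFallbackReader {
  private CheckedFallbackReader() {}

  // Reads replicas in order, running the block check after each one, and
  // returns how many replicas were read before all expected blocks were seen.
  // Throws if the block check still fails after maxReplicas attempts.
  static int readUntilComplete(
      List<String> servers,
      int maxReplicas,
      Function<String, Set<Long>> readReplica,
      Set<Long> expectBlockIds) {
    Set<Long> processBlockIds = new HashSet<>();
    int read = 0;
    for (String server : servers) {
      if (read >= maxReplicas) {
        break;
      }
      read++;
      try {
        processBlockIds.addAll(readReplica.apply(server));
      } catch (RuntimeException e) {
        continue; // an RPC failure is treated like an incomplete read
      }
      if (processBlockIds.containsAll(expectBlockIds)) {
        return read; // block check passed: no need to touch further replicas
      }
    }
    Set<Long> missing = new HashSet<>(expectBlockIds);
    missing.removeAll(processBlockIds);
    throw new IllegalStateException(
        "Blocks still missing after " + read + " replica(s): " + missing);
  }
}
```

In the common case where the first replica is complete, only one server is read, which addresses the concern about reading redundant index data from every replica.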
    
    





[GitHub] [incubator-uniffle] xianjingfeng closed pull request #129: [Improvement] try read from backup shuffle servers when fetched data is inconsistent

Posted by GitBox <gi...@apache.org>.
xianjingfeng closed pull request #129: [Improvement] try read from backup shuffle servers when fetched data is inconsistent
URL: https://github.com/apache/incubator-uniffle/pull/129




[GitHub] [incubator-uniffle] frankliee commented on a diff in pull request #129: [Improvement] try read from backup shuffle servers when fetched data is inconsistent

Posted by GitBox <gi...@apache.org>.
frankliee commented on code in PR #129:
URL: https://github.com/apache/incubator-uniffle/pull/129#discussion_r938719586


##########
storage/src/main/java/org/apache/uniffle/storage/handler/impl/MemoryQuorumClientReadHandler.java:
##########
@@ -20,54 +20,58 @@
 import java.util.List;
 
 import com.google.common.collect.Lists;
+import org.roaringbitmap.longlong.Roaring64NavigableMap;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.uniffle.client.api.ShuffleServerClient;
 import org.apache.uniffle.common.ShuffleDataResult;
-import org.apache.uniffle.common.exception.RssException;
 import org.apache.uniffle.common.util.Constants;
+import org.apache.uniffle.common.util.RetryUtils;
 
 public class MemoryQuorumClientReadHandler extends AbstractClientReadHandler {
 
   private static final Logger LOG = LoggerFactory.getLogger(MemoryQuorumClientReadHandler.class);
   private long lastBlockId = Constants.INVALID_BLOCK_ID;
   private List<MemoryClientReadHandler> handlers = Lists.newLinkedList();
+  private int currentHandlerIdx = 0;
 
   public MemoryQuorumClientReadHandler(
       String appId,
       int shuffleId,
       int partitionId,
       int readBufferSize,
-      List<ShuffleServerClient> shuffleServerClients) {
+      List<ShuffleServerClient> shuffleServerClients, Roaring64NavigableMap processBlockIds) {
     this.appId = appId;
     this.shuffleId = shuffleId;
     this.partitionId = partitionId;
     this.readBufferSize = readBufferSize;
     shuffleServerClients.forEach(client ->
         handlers.add(new MemoryClientReadHandler(
-            appId, shuffleId, partitionId, readBufferSize, client))
+            appId, shuffleId, partitionId, readBufferSize, client, processBlockIds))
     );
   }
 
   @Override
   public ShuffleDataResult readShuffleData() {
-    boolean readSuccessful = false;
     ShuffleDataResult result = null;
-
-    for (MemoryClientReadHandler handler: handlers) {
+    while (currentHandlerIdx < handlers.size()) {
       try {
-        result = handler.readShuffleData();
-        readSuccessful = true;
-        break;
-      } catch (Exception e) {
-        LOG.warn("Failed to read a replica due to ", e);
+        result = RetryUtils.retry(() -> {
+          MemoryClientReadHandler handler = handlers.get(currentHandlerIdx);
+          ShuffleDataResult shuffleDataResult = handler.readShuffleData();
+          return shuffleDataResult;
+        }, 1000, 3);
+      } catch (Throwable e) {
+        LOG.warn("Failed to read a replica for appId[" + appId + "], shuffleId["
+            + shuffleId + "], partitionId[" + partitionId + "] due to ", e);
       }
-    }
 
-    if (!readSuccessful) {
-      throw new RssException("Failed to read in memory shuffle data for appId[" + appId
-          + "], shuffleId[" + shuffleId + "], partitionId[" + partitionId + "]");
+      if (result == null || result.isEmpty()) {
+        currentHandlerIdx++;
+        continue;
+      }
+      return result;

Review Comment:
   Why remove the RssException throw?
   What about the case where all servers fail?
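A minimal sketch of the behavior this question points at, for illustration only. It falls back to the next replica when one throws, but still raises an error when every replica fails instead of returning an empty result. `ReplicaReader`, `ReplicaFallback` and `readFromAnyReplica` are hypothetical names, not Uniffle APIs. `IllegalStateException` stands in for the `RssException` that the removed code threw.

```java
import java.util.List;

// Hypothetical stand-in for one replica's read handler (not a Uniffle API).
interface ReplicaReader {
  // Returns the next batch read from this replica; throws if the replica is unreachable.
  byte[] read() throws Exception;
}

class ReplicaFallback {
  /**
   * Returns the result of the first replica that answers without throwing.
   * An empty array from a healthy replica is returned as-is; only when every
   * replica throws does the caller get an exception instead of a silent empty result.
   */
  static byte[] readFromAnyReplica(List<ReplicaReader> replicas, String context) {
    Exception lastFailure = null;
    for (ReplicaReader replica : replicas) {
      try {
        return replica.read();
      } catch (Exception e) {
        // Remember the cause and fall back to the next replica.
        lastFailure = e;
      }
    }
    // Stand-in for the removed `throw new RssException(...)`.
    throw new IllegalStateException(
        "Failed to read from all " + replicas.size() + " replicas for " + context, lastFailure);
  }
}
```

This sketch returns an empty result from a healthy replica unchanged. Whether "empty" should also mean "try the next server" is a separate decision, and the later comments in this thread debate it.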



[GitHub] [incubator-uniffle] frankliee commented on a diff in pull request #129: [Improvement] try read from backup shuffle servers when fetched data is inconsistent

Posted by GitBox <gi...@apache.org>.
frankliee commented on code in PR #129:
URL: https://github.com/apache/incubator-uniffle/pull/129#discussion_r939522693


##########
storage/src/main/java/org/apache/uniffle/storage/handler/impl/MemoryQuorumClientReadHandler.java:
##########
@@ -20,54 +20,58 @@
 import java.util.List;
 
 import com.google.common.collect.Lists;
+import org.roaringbitmap.longlong.Roaring64NavigableMap;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.uniffle.client.api.ShuffleServerClient;
 import org.apache.uniffle.common.ShuffleDataResult;
-import org.apache.uniffle.common.exception.RssException;
 import org.apache.uniffle.common.util.Constants;
+import org.apache.uniffle.common.util.RetryUtils;
 
 public class MemoryQuorumClientReadHandler extends AbstractClientReadHandler {
 
   private static final Logger LOG = LoggerFactory.getLogger(MemoryQuorumClientReadHandler.class);
   private long lastBlockId = Constants.INVALID_BLOCK_ID;
   private List<MemoryClientReadHandler> handlers = Lists.newLinkedList();
+  private int currentHandlerIdx = 0;
 
   public MemoryQuorumClientReadHandler(
       String appId,
       int shuffleId,
       int partitionId,
       int readBufferSize,
-      List<ShuffleServerClient> shuffleServerClients) {
+      List<ShuffleServerClient> shuffleServerClients, Roaring64NavigableMap processBlockIds) {
     this.appId = appId;
     this.shuffleId = shuffleId;
     this.partitionId = partitionId;
     this.readBufferSize = readBufferSize;
     shuffleServerClients.forEach(client ->
         handlers.add(new MemoryClientReadHandler(
-            appId, shuffleId, partitionId, readBufferSize, client))
+            appId, shuffleId, partitionId, readBufferSize, client, processBlockIds))
     );
   }
 
   @Override
   public ShuffleDataResult readShuffleData() {
-    boolean readSuccessful = false;
     ShuffleDataResult result = null;
-
-    for (MemoryClientReadHandler handler: handlers) {
+    while (currentHandlerIdx < handlers.size()) {
       try {
-        result = handler.readShuffleData();
-        readSuccessful = true;
-        break;
-      } catch (Exception e) {
-        LOG.warn("Failed to read a replica due to ", e);
+        result = RetryUtils.retry(() -> {
+          MemoryClientReadHandler handler = handlers.get(currentHandlerIdx);
+          ShuffleDataResult shuffleDataResult = handler.readShuffleData();
+          return shuffleDataResult;

Review Comment:
   expectBlockId is a different notion from the index: it is generated by the Spark client to determine which blocks are needed.
   
   In fact, both index and data can be missing because of write-side or server-side problems, and you cannot know whether any block is missing until a server has been read completely.
   
   So we should add a fallback mechanism that fetches data from another server "after" (not before) block checking.
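The ordering argued for above can be sketched as follows: read a server completely, check the blocks, and only then fall back. This is a simplified model under stated assumptions. Each server is represented as the list of block IDs it returns for the partition. `Set<Long>` replaces the `Roaring64NavigableMap` used in the PR. `BlockCheckedFallback.readWithFallback` is a hypothetical helper, not Uniffle code.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

class BlockCheckedFallback {
  /**
   * Reads each server to the end before checking for missing blocks, and only
   * then falls back to the next server, skipping blocks already delivered.
   * Each server is modeled (hypothetically) as the block IDs it returns, in read order.
   */
  static List<Long> readWithFallback(List<List<Long>> servers, Set<Long> expectBlockIds) {
    Set<Long> processed = new HashSet<>();
    List<Long> delivered = new ArrayList<>();
    for (List<Long> server : servers) {
      for (long blockId : server) {
        // Skip blocks the client does not expect or already took from an earlier server.
        if (!expectBlockIds.contains(blockId) || processed.contains(blockId)) {
          continue;
        }
        processed.add(blockId);
        delivered.add(blockId);
      }
      // Block checking happens only after the whole server has been read.
      if (processed.containsAll(expectBlockIds)) {
        return delivered;
      }
    }
    Set<Long> missing = new HashSet<>(expectBlockIds);
    missing.removeAll(processed);
    throw new IllegalStateException("Blocks still missing after reading all servers: " + missing);
  }
}
```

The check runs per server rather than per RPC. A server whose individual reads all succeed but which lacks a block still triggers the fallback, and blocks that were already delivered are not delivered twice.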
    
    
   



[GitHub] [incubator-uniffle] frankliee commented on a diff in pull request #129: [Improvement] try read from backup shuffle servers when fetched data is inconsistent

Posted by GitBox <gi...@apache.org>.
frankliee commented on code in PR #129:
URL: https://github.com/apache/incubator-uniffle/pull/129#discussion_r939503235


##########
storage/src/main/java/org/apache/uniffle/storage/handler/impl/MemoryQuorumClientReadHandler.java:
##########
@@ -20,54 +20,58 @@
 import java.util.List;
 
 import com.google.common.collect.Lists;
+import org.roaringbitmap.longlong.Roaring64NavigableMap;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.uniffle.client.api.ShuffleServerClient;
 import org.apache.uniffle.common.ShuffleDataResult;
-import org.apache.uniffle.common.exception.RssException;
 import org.apache.uniffle.common.util.Constants;
+import org.apache.uniffle.common.util.RetryUtils;
 
 public class MemoryQuorumClientReadHandler extends AbstractClientReadHandler {
 
   private static final Logger LOG = LoggerFactory.getLogger(MemoryQuorumClientReadHandler.class);
   private long lastBlockId = Constants.INVALID_BLOCK_ID;
   private List<MemoryClientReadHandler> handlers = Lists.newLinkedList();
+  private int currentHandlerIdx = 0;
 
   public MemoryQuorumClientReadHandler(
       String appId,
       int shuffleId,
       int partitionId,
       int readBufferSize,
-      List<ShuffleServerClient> shuffleServerClients) {
+      List<ShuffleServerClient> shuffleServerClients, Roaring64NavigableMap processBlockIds) {
     this.appId = appId;
     this.shuffleId = shuffleId;
     this.partitionId = partitionId;
     this.readBufferSize = readBufferSize;
     shuffleServerClients.forEach(client ->
         handlers.add(new MemoryClientReadHandler(
-            appId, shuffleId, partitionId, readBufferSize, client))
+            appId, shuffleId, partitionId, readBufferSize, client, processBlockIds))
     );
   }
 
   @Override
   public ShuffleDataResult readShuffleData() {
-    boolean readSuccessful = false;
     ShuffleDataResult result = null;
-
-    for (MemoryClientReadHandler handler: handlers) {
+    while (currentHandlerIdx < handlers.size()) {
       try {
-        result = handler.readShuffleData();
-        readSuccessful = true;
-        break;
-      } catch (Exception e) {
-        LOG.warn("Failed to read a replica due to ", e);
+        result = RetryUtils.retry(() -> {
+          MemoryClientReadHandler handler = handlers.get(currentHandlerIdx);
+          ShuffleDataResult shuffleDataResult = handler.readShuffleData();
+          return shuffleDataResult;

Review Comment:
   This logic is too tricky, and it still cannot solve the case where the first and third RPCs succeed but block checking fails.



[GitHub] [incubator-uniffle] xianjingfeng commented on pull request #129: [Improvement] try read from backup shuffle servers when fetched data is inconsistent

Posted by GitBox <gi...@apache.org>.
xianjingfeng commented on PR #129:
URL: https://github.com/apache/incubator-uniffle/pull/129#issuecomment-1299638926

   Split into #276 and #294 


[GitHub] [incubator-uniffle] frankliee commented on pull request #129: [Improvement] try read from backup shuffle servers when fetched data is inconsistent

Posted by GitBox <gi...@apache.org>.
frankliee commented on PR #129:
URL: https://github.com/apache/incubator-uniffle/pull/129#issuecomment-1206650653

   > > > > I suggest focusing on the client-side fallback in this PR. Skipping memory data may need a careful design on the server side, which could be put in another PR along with a design doc.
   > > > 
   > > > 
   > > > I think this change on the server side is not so big. Should we create a design doc?
   > > 
   > > 
   > > But each PR should do only one thing, as far as possible.
   > 
   > I agree with you, but this PR will not work without the server-side change.
   
   You could handle only the fallback to backup servers in this PR, and move the memory-skipping part to another PR.
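For context, the "memory skipping" part being deferred here is the server-side half of the PR. The client passes `processBlockIds`, and the server leaves those blocks out of the in-memory data it returns. A minimal sketch under assumptions: `MemorySkipping`, `Block` and `selectBlocks` are hypothetical names. A `Set<Long>` replaces the `Roaring64NavigableMap` from the diff. The real server's buffering and paging logic (e.g. `lastBlockId`) is not modeled.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

class MemorySkipping {
  // Hypothetical in-memory block on a shuffle server: just an ID and a size in bytes.
  static final class Block {
    final long blockId;
    final int length;

    Block(long blockId, int length) {
      this.blockId = blockId;
      this.length = length;
    }
  }

  /**
   * Server-side filter: returns the next batch of in-memory blocks, leaving out any
   * block the client reports in processBlockIds, and stopping once the batch
   * reaches readBufferSize bytes.
   */
  static List<Block> selectBlocks(List<Block> inMemory, Set<Long> processBlockIds, int readBufferSize) {
    List<Block> batch = new ArrayList<>();
    int batchSize = 0;
    for (Block block : inMemory) {
      if (processBlockIds.contains(block.blockId)) {
        continue; // already read by the client from another server
      }
      batch.add(block);
      batchSize += block.length;
      if (batchSize >= readBufferSize) {
        break;
      }
    }
    return batch;
  }
}
```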


[GitHub] [incubator-uniffle] xianjingfeng commented on a diff in pull request #129: [Improvement] try read from backup shuffle servers when fetched data is inconsistent

Posted by GitBox <gi...@apache.org>.
xianjingfeng commented on code in PR #129:
URL: https://github.com/apache/incubator-uniffle/pull/129#discussion_r939497799


##########
storage/src/main/java/org/apache/uniffle/storage/handler/impl/MemoryQuorumClientReadHandler.java:
##########
@@ -20,54 +20,58 @@
 import java.util.List;
 
 import com.google.common.collect.Lists;
+import org.roaringbitmap.longlong.Roaring64NavigableMap;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.uniffle.client.api.ShuffleServerClient;
 import org.apache.uniffle.common.ShuffleDataResult;
-import org.apache.uniffle.common.exception.RssException;
 import org.apache.uniffle.common.util.Constants;
+import org.apache.uniffle.common.util.RetryUtils;
 
 public class MemoryQuorumClientReadHandler extends AbstractClientReadHandler {
 
   private static final Logger LOG = LoggerFactory.getLogger(MemoryQuorumClientReadHandler.class);
   private long lastBlockId = Constants.INVALID_BLOCK_ID;
   private List<MemoryClientReadHandler> handlers = Lists.newLinkedList();
+  private int currentHandlerIdx = 0;
 
   public MemoryQuorumClientReadHandler(
       String appId,
       int shuffleId,
       int partitionId,
       int readBufferSize,
-      List<ShuffleServerClient> shuffleServerClients) {
+      List<ShuffleServerClient> shuffleServerClients, Roaring64NavigableMap processBlockIds) {
     this.appId = appId;
     this.shuffleId = shuffleId;
     this.partitionId = partitionId;
     this.readBufferSize = readBufferSize;
     shuffleServerClients.forEach(client ->
         handlers.add(new MemoryClientReadHandler(
-            appId, shuffleId, partitionId, readBufferSize, client))
+            appId, shuffleId, partitionId, readBufferSize, client, processBlockIds))
     );
   }
 
   @Override
   public ShuffleDataResult readShuffleData() {
-    boolean readSuccessful = false;
     ShuffleDataResult result = null;
-
-    for (MemoryClientReadHandler handler: handlers) {
+    while (currentHandlerIdx < handlers.size()) {
       try {
-        result = handler.readShuffleData();
-        readSuccessful = true;
-        break;
-      } catch (Exception e) {
-        LOG.warn("Failed to read a replica due to ", e);
+        result = RetryUtils.retry(() -> {
+          MemoryClientReadHandler handler = handlers.get(currentHandlerIdx);
+          ShuffleDataResult shuffleDataResult = handler.readShuffleData();
+          return shuffleDataResult;

Review Comment:
   I think we can read the first shuffle server first, and then read the others in reverse order until we reach the value of `rss.data.replica.read`. This way, we can avoid reading redundant index data. What do you think? 
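A sketch of the proposed read order. It assumes "until reach the value" means that at most `rss.data.replica.read` servers are tried in total. `ReplicaReadOrder.order` is a hypothetical helper, and server indexes refer to positions in the assigned server list. The comment does not explain why the reverse order reduces redundant index reads, so the sketch encodes only the ordering rule.

```java
import java.util.ArrayList;
import java.util.List;

class ReplicaReadOrder {
  /**
   * Returns replica indexes in the order they would be tried: the first server,
   * then the remaining servers from last to first, stopping once replicaRead
   * servers have been listed.
   */
  static List<Integer> order(int replicaCount, int replicaRead) {
    List<Integer> result = new ArrayList<>();
    if (replicaCount <= 0 || replicaRead <= 0) {
      return result;
    }
    result.add(0);
    for (int i = replicaCount - 1; i >= 1 && result.size() < replicaRead; i--) {
      result.add(i);
    }
    return result;
  }
}
```

For example, with three replicas and `rss.data.replica.read` set to 2, the servers are tried in the order 0, 2.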



[GitHub] [incubator-uniffle] xianjingfeng commented on a diff in pull request #129: [Improvement] try read from backup shuffle servers when fetched data is inconsistent

Posted by GitBox <gi...@apache.org>.
xianjingfeng commented on code in PR #129:
URL: https://github.com/apache/incubator-uniffle/pull/129#discussion_r939504137


##########
storage/src/main/java/org/apache/uniffle/storage/handler/impl/MemoryQuorumClientReadHandler.java:
##########
@@ -20,54 +20,58 @@
 import java.util.List;
 
 import com.google.common.collect.Lists;
+import org.roaringbitmap.longlong.Roaring64NavigableMap;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.uniffle.client.api.ShuffleServerClient;
 import org.apache.uniffle.common.ShuffleDataResult;
-import org.apache.uniffle.common.exception.RssException;
 import org.apache.uniffle.common.util.Constants;
+import org.apache.uniffle.common.util.RetryUtils;
 
 public class MemoryQuorumClientReadHandler extends AbstractClientReadHandler {
 
   private static final Logger LOG = LoggerFactory.getLogger(MemoryQuorumClientReadHandler.class);
   private long lastBlockId = Constants.INVALID_BLOCK_ID;
   private List<MemoryClientReadHandler> handlers = Lists.newLinkedList();
+  private int currentHandlerIdx = 0;
 
   public MemoryQuorumClientReadHandler(
       String appId,
       int shuffleId,
       int partitionId,
       int readBufferSize,
-      List<ShuffleServerClient> shuffleServerClients) {
+      List<ShuffleServerClient> shuffleServerClients, Roaring64NavigableMap processBlockIds) {
     this.appId = appId;
     this.shuffleId = shuffleId;
     this.partitionId = partitionId;
     this.readBufferSize = readBufferSize;
     shuffleServerClients.forEach(client ->
         handlers.add(new MemoryClientReadHandler(
-            appId, shuffleId, partitionId, readBufferSize, client))
+            appId, shuffleId, partitionId, readBufferSize, client, processBlockIds))
     );
   }
 
   @Override
   public ShuffleDataResult readShuffleData() {
-    boolean readSuccessful = false;
     ShuffleDataResult result = null;
-
-    for (MemoryClientReadHandler handler: handlers) {
+    while (currentHandlerIdx < handlers.size()) {
       try {
-        result = handler.readShuffleData();
-        readSuccessful = true;
-        break;
-      } catch (Exception e) {
-        LOG.warn("Failed to read a replica due to ", e);
+        result = RetryUtils.retry(() -> {
+          MemoryClientReadHandler handler = handlers.get(currentHandlerIdx);
+          ShuffleDataResult shuffleDataResult = handler.readShuffleData();
+          return shuffleDataResult;

Review Comment:
   Why would block checking fail if the first and third RPCs succeed? Are blocks lost after being written successfully? I don't understand; can you explain?


