You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uniffle.apache.org by ro...@apache.org on 2022/09/27 11:00:14 UTC

[incubator-uniffle] branch master updated: Fast fail when reading failed in ComposedClientReadHandler (#213)

This is an automated email from the ASF dual-hosted git repository.

roryqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new 54ddca60 Fast fail when reading failed in ComposedClientReadHandler (#213)
54ddca60 is described below

commit 54ddca60f6469483ff87927ea496464cf9750dbb
Author: Junfan Zhang <ju...@outlook.com>
AuthorDate: Tue Sep 27 19:00:09 2022 +0800

    Fast fail when reading failed in ComposedClientReadHandler (#213)
    
    ### What changes were proposed in this pull request?
    Fail fast when a read fails in `ComposedClientReadHandler`: the handler now throws an `RssException` instead of only logging the error.
    
    ### Why are the changes needed?
    Reading shuffle data from memory/localfile/hdfs in `ComposedClientReadHandler` should fail fast on a network failure. Otherwise, it will throw an inconsistent blockIds exception, which confuses users.
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    UTs.
---
 .../common/exception/FileNotFoundException.java    | 29 +++++++
 .../ShuffleServerWithLocalOfExceptionTest.java     | 94 ++++++++++++++++++++++
 .../uniffle/test/ShuffleWithRssClientTest.java     | 10 +--
 .../uniffle/test/SparkClientWithLocalTest.java     | 15 ++--
 .../uniffle/server/ShuffleServerGrpcService.java   |  7 ++
 .../handler/impl/ComposedClientReadHandler.java    |  4 +-
 .../handler/impl/LocalFileServerReadHandler.java   |  5 +-
 7 files changed, 142 insertions(+), 22 deletions(-)

diff --git a/common/src/main/java/org/apache/uniffle/common/exception/FileNotFoundException.java b/common/src/main/java/org/apache/uniffle/common/exception/FileNotFoundException.java
new file mode 100644
index 00000000..b7d1a34b
--- /dev/null
+++ b/common/src/main/java/org/apache/uniffle/common/exception/FileNotFoundException.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.common.exception;
+
+public class FileNotFoundException extends RuntimeException {
+
+  public FileNotFoundException(String message) {
+    super(message);
+  }
+
+  public FileNotFoundException(String message, Throwable e) {
+    super(message, e);
+  }
+}
diff --git a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithLocalOfExceptionTest.java b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithLocalOfExceptionTest.java
new file mode 100644
index 00000000..03420fba
--- /dev/null
+++ b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithLocalOfExceptionTest.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.test;
+
+import java.io.File;
+
+import com.google.common.collect.Lists;
+import com.google.common.io.Files;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import org.apache.uniffle.client.impl.grpc.ShuffleServerGrpcClient;
+import org.apache.uniffle.common.ShuffleDataResult;
+import org.apache.uniffle.common.exception.RssException;
+import org.apache.uniffle.coordinator.CoordinatorConf;
+import org.apache.uniffle.server.ShuffleServerConf;
+import org.apache.uniffle.storage.handler.api.ClientReadHandler;
+import org.apache.uniffle.storage.handler.impl.ComposedClientReadHandler;
+import org.apache.uniffle.storage.handler.impl.MemoryQuorumClientReadHandler;
+import org.apache.uniffle.storage.util.StorageType;
+
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
+
+public class ShuffleServerWithLocalOfExceptionTest extends ShuffleReadWriteBase {
+
+  private ShuffleServerGrpcClient shuffleServerClient;
+  private static String REMOTE_STORAGE = HDFS_URI + "rss/test";
+
+  @BeforeAll
+  public static void setupServers() throws Exception {
+    CoordinatorConf coordinatorConf = getCoordinatorConf();
+    createCoordinatorServer(coordinatorConf);
+
+    ShuffleServerConf shuffleServerConf = getShuffleServerConf();
+    File tmpDir = Files.createTempDir();
+    File dataDir1 = new File(tmpDir, "data1");
+    File dataDir2 = new File(tmpDir, "data2");
+    String basePath = dataDir1.getAbsolutePath() + "," + dataDir2.getAbsolutePath();
+    shuffleServerConf.setString("rss.storage.type", StorageType.LOCALFILE.name());
+    shuffleServerConf.setString("rss.storage.basePath", basePath);
+    shuffleServerConf.setString("rss.server.app.expired.withoutHeartbeat", "5000");
+    createShuffleServer(shuffleServerConf);
+
+    startServers();
+  }
+
+  @BeforeEach
+  public void createClient() {
+    shuffleServerClient = new ShuffleServerGrpcClient(LOCALHOST, SHUFFLE_SERVER_PORT);
+  }
+
+  @AfterEach
+  public void closeClient() {
+    shuffleServerClient.close();
+  }
+
+  @Test
+  public void testReadWhenConnectionFailedShouldThrowException() throws Exception {
+    String testAppId = "testReadWhenException";
+    int shuffleId = 0;
+    int partitionId = 0;
+
+    MemoryQuorumClientReadHandler memoryQuorumClientReadHandler = new MemoryQuorumClientReadHandler(
+        testAppId, shuffleId, partitionId, 150, Lists.newArrayList(shuffleServerClient));
+    ClientReadHandler[] handlers = new ClientReadHandler[1];
+    handlers[0] = memoryQuorumClientReadHandler;
+    ComposedClientReadHandler composedClientReadHandler = new ComposedClientReadHandler(handlers);
+    shuffleServers.get(0).stopServer();
+    try {
+      ShuffleDataResult sdr  = composedClientReadHandler.readShuffleData();
+      fail("Should throw connection exception directly.");
+    } catch (RssException rssException) {
+      assertTrue(rssException.getMessage().contains("Failed to read shuffle data from HOT handler"));
+    }
+  }
+}
diff --git a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleWithRssClientTest.java b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleWithRssClientTest.java
index 29532908..86e78cff 100644
--- a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleWithRssClientTest.java
+++ b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleWithRssClientTest.java
@@ -52,8 +52,8 @@ import org.apache.uniffle.storage.util.StorageType;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
-import static org.junit.jupiter.api.Assertions.fail;
 
 public class ShuffleWithRssClientTest extends ShuffleReadWriteBase {
 
@@ -232,13 +232,7 @@ public class ShuffleWithRssClientTest extends ShuffleReadWriteBase {
     ShuffleReadClientImpl readClient = new ShuffleReadClientImpl(StorageType.LOCALFILE.name(), testAppId, 0, 0, 100, 1,
         10, 1000, "", blockIdBitmap, taskIdBitmap,
         Lists.newArrayList(shuffleServerInfo1, shuffleServerInfo2), null, new DefaultIdHelper());
-
-    try {
-      readClient.readShuffleBlockData();
-      fail(EXPECTED_EXCEPTION_MESSAGE);
-    } catch (Exception e) {
-      assertTrue(e.getMessage().contains("Failed to read all replicas for"));
-    }
+    assertNull(readClient.readShuffleBlockData());
     readClient.close();
 
     // send 2nd commit, data will be persisted to disk
diff --git a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/SparkClientWithLocalTest.java b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/SparkClientWithLocalTest.java
index 96ed80e3..f09be54a 100644
--- a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/SparkClientWithLocalTest.java
+++ b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/SparkClientWithLocalTest.java
@@ -142,25 +142,20 @@ public class SparkClientWithLocalTest extends ShuffleReadWriteBase {
 
     Map<Long, byte[]> expectedData = Maps.newHashMap();
     Roaring64NavigableMap blockIdBitmap = Roaring64NavigableMap.bitmapOf();
-    Roaring64NavigableMap taskIdBitmap = Roaring64NavigableMap.bitmapOf(0);
     List<ShuffleBlockInfo> blocks = createShuffleBlockList(
         0, 0, 0, 2, 30, blockIdBitmap, expectedData, mockSSI);
     sendTestData(testAppId, blocks);
 
-    ShuffleReadClientImpl readClient = new ShuffleReadClientImpl(StorageType.LOCALFILE.name(),
-        testAppId, 0, 0, 100, 1, 10, 1000,
-        "", blockIdBitmap, taskIdBitmap, shuffleServerInfo, null, new DefaultIdHelper());
     FileUtils.deleteDirectory(new File(DATA_DIR1.getAbsolutePath() + "/" + testAppId + "/0/0-0"));
     FileUtils.deleteDirectory(new File(DATA_DIR2.getAbsolutePath() + "/" + testAppId + "/0/0-0"));
     // sleep to wait delete operation
     Thread.sleep(2000);
 
-    try {
-      readClient.readShuffleBlockData();
-      fail(EXPECTED_EXCEPTION_MESSAGE);
-    } catch (Exception e) {
-      assertTrue(e.getMessage().contains("Failed to read all replicas for"));
-    }
+    Roaring64NavigableMap taskIdBitmap = Roaring64NavigableMap.bitmapOf(0);
+    ShuffleReadClientImpl readClient = new ShuffleReadClientImpl(StorageType.LOCALFILE.name(),
+        testAppId, 0, 0, 100, 1, 10, 1000,
+        "", blockIdBitmap, taskIdBitmap, shuffleServerInfo, null, new DefaultIdHelper());
+    assertNull(readClient.readShuffleBlockData());
     readClient.close();
   }
 
diff --git a/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java b/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java
index 0501deca..e08fd1c8 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java
@@ -40,6 +40,7 @@ import org.apache.uniffle.common.ShuffleIndexResult;
 import org.apache.uniffle.common.ShufflePartitionedBlock;
 import org.apache.uniffle.common.ShufflePartitionedData;
 import org.apache.uniffle.common.config.RssBaseConf;
+import org.apache.uniffle.common.exception.FileNotFoundException;
 import org.apache.uniffle.proto.RssProtos;
 import org.apache.uniffle.proto.RssProtos.AppHeartBeatRequest;
 import org.apache.uniffle.proto.RssProtos.AppHeartBeatResponse;
@@ -533,6 +534,12 @@ public class ShuffleServerGrpcService extends ShuffleServerImplBase {
 
         builder.setIndexData(UnsafeByteOperations.unsafeWrap(data));
         reply = builder.build();
+      } catch (FileNotFoundException indexFileNotFoundException) {
+        LOG.warn("Index file for {} is not found, maybe the data has been flushed to cold storage.",
+            requestInfo, indexFileNotFoundException);
+        reply = GetLocalShuffleIndexResponse.newBuilder()
+            .setStatus(valueOf(status))
+            .build();
       } catch (Exception e) {
         status = StatusCode.INTERNAL_ERROR;
         msg = "Error happened when get shuffle index for " + requestInfo + ", " + e.getMessage();
diff --git a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/ComposedClientReadHandler.java b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/ComposedClientReadHandler.java
index f990c383..aab15b5d 100644
--- a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/ComposedClientReadHandler.java
+++ b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/ComposedClientReadHandler.java
@@ -25,9 +25,9 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.uniffle.common.BufferSegment;
 import org.apache.uniffle.common.ShuffleDataResult;
+import org.apache.uniffle.common.exception.RssException;
 import org.apache.uniffle.storage.handler.api.ClientReadHandler;
 
-
 public class ComposedClientReadHandler implements ClientReadHandler {
 
   private static final Logger LOG = LoggerFactory.getLogger(ComposedClientReadHandler.class);
@@ -127,7 +127,7 @@ public class ComposedClientReadHandler implements ClientReadHandler {
           return null;
       }
     } catch (Exception e) {
-      LOG.error("Failed to read shuffle data from " + getCurrentHandlerName() + " handler", e);
+      throw new RssException("Failed to read shuffle data from " + getCurrentHandlerName() + " handler", e);
     }
     // when is no data for current handler, and the upmostLevel is not reached,
     // then try next one if there has
diff --git a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/LocalFileServerReadHandler.java b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/LocalFileServerReadHandler.java
index 1eeb75bc..d1d0bc7d 100644
--- a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/LocalFileServerReadHandler.java
+++ b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/LocalFileServerReadHandler.java
@@ -25,6 +25,7 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.uniffle.common.ShuffleDataResult;
 import org.apache.uniffle.common.ShuffleIndexResult;
+import org.apache.uniffle.common.exception.FileNotFoundException;
 import org.apache.uniffle.common.util.Constants;
 import org.apache.uniffle.storage.common.FileBasedShuffleSegment;
 import org.apache.uniffle.storage.handler.api.ServerReadHandler;
@@ -80,7 +81,7 @@ public class LocalFileServerReadHandler implements ServerReadHandler {
     File baseFolder = new File(fullShufflePath);
     if (!baseFolder.exists()) {
       // the partition doesn't exist in this base folder, skip
-      throw new RuntimeException("Can't find folder " + fullShufflePath);
+      throw new FileNotFoundException("Can't find folder " + fullShufflePath);
     }
     File[] indexFiles;
     String failedGetIndexFileMsg = "No index file found in  " + storageBasePath;
@@ -93,7 +94,7 @@ public class LocalFileServerReadHandler implements ServerReadHandler {
         }
       });
     } catch (Exception e) {
-      throw new RuntimeException(failedGetIndexFileMsg, e);
+      throw new FileNotFoundException(failedGetIndexFileMsg, e);
     }
 
     if (indexFiles != null && indexFiles.length > 0) {