You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uniffle.apache.org by ro...@apache.org on 2022/09/27 11:00:14 UTC
[incubator-uniffle] branch master updated: Fast fail when reading failed in ComposedClientReadHandler (#213)
This is an automated email from the ASF dual-hosted git repository.
roryqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new 54ddca60 Fast fail when reading failed in ComposedClientReadHandler (#213)
54ddca60 is described below
commit 54ddca60f6469483ff87927ea496464cf9750dbb
Author: Junfan Zhang <ju...@outlook.com>
AuthorDate: Tue Sep 27 19:00:09 2022 +0800
Fast fail when reading failed in ComposedClientReadHandler (#213)
### What changes were proposed in this pull request?
Fail fast when a read fails in `ComposedClientReadHandler`: rethrow the error as an `RssException` instead of only logging it.
### Why are the changes needed?
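`ComposedClientReadHandler` should fail fast on network failures when reading shuffle data from memory, local files, or HDFS. Otherwise the failure is only logged and the read moves on to the next storage tier. Users then eventually see a confusing inconsistent-blockIds error instead of the real cause.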
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Unit tests, plus the new integration test `ShuffleServerWithLocalOfExceptionTest`.
---
.../common/exception/FileNotFoundException.java | 29 +++++++
.../ShuffleServerWithLocalOfExceptionTest.java | 94 ++++++++++++++++++++++
.../uniffle/test/ShuffleWithRssClientTest.java | 10 +--
.../uniffle/test/SparkClientWithLocalTest.java | 15 ++--
.../uniffle/server/ShuffleServerGrpcService.java | 7 ++
.../handler/impl/ComposedClientReadHandler.java | 4 +-
.../handler/impl/LocalFileServerReadHandler.java | 5 +-
7 files changed, 142 insertions(+), 22 deletions(-)
diff --git a/common/src/main/java/org/apache/uniffle/common/exception/FileNotFoundException.java b/common/src/main/java/org/apache/uniffle/common/exception/FileNotFoundException.java
new file mode 100644
index 00000000..b7d1a34b
--- /dev/null
+++ b/common/src/main/java/org/apache/uniffle/common/exception/FileNotFoundException.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.common.exception;
+
+/**
+ * Unchecked exception signalling that an expected file or folder could not be found.
+ *
+ * <p>Not to be confused with the checked {@link java.io.FileNotFoundException}: code that also
+ * uses {@code java.io} types should refer to one of the two by its fully-qualified name.
+ */
+public class FileNotFoundException extends RuntimeException {
+
+  private static final long serialVersionUID = 1L;
+
+  /**
+   * Creates the exception with a detail message.
+   *
+   * @param message description of the missing file or folder
+   */
+  public FileNotFoundException(String message) {
+    super(message);
+  }
+
+  /**
+   * Creates the exception with a detail message and the underlying cause.
+   *
+   * @param message description of the missing file or folder
+   * @param cause the underlying failure (may be {@code null})
+   */
+  public FileNotFoundException(String message, Throwable cause) {
+    super(message, cause);
+  }
+}
diff --git a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithLocalOfExceptionTest.java b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithLocalOfExceptionTest.java
new file mode 100644
index 00000000..03420fba
--- /dev/null
+++ b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithLocalOfExceptionTest.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.test;
+
+import java.io.File;
+
+import com.google.common.collect.Lists;
+import com.google.common.io.Files;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import org.apache.uniffle.client.impl.grpc.ShuffleServerGrpcClient;
+import org.apache.uniffle.common.exception.RssException;
+import org.apache.uniffle.coordinator.CoordinatorConf;
+import org.apache.uniffle.server.ShuffleServerConf;
+import org.apache.uniffle.storage.handler.api.ClientReadHandler;
+import org.apache.uniffle.storage.handler.impl.ComposedClientReadHandler;
+import org.apache.uniffle.storage.handler.impl.MemoryQuorumClientReadHandler;
+import org.apache.uniffle.storage.util.StorageType;
+
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/** Checks that {@link ComposedClientReadHandler} fails fast once its shuffle server is down. */
+public class ShuffleServerWithLocalOfExceptionTest extends ShuffleReadWriteBase {
+
+  private ShuffleServerGrpcClient shuffleServerClient;
+
+  @BeforeAll
+  public static void setupServers() throws Exception {
+    CoordinatorConf coordinatorConf = getCoordinatorConf();
+    createCoordinatorServer(coordinatorConf);
+
+    ShuffleServerConf shuffleServerConf = getShuffleServerConf();
+    File tmpDir = Files.createTempDir();
+    File dataDir1 = new File(tmpDir, "data1");
+    File dataDir2 = new File(tmpDir, "data2");
+    String basePath = dataDir1.getAbsolutePath() + "," + dataDir2.getAbsolutePath();
+    shuffleServerConf.setString("rss.storage.type", StorageType.LOCALFILE.name());
+    shuffleServerConf.setString("rss.storage.basePath", basePath);
+    shuffleServerConf.setString("rss.server.app.expired.withoutHeartbeat", "5000");
+    createShuffleServer(shuffleServerConf);
+
+    startServers();
+  }
+
+  @BeforeEach
+  public void createClient() {
+    shuffleServerClient = new ShuffleServerGrpcClient(LOCALHOST, SHUFFLE_SERVER_PORT);
+  }
+
+  @AfterEach
+  public void closeClient() {
+    shuffleServerClient.close();
+  }
+
+  @Test
+  public void testReadWhenConnectionFailedShouldThrowException() throws Exception {
+    String testAppId = "testReadWhenException";
+    int shuffleId = 0;
+    int partitionId = 0;
+
+    MemoryQuorumClientReadHandler memoryQuorumClientReadHandler = new MemoryQuorumClientReadHandler(
+        testAppId, shuffleId, partitionId, 150, Lists.newArrayList(shuffleServerClient));
+    ClientReadHandler[] handlers = new ClientReadHandler[1];
+    handlers[0] = memoryQuorumClientReadHandler;
+    ComposedClientReadHandler composedClientReadHandler = new ComposedClientReadHandler(handlers);
+    // Stop the shuffle server started in setupServers so the HOT handler's read cannot succeed.
+    shuffleServers.get(0).stopServer();
+    RssException rssException = assertThrows(
+        RssException.class,
+        composedClientReadHandler::readShuffleData,
+        "Should throw connection exception directly.");
+    assertTrue(rssException.getMessage().contains("Failed to read shuffle data from HOT handler"),
+        "Unexpected message: " + rssException.getMessage());
+  }
+}
diff --git a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleWithRssClientTest.java b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleWithRssClientTest.java
index 29532908..86e78cff 100644
--- a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleWithRssClientTest.java
+++ b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleWithRssClientTest.java
@@ -52,8 +52,8 @@ import org.apache.uniffle.storage.util.StorageType;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
-import static org.junit.jupiter.api.Assertions.fail;
public class ShuffleWithRssClientTest extends ShuffleReadWriteBase {
@@ -232,13 +232,7 @@ public class ShuffleWithRssClientTest extends ShuffleReadWriteBase {
ShuffleReadClientImpl readClient = new ShuffleReadClientImpl(StorageType.LOCALFILE.name(), testAppId, 0, 0, 100, 1,
10, 1000, "", blockIdBitmap, taskIdBitmap,
Lists.newArrayList(shuffleServerInfo1, shuffleServerInfo2), null, new DefaultIdHelper());
-
- try {
- readClient.readShuffleBlockData();
- fail(EXPECTED_EXCEPTION_MESSAGE);
- } catch (Exception e) {
- assertTrue(e.getMessage().contains("Failed to read all replicas for"));
- }
+ assertNull(readClient.readShuffleBlockData());
readClient.close();
// send 2nd commit, data will be persisted to disk
diff --git a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/SparkClientWithLocalTest.java b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/SparkClientWithLocalTest.java
index 96ed80e3..f09be54a 100644
--- a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/SparkClientWithLocalTest.java
+++ b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/SparkClientWithLocalTest.java
@@ -142,25 +142,20 @@ public class SparkClientWithLocalTest extends ShuffleReadWriteBase {
Map<Long, byte[]> expectedData = Maps.newHashMap();
Roaring64NavigableMap blockIdBitmap = Roaring64NavigableMap.bitmapOf();
- Roaring64NavigableMap taskIdBitmap = Roaring64NavigableMap.bitmapOf(0);
List<ShuffleBlockInfo> blocks = createShuffleBlockList(
0, 0, 0, 2, 30, blockIdBitmap, expectedData, mockSSI);
sendTestData(testAppId, blocks);
- ShuffleReadClientImpl readClient = new ShuffleReadClientImpl(StorageType.LOCALFILE.name(),
- testAppId, 0, 0, 100, 1, 10, 1000,
- "", blockIdBitmap, taskIdBitmap, shuffleServerInfo, null, new DefaultIdHelper());
FileUtils.deleteDirectory(new File(DATA_DIR1.getAbsolutePath() + "/" + testAppId + "/0/0-0"));
FileUtils.deleteDirectory(new File(DATA_DIR2.getAbsolutePath() + "/" + testAppId + "/0/0-0"));
// sleep to wait delete operation
Thread.sleep(2000);
- try {
- readClient.readShuffleBlockData();
- fail(EXPECTED_EXCEPTION_MESSAGE);
- } catch (Exception e) {
- assertTrue(e.getMessage().contains("Failed to read all replicas for"));
- }
+ Roaring64NavigableMap taskIdBitmap = Roaring64NavigableMap.bitmapOf(0);
+ ShuffleReadClientImpl readClient = new ShuffleReadClientImpl(StorageType.LOCALFILE.name(),
+ testAppId, 0, 0, 100, 1, 10, 1000,
+ "", blockIdBitmap, taskIdBitmap, shuffleServerInfo, null, new DefaultIdHelper());
+ assertNull(readClient.readShuffleBlockData());
readClient.close();
}
diff --git a/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java b/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java
index 0501deca..e08fd1c8 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java
@@ -40,6 +40,7 @@ import org.apache.uniffle.common.ShuffleIndexResult;
import org.apache.uniffle.common.ShufflePartitionedBlock;
import org.apache.uniffle.common.ShufflePartitionedData;
import org.apache.uniffle.common.config.RssBaseConf;
+import org.apache.uniffle.common.exception.FileNotFoundException;
import org.apache.uniffle.proto.RssProtos;
import org.apache.uniffle.proto.RssProtos.AppHeartBeatRequest;
import org.apache.uniffle.proto.RssProtos.AppHeartBeatResponse;
@@ -533,6 +534,12 @@ public class ShuffleServerGrpcService extends ShuffleServerImplBase {
builder.setIndexData(UnsafeByteOperations.unsafeWrap(data));
reply = builder.build();
+ } catch (FileNotFoundException indexFileNotFoundException) {
+ LOG.warn("Index file for {} is not found, maybe the data has been flushed to cold storage.",
+ requestInfo, indexFileNotFoundException);
+ reply = GetLocalShuffleIndexResponse.newBuilder()
+ .setStatus(valueOf(status))
+ .build();
} catch (Exception e) {
status = StatusCode.INTERNAL_ERROR;
msg = "Error happened when get shuffle index for " + requestInfo + ", " + e.getMessage();
diff --git a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/ComposedClientReadHandler.java b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/ComposedClientReadHandler.java
index f990c383..aab15b5d 100644
--- a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/ComposedClientReadHandler.java
+++ b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/ComposedClientReadHandler.java
@@ -25,9 +25,9 @@ import org.slf4j.LoggerFactory;
import org.apache.uniffle.common.BufferSegment;
import org.apache.uniffle.common.ShuffleDataResult;
+import org.apache.uniffle.common.exception.RssException;
import org.apache.uniffle.storage.handler.api.ClientReadHandler;
-
public class ComposedClientReadHandler implements ClientReadHandler {
private static final Logger LOG = LoggerFactory.getLogger(ComposedClientReadHandler.class);
@@ -127,7 +127,7 @@ public class ComposedClientReadHandler implements ClientReadHandler {
return null;
}
} catch (Exception e) {
- LOG.error("Failed to read shuffle data from " + getCurrentHandlerName() + " handler", e);
+ throw new RssException("Failed to read shuffle data from " + getCurrentHandlerName() + " handler", e);
}
// when is no data for current handler, and the upmostLevel is not reached,
// then try next one if there has
diff --git a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/LocalFileServerReadHandler.java b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/LocalFileServerReadHandler.java
index 1eeb75bc..d1d0bc7d 100644
--- a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/LocalFileServerReadHandler.java
+++ b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/LocalFileServerReadHandler.java
@@ -25,6 +25,7 @@ import org.slf4j.LoggerFactory;
import org.apache.uniffle.common.ShuffleDataResult;
import org.apache.uniffle.common.ShuffleIndexResult;
+import org.apache.uniffle.common.exception.FileNotFoundException;
import org.apache.uniffle.common.util.Constants;
import org.apache.uniffle.storage.common.FileBasedShuffleSegment;
import org.apache.uniffle.storage.handler.api.ServerReadHandler;
@@ -80,7 +81,7 @@ public class LocalFileServerReadHandler implements ServerReadHandler {
File baseFolder = new File(fullShufflePath);
if (!baseFolder.exists()) {
// the partition doesn't exist in this base folder, skip
- throw new RuntimeException("Can't find folder " + fullShufflePath);
+ throw new FileNotFoundException("Can't find folder " + fullShufflePath);
}
File[] indexFiles;
String failedGetIndexFileMsg = "No index file found in " + storageBasePath;
@@ -93,7 +94,7 @@ public class LocalFileServerReadHandler implements ServerReadHandler {
}
});
} catch (Exception e) {
- throw new RuntimeException(failedGetIndexFileMsg, e);
+ throw new FileNotFoundException(failedGetIndexFileMsg, e);
}
if (indexFiles != null && indexFiles.length > 0) {