You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uniffle.apache.org by ro...@apache.org on 2023/03/22 16:06:09 UTC
[incubator-uniffle] branch master updated: [#752] refactor: replace RuntimeException with RssException (#753)
This is an automated email from the ASF dual-hosted git repository.
roryqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new 8db93301 [#752] refactor: replace RuntimeException with RssException (#753)
8db93301 is described below
commit 8db93301da38fb9cd551329f72cea87b08da991d
Author: Kaijie Chen <ck...@apache.org>
AuthorDate: Thu Mar 23 00:06:01 2023 +0800
[#752] refactor: replace RuntimeException with RssException (#753)
### What changes were proposed in this pull request?
Replace raw uses of `RuntimeException` with `RssException` in src/main, and add an `RssException(Throwable)` constructor for call sites that only wrap a cause.
Enable spotbugs check: [THROWS_METHOD_THROWS_RUNTIMEEXCEPTION][1]
[1]: https://spotbugs.readthedocs.io/en/stable/bugDescriptions.html#throws-method-intentionally-throws-runtimeexception-throws-method-throws-runtimeexception
### Why are the changes needed?
Fix: #752
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Tested by spotbugs and existing tests.
---
.../org/apache/hadoop/mapred/SortWriteBufferManager.java | 2 +-
.../main/java/org/apache/hadoop/mapreduce/RssMRUtils.java | 8 ++++----
.../mapreduce/task/reduce/RssRemoteMergeManagerImpl.java | 2 +-
.../org/apache/hadoop/mapreduce/v2/app/RssMRAppMaster.java | 4 ++--
.../apache/spark/shuffle/writer/BufferManagerOptions.java | 6 ++++--
.../java/org/apache/spark/network/util/NettyUtils.java | 4 +++-
.../java/org/apache/spark/shuffle/RssShuffleManager.java | 8 ++++----
.../org/apache/spark/shuffle/writer/RssShuffleWriter.java | 2 +-
.../apache/spark/shuffle/DelegationRssShuffleManager.java | 4 ++--
.../java/org/apache/spark/shuffle/RssShuffleManager.java | 14 +++++++-------
.../org/apache/spark/shuffle/writer/RssShuffleWriter.java | 2 +-
.../main/java/org/apache/uniffle/common/BufferSegment.java | 4 +++-
.../org/apache/uniffle/common/exception/RssException.java | 4 ++++
.../prometheus/PrometheusPushGatewayMetricReporter.java | 5 +++--
.../uniffle/common/security/SecurityContextFactory.java | 4 +++-
.../main/java/org/apache/uniffle/common/util/RssUtils.java | 14 +++++++-------
.../org/apache/uniffle/coordinator/CoordinatorServer.java | 3 ++-
.../apache/uniffle/coordinator/SimpleClusterManager.java | 3 ++-
.../access/checker/AccessCandidatesChecker.java | 5 +++--
.../strategy/assignment/AbstractAssignmentStrategy.java | 5 +++--
.../assignment/PartitionBalanceAssignmentStrategy.java | 5 +++--
.../uniffle/client/factory/CoordinatorClientFactory.java | 5 +++--
.../uniffle/client/impl/grpc/ShuffleServerGrpcClient.java | 4 ++--
.../main/java/org/apache/uniffle/server/ShuffleServer.java | 3 ++-
.../java/org/apache/uniffle/server/ShuffleTaskManager.java | 7 ++++---
.../storage/AbstractStorageManagerFallbackStrategy.java | 3 ++-
.../apache/uniffle/server/storage/HdfsStorageManager.java | 3 ++-
.../apache/uniffle/server/storage/LocalStorageManager.java | 3 ++-
.../apache/uniffle/server/storage/MultiStorageManager.java | 3 ++-
spotbugs-exclude.xml | 2 +-
.../org/apache/uniffle/storage/common/HdfsStorage.java | 5 +++--
.../org/apache/uniffle/storage/common/LocalStorage.java | 3 ++-
.../uniffle/storage/factory/ShuffleHandlerFactory.java | 2 +-
.../storage/handler/impl/HdfsClientReadHandler.java | 3 ++-
.../storage/handler/impl/HdfsShuffleWriteHandler.java | 3 ++-
.../uniffle/storage/handler/impl/LocalFileReader.java | 3 ++-
.../storage/handler/impl/LocalFileServerReadHandler.java | 3 ++-
.../storage/handler/impl/LocalFileWriteHandler.java | 2 +-
.../handler/impl/PooledHdfsShuffleWriteHandler.java | 3 ++-
.../apache/uniffle/storage/util/ShuffleStorageUtils.java | 3 ++-
40 files changed, 102 insertions(+), 69 deletions(-)
diff --git a/client-mr/src/main/java/org/apache/hadoop/mapred/SortWriteBufferManager.java b/client-mr/src/main/java/org/apache/hadoop/mapred/SortWriteBufferManager.java
index cb76bef2..f36b429d 100644
--- a/client-mr/src/main/java/org/apache/hadoop/mapred/SortWriteBufferManager.java
+++ b/client-mr/src/main/java/org/apache/hadoop/mapred/SortWriteBufferManager.java
@@ -350,7 +350,7 @@ public class SortWriteBufferManager<K, V> {
} catch (InterruptedException ie) {
LOG.warn("Ignore the InterruptedException which should be caused by internal killed");
} catch (Exception e) {
- throw new RuntimeException("Exception happened when get commit status", e);
+ throw new RssException("Exception happened when get commit status", e);
} finally {
executor.shutdown();
}
diff --git a/client-mr/src/main/java/org/apache/hadoop/mapreduce/RssMRUtils.java b/client-mr/src/main/java/org/apache/hadoop/mapreduce/RssMRUtils.java
index 942b93f8..7a732353 100644
--- a/client-mr/src/main/java/org/apache/hadoop/mapreduce/RssMRUtils.java
+++ b/client-mr/src/main/java/org/apache/hadoop/mapreduce/RssMRUtils.java
@@ -170,22 +170,22 @@ public class RssMRUtils {
public static long getBlockId(long partitionId, long taskAttemptId, int nextSeqNo) {
long attemptId = taskAttemptId >> (Constants.PARTITION_ID_MAX_LENGTH + Constants.TASK_ATTEMPT_ID_MAX_LENGTH);
if (attemptId < 0 || attemptId > MAX_ATTEMPT_ID) {
- throw new RuntimeException("Can't support attemptId [" + attemptId
+ throw new RssException("Can't support attemptId [" + attemptId
+ "], the max value should be " + MAX_ATTEMPT_ID);
}
long atomicInt = (nextSeqNo << MAX_ATTEMPT_LENGTH) + attemptId;
if (atomicInt < 0 || atomicInt > Constants.MAX_SEQUENCE_NO) {
- throw new RuntimeException("Can't support sequence [" + atomicInt
+ throw new RssException("Can't support sequence [" + atomicInt
+ "], the max value should be " + Constants.MAX_SEQUENCE_NO);
}
if (partitionId < 0 || partitionId > Constants.MAX_PARTITION_ID) {
- throw new RuntimeException("Can't support partitionId["
+ throw new RssException("Can't support partitionId["
+ partitionId + "], the max value should be " + Constants.MAX_PARTITION_ID);
}
long taskId = taskAttemptId - (attemptId
<< (Constants.PARTITION_ID_MAX_LENGTH + Constants.TASK_ATTEMPT_ID_MAX_LENGTH));
if (taskId < 0 || taskId > Constants.MAX_TASK_ATTEMPT_ID) {
- throw new RuntimeException("Can't support taskId["
+ throw new RssException("Can't support taskId["
+ taskId + "], the max value should be " + Constants.MAX_TASK_ATTEMPT_ID);
}
return (atomicInt << (Constants.PARTITION_ID_MAX_LENGTH + Constants.TASK_ATTEMPT_ID_MAX_LENGTH))
diff --git a/client-mr/src/main/java/org/apache/hadoop/mapreduce/task/reduce/RssRemoteMergeManagerImpl.java b/client-mr/src/main/java/org/apache/hadoop/mapreduce/task/reduce/RssRemoteMergeManagerImpl.java
index 54e1e104..3ef7b20c 100644
--- a/client-mr/src/main/java/org/apache/hadoop/mapreduce/task/reduce/RssRemoteMergeManagerImpl.java
+++ b/client-mr/src/main/java/org/apache/hadoop/mapreduce/task/reduce/RssRemoteMergeManagerImpl.java
@@ -149,7 +149,7 @@ public class RssRemoteMergeManagerImpl<K, V> extends MergeManagerImpl<K, V> {
remoteConf
);
} catch (Exception e) {
- throw new RuntimeException("Cannot init remoteFS on path:" + basePath);
+ throw new RssException("Cannot init remoteFS on path:" + basePath);
}
this.basePath = basePath;
diff --git a/client-mr/src/main/java/org/apache/hadoop/mapreduce/v2/app/RssMRAppMaster.java b/client-mr/src/main/java/org/apache/hadoop/mapreduce/v2/app/RssMRAppMaster.java
index 044eae0b..a82b10ba 100644
--- a/client-mr/src/main/java/org/apache/hadoop/mapreduce/v2/app/RssMRAppMaster.java
+++ b/client-mr/src/main/java/org/apache/hadoop/mapreduce/v2/app/RssMRAppMaster.java
@@ -287,7 +287,7 @@ public class RssMRAppMaster extends MRAppMaster {
String jobDirStr = conf.get(MRJobConfig.MAPREDUCE_JOB_DIR);
if (jobDirStr == null) {
- throw new RuntimeException("jobDir is empty");
+ throw new RssException("jobDir is empty");
}
}
try {
@@ -375,7 +375,7 @@ public class RssMRAppMaster extends MRAppMaster {
conf.set(MRJobConfig.CACHE_FILES_SIZES, sizes == null ? String.valueOf(size) : size + "," + sizes);
} catch (InterruptedException | IOException e) {
LOG.error("Upload extra conf exception", e);
- throw new RuntimeException("Upload extra conf exception ", e);
+ throw new RssException("Upload extra conf exception ", e);
}
}
diff --git a/client-spark/common/src/main/java/org/apache/spark/shuffle/writer/BufferManagerOptions.java b/client-spark/common/src/main/java/org/apache/spark/shuffle/writer/BufferManagerOptions.java
index 3ae878cc..fd0a7267 100644
--- a/client-spark/common/src/main/java/org/apache/spark/shuffle/writer/BufferManagerOptions.java
+++ b/client-spark/common/src/main/java/org/apache/spark/shuffle/writer/BufferManagerOptions.java
@@ -22,6 +22,8 @@ import org.apache.spark.shuffle.RssSparkConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.uniffle.common.exception.RssException;
+
public class BufferManagerOptions {
private static final Logger LOG = LoggerFactory.getLogger(BufferManagerOptions.class);
@@ -55,11 +57,11 @@ public class BufferManagerOptions {
private void checkBufferSize() {
if (bufferSize < 0) {
- throw new RuntimeException("Unexpected value of " + RssSparkConfig.RSS_WRITER_BUFFER_SIZE.key()
+ throw new RssException("Unexpected value of " + RssSparkConfig.RSS_WRITER_BUFFER_SIZE.key()
+ "=" + bufferSize);
}
if (bufferSpillThreshold < 0) {
- throw new RuntimeException("Unexpected value of " + RssSparkConfig.RSS_WRITER_BUFFER_SPILL_SIZE.key()
+ throw new RssException("Unexpected value of " + RssSparkConfig.RSS_WRITER_BUFFER_SPILL_SIZE.key()
+ "=" + bufferSpillThreshold);
}
if (bufferSegmentSize > bufferSize) {
diff --git a/client-spark/spark2/src/main/java/org/apache/spark/network/util/NettyUtils.java b/client-spark/spark2/src/main/java/org/apache/spark/network/util/NettyUtils.java
index b231e626..c2d9492d 100644
--- a/client-spark/spark2/src/main/java/org/apache/spark/network/util/NettyUtils.java
+++ b/client-spark/spark2/src/main/java/org/apache/spark/network/util/NettyUtils.java
@@ -34,6 +34,8 @@ import io.netty.util.concurrent.DefaultThreadFactory;
import io.netty.util.internal.PlatformDependent;
import io.netty.util.internal.SystemPropertyUtil;
+import org.apache.uniffle.common.exception.RssException;
+
/** copy from spark, In order to override the createPooledByteBufAllocator method,
* the property DEFAULT_TINY_CACHE_SIZE does not exist in netty>4.1.47. */
public class NettyUtils {
@@ -131,7 +133,7 @@ public class NettyUtils {
f.setAccessible(true);
return f.getInt(null);
} catch (Exception e) {
- throw new RuntimeException(e);
+ throw new RssException(e);
}
}
}
diff --git a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java b/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
index 61e201cb..e7efbf86 100644
--- a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
+++ b/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
@@ -374,7 +374,7 @@ public class RssShuffleManager implements ShuffleManager {
writeMetrics, this, sparkConf, shuffleWriteClient, rssHandle,
(Function<String, Boolean>) this::markFailedTask);
} else {
- throw new RuntimeException("Unexpected ShuffleHandle:" + handle.getClass().getName());
+ throw new RssException("Unexpected ShuffleHandle:" + handle.getClass().getName());
}
}
@@ -423,7 +423,7 @@ public class RssShuffleManager implements ShuffleManager {
storageType, (int) readBufferSize, partitionNumPerRange, partitionNum,
blockIdBitmap, taskIdBitmap, RssSparkConfig.toRssConf(sparkConf));
} else {
- throw new RuntimeException("Unexpected ShuffleHandle:" + handle.getClass().getName());
+ throw new RssException("Unexpected ShuffleHandle:" + handle.getClass().getName());
}
}
@@ -455,7 +455,7 @@ public class RssShuffleManager implements ShuffleManager {
@Override
public ShuffleBlockResolver shuffleBlockResolver() {
- throw new RuntimeException("RssShuffleManager.shuffleBlockResolver is not implemented");
+ throw new RssException("RssShuffleManager.shuffleBlockResolver is not implemented");
}
public EventLoop<AddBlockEvent> getEventLoop() {
@@ -482,7 +482,7 @@ public class RssShuffleManager implements ShuffleManager {
if (topologyInfo.isDefined()) {
taskIdBitmap.addLong(Long.parseLong(tuple2._1().topologyInfo().get()));
} else {
- throw new RuntimeException("Can't get expected taskAttemptId");
+ throw new RssException("Can't get expected taskAttemptId");
}
}
LOG.info("Got result from MapStatus for expected tasks " + taskIdBitmap.getLongCardinality());
diff --git a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java b/client-spark/spark2/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
index 978924bc..81445f44 100644
--- a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
+++ b/client-spark/spark2/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
@@ -278,7 +278,7 @@ public class RssShuffleWriter<K, V, C> extends ShuffleWriter<K, V> {
} catch (InterruptedException ie) {
LOG.warn("Ignore the InterruptedException which should be caused by internal killed");
} catch (Exception e) {
- throw new RuntimeException("Exception happened when get commit status", e);
+ throw new RssException("Exception happened when get commit status", e);
} finally {
executor.shutdown();
}
diff --git a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/DelegationRssShuffleManager.java b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/DelegationRssShuffleManager.java
index 0056ce88..3fa80c11 100644
--- a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/DelegationRssShuffleManager.java
+++ b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/DelegationRssShuffleManager.java
@@ -225,7 +225,7 @@ public class DelegationRssShuffleManager implements ShuffleManager {
context,
metrics);
} catch (Exception e) {
- throw new RuntimeException(e);
+ throw new RssException(e);
}
return reader;
}
@@ -258,7 +258,7 @@ public class DelegationRssShuffleManager implements ShuffleManager {
context,
metrics);
} catch (Exception e) {
- throw new RuntimeException(e);
+ throw new RssException(e);
}
return reader;
}
diff --git a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
index 4a574bc5..504001c5 100644
--- a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
+++ b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
@@ -385,7 +385,7 @@ public class RssShuffleManager implements ShuffleManager {
TaskContext context,
ShuffleWriteMetricsReporter metrics) {
if (!(handle instanceof RssShuffleHandle)) {
- throw new RuntimeException("Unexpected ShuffleHandle:" + handle.getClass().getName());
+ throw new RssException("Unexpected ShuffleHandle:" + handle.getClass().getName());
}
RssShuffleHandle<K, V, ?> rssHandle = (RssShuffleHandle<K, V, ?>) handle;
// todo: this implement is tricky, we should refactor it
@@ -479,7 +479,7 @@ public class RssShuffleManager implements ShuffleManager {
ShuffleReadMetricsReporter metrics,
Roaring64NavigableMap taskIdBitmap) {
if (!(handle instanceof RssShuffleHandle)) {
- throw new RuntimeException("Unexpected ShuffleHandle:" + handle.getClass().getName());
+ throw new RssException("Unexpected ShuffleHandle:" + handle.getClass().getName());
}
final String storageType = sparkConf.get(RssSparkConfig.RSS_STORAGE_TYPE.key());
final int indexReadLimit = sparkConf.get(RssSparkConfig.RSS_INDEX_READ_LIMIT);
@@ -591,14 +591,14 @@ public class RssShuffleManager implements ShuffleManager {
startPartition,
endPartition);
} catch (Exception e) {
- throw new RuntimeException(e);
+ throw new RssException(e);
}
}
}
while (mapStatusIter.hasNext()) {
Tuple2<BlockManagerId, Seq<Tuple3<BlockId, Object, Object>>> tuple2 = mapStatusIter.next();
if (!tuple2._1().topologyInfo().isDefined()) {
- throw new RuntimeException("Can't get expected taskAttemptId");
+ throw new RssException("Can't get expected taskAttemptId");
}
taskIdBitmap.add(Long.parseLong(tuple2._1().topologyInfo().get()));
}
@@ -627,12 +627,12 @@ public class RssShuffleManager implements ShuffleManager {
startPartition,
endPartition);
} catch (Exception e) {
- throw new RuntimeException(e);
+ throw new RssException(e);
}
while (mapStatusIter.hasNext()) {
Tuple2<BlockManagerId, Seq<Tuple3<BlockId, Object, Object>>> tuple2 = mapStatusIter.next();
if (!tuple2._1().topologyInfo().isDefined()) {
- throw new RuntimeException("Can't get expected taskAttemptId");
+ throw new RssException("Can't get expected taskAttemptId");
}
taskIdBitmap.add(Long.parseLong(tuple2._1().topologyInfo().get()));
}
@@ -653,7 +653,7 @@ public class RssShuffleManager implements ShuffleManager {
@Override
public ShuffleBlockResolver shuffleBlockResolver() {
- throw new RuntimeException("RssShuffleManager.shuffleBlockResolver is not implemented");
+ throw new RssException("RssShuffleManager.shuffleBlockResolver is not implemented");
}
@Override
diff --git a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
index d0a6ea71..4784c390 100644
--- a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
+++ b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
@@ -307,7 +307,7 @@ public class RssShuffleWriter<K, V, C> extends ShuffleWriter<K, V> {
} catch (InterruptedException ie) {
LOG.warn("Ignore the InterruptedException which should be caused by internal killed");
} catch (Exception e) {
- throw new RuntimeException("Exception happened when get commit status", e);
+ throw new RssException("Exception happened when get commit status", e);
} finally {
executor.shutdown();
}
diff --git a/common/src/main/java/org/apache/uniffle/common/BufferSegment.java b/common/src/main/java/org/apache/uniffle/common/BufferSegment.java
index bf0e3026..73795b75 100644
--- a/common/src/main/java/org/apache/uniffle/common/BufferSegment.java
+++ b/common/src/main/java/org/apache/uniffle/common/BufferSegment.java
@@ -19,6 +19,8 @@ package org.apache.uniffle.common;
import java.util.Objects;
+import org.apache.uniffle.common.exception.RssException;
+
public class BufferSegment {
private long blockId;
@@ -64,7 +66,7 @@ public class BufferSegment {
public int getOffset() {
if (offset > Integer.MAX_VALUE) {
- throw new RuntimeException("Unsupported offset[" + offset + "] for BufferSegment");
+ throw new RssException("Unsupported offset[" + offset + "] for BufferSegment");
}
return (int) offset;
}
diff --git a/common/src/main/java/org/apache/uniffle/common/exception/RssException.java b/common/src/main/java/org/apache/uniffle/common/exception/RssException.java
index c48aac2d..aedbe88e 100644
--- a/common/src/main/java/org/apache/uniffle/common/exception/RssException.java
+++ b/common/src/main/java/org/apache/uniffle/common/exception/RssException.java
@@ -23,6 +23,10 @@ public class RssException extends RuntimeException {
super(message);
}
+ public RssException(Throwable e) {
+ super(e);
+ }
+
public RssException(String message, Throwable e) {
super(message, e);
}
diff --git a/common/src/main/java/org/apache/uniffle/common/metrics/prometheus/PrometheusPushGatewayMetricReporter.java b/common/src/main/java/org/apache/uniffle/common/metrics/prometheus/PrometheusPushGatewayMetricReporter.java
index 8795d5b6..43aaf03b 100644
--- a/common/src/main/java/org/apache/uniffle/common/metrics/prometheus/PrometheusPushGatewayMetricReporter.java
+++ b/common/src/main/java/org/apache/uniffle/common/metrics/prometheus/PrometheusPushGatewayMetricReporter.java
@@ -31,6 +31,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.uniffle.common.config.RssConf;
+import org.apache.uniffle.common.exception.RssException;
import org.apache.uniffle.common.metrics.AbstractMetricReporter;
import org.apache.uniffle.common.util.ThreadUtils;
@@ -52,13 +53,13 @@ public class PrometheusPushGatewayMetricReporter extends AbstractMetricReporter
if (pushGateway == null) {
String address = conf.getString(PUSHGATEWAY_ADDR, null);
if (StringUtils.isEmpty(address)) {
- throw new RuntimeException(PUSHGATEWAY_ADDR + " should not be empty!");
+ throw new RssException(PUSHGATEWAY_ADDR + " should not be empty!");
}
pushGateway = new PushGateway(address);
}
String jobName = conf.getString(JOB_NAME, null);
if (StringUtils.isEmpty(jobName)) {
- throw new RuntimeException(JOB_NAME + " should not be empty!");
+ throw new RssException(JOB_NAME + " should not be empty!");
}
Map<String, String> groupingKey = parseGroupingKey(conf.getString(GROUPING_KEY, ""));
groupingKey.put("instance", instanceId);
diff --git a/common/src/main/java/org/apache/uniffle/common/security/SecurityContextFactory.java b/common/src/main/java/org/apache/uniffle/common/security/SecurityContextFactory.java
index 47c743c9..94b23878 100644
--- a/common/src/main/java/org/apache/uniffle/common/security/SecurityContextFactory.java
+++ b/common/src/main/java/org/apache/uniffle/common/security/SecurityContextFactory.java
@@ -20,6 +20,8 @@ package org.apache.uniffle.common.security;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.uniffle.common.exception.RssException;
+
public class SecurityContextFactory {
private static final Logger LOGGER = LoggerFactory.getLogger(SecurityContextFactory.class);
@@ -50,7 +52,7 @@ public class SecurityContextFactory {
public SecurityContext getSecurityContext() {
if (securityContext == null) {
- throw new RuntimeException("No initialized security context.");
+ throw new RssException("No initialized security context.");
}
return securityContext;
}
diff --git a/common/src/main/java/org/apache/uniffle/common/util/RssUtils.java b/common/src/main/java/org/apache/uniffle/common/util/RssUtils.java
index f391a33e..dfae9e23 100644
--- a/common/src/main/java/org/apache/uniffle/common/util/RssUtils.java
+++ b/common/src/main/java/org/apache/uniffle/common/util/RssUtils.java
@@ -112,7 +112,7 @@ public class RssUtils {
String ip = System.getenv("RSS_IP");
if (ip != null) {
if (!InetAddresses.isInetAddress(ip)) {
- throw new RuntimeException("Environment RSS_IP: " + ip + " is wrong format");
+ throw new RssException("Environment RSS_IP: " + ip + " is wrong format");
}
return ip;
}
@@ -159,7 +159,7 @@ public class RssUtils {
public static byte[] serializeBitMap(Roaring64NavigableMap bitmap) throws IOException {
long size = bitmap.serializedSizeInBytes();
if (size > Integer.MAX_VALUE) {
- throw new RuntimeException("Unsupported serialized size of bitmap: " + size);
+ throw new RssException("Unsupported serialized size of bitmap: " + size);
}
ByteArrayOutputStream arrayOutputStream = new ByteArrayOutputStream((int) size);
DataOutputStream dataOutputStream = new DataOutputStream(arrayOutputStream);
@@ -194,7 +194,7 @@ public class RssUtils {
@SuppressWarnings("unchecked")
public static <T> List<T> loadExtensions(Class<T> extClass, List<String> classes, Object obj) {
if (classes == null || classes.isEmpty()) {
- throw new RuntimeException("Empty classes");
+ throw new RssException("Empty classes");
}
List<T> extensions = Lists.newArrayList();
@@ -203,7 +203,7 @@ public class RssUtils {
try {
Class<?> klass = Class.forName(name);
if (!extClass.isAssignableFrom(klass)) {
- throw new RuntimeException(name + " is not subclass of " + extClass.getName());
+ throw new RssException(name + " is not subclass of " + extClass.getName());
}
Constructor<?> constructor;
@@ -218,7 +218,7 @@ public class RssUtils {
extensions.add(instance);
} catch (Exception e) {
LOGGER.error("Fail to new instance using default constructor.", e);
- throw new RuntimeException(e);
+ throw new RssException(e);
}
}
return extensions;
@@ -226,10 +226,10 @@ public class RssUtils {
public static void checkQuorumSetting(int replica, int replicaWrite, int replicaRead) {
if (replica < 1 || replicaWrite > replica || replicaRead > replica) {
- throw new RuntimeException("Replica config is invalid, recommend replica.write + replica.read > replica");
+ throw new RssException("Replica config is invalid, recommend replica.write + replica.read > replica");
}
if (replicaWrite + replicaRead <= replica) {
- throw new RuntimeException("Replica config is unsafe, recommend replica.write + replica.read > replica");
+ throw new RssException("Replica config is unsafe, recommend replica.write + replica.read > replica");
}
}
diff --git a/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorServer.java b/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorServer.java
index 686f0596..7016c692 100644
--- a/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorServer.java
+++ b/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorServer.java
@@ -26,6 +26,7 @@ import picocli.CommandLine;
import org.apache.uniffle.common.Arguments;
import org.apache.uniffle.common.config.ReconfigurableBase;
import org.apache.uniffle.common.config.RssConf;
+import org.apache.uniffle.common.exception.RssException;
import org.apache.uniffle.common.metrics.GRPCMetrics;
import org.apache.uniffle.common.metrics.JvmMetrics;
import org.apache.uniffle.common.metrics.MetricReporter;
@@ -147,7 +148,7 @@ public class CoordinatorServer extends ReconfigurableBase {
private void initialization() throws Exception {
String ip = RssUtils.getHostIp();
if (ip == null) {
- throw new RuntimeException("Couldn't acquire host Ip");
+ throw new RssException("Couldn't acquire host Ip");
}
int port = coordinatorConf.getInteger(CoordinatorConf.RPC_SERVER_PORT);
id = ip + "-" + port;
diff --git a/coordinator/src/main/java/org/apache/uniffle/coordinator/SimpleClusterManager.java b/coordinator/src/main/java/org/apache/uniffle/coordinator/SimpleClusterManager.java
index 700e06f1..4e5e4597 100644
--- a/coordinator/src/main/java/org/apache/uniffle/coordinator/SimpleClusterManager.java
+++ b/coordinator/src/main/java/org/apache/uniffle/coordinator/SimpleClusterManager.java
@@ -53,6 +53,7 @@ import org.apache.uniffle.client.request.RssDecommissionRequest;
import org.apache.uniffle.common.ServerStatus;
import org.apache.uniffle.common.config.RssConf;
import org.apache.uniffle.common.exception.InvalidRequestException;
+import org.apache.uniffle.common.exception.RssException;
import org.apache.uniffle.common.filesystem.HadoopFilesystemProvider;
import org.apache.uniffle.common.util.ThreadUtils;
import org.apache.uniffle.coordinator.metric.CoordinatorMetrics;
@@ -289,7 +290,7 @@ public class SimpleClusterManager implements ClusterManager {
return clientCache.get(serverNode,
() -> new ShuffleServerInternalGrpcClient(serverNode.getIp(), serverNode.getGrpcPort()));
} catch (ExecutionException e) {
- throw new RuntimeException(e);
+ throw new RssException(e);
}
}
diff --git a/coordinator/src/main/java/org/apache/uniffle/coordinator/access/checker/AccessCandidatesChecker.java b/coordinator/src/main/java/org/apache/uniffle/coordinator/access/checker/AccessCandidatesChecker.java
index 76f091cd..6b1f6d2a 100644
--- a/coordinator/src/main/java/org/apache/uniffle/coordinator/access/checker/AccessCandidatesChecker.java
+++ b/coordinator/src/main/java/org/apache/uniffle/coordinator/access/checker/AccessCandidatesChecker.java
@@ -38,6 +38,7 @@ import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.uniffle.common.exception.RssException;
import org.apache.uniffle.common.filesystem.HadoopFilesystemProvider;
import org.apache.uniffle.common.util.Constants;
import org.apache.uniffle.common.util.ThreadUtils;
@@ -71,13 +72,13 @@ public class AccessCandidatesChecker extends AbstractAccessChecker {
if (!fileSystem.isFile(path)) {
String msg = String.format("Fail to init AccessCandidatesChecker, %s is not a file.", path.toUri());
LOG.error(msg);
- throw new RuntimeException(msg);
+ throw new RssException(msg);
}
updateAccessCandidatesInternal();
if (candidates.get() == null || candidates.get().isEmpty()) {
String msg = "Candidates must be non-empty and can be loaded successfully at coordinator startup.";
LOG.error(msg);
- throw new RuntimeException(msg);
+ throw new RssException(msg);
}
LOG.debug("Load candidates: {}", String.join(";", candidates.get()));
diff --git a/coordinator/src/main/java/org/apache/uniffle/coordinator/strategy/assignment/AbstractAssignmentStrategy.java b/coordinator/src/main/java/org/apache/uniffle/coordinator/strategy/assignment/AbstractAssignmentStrategy.java
index be6d371b..c3bf1010 100644
--- a/coordinator/src/main/java/org/apache/uniffle/coordinator/strategy/assignment/AbstractAssignmentStrategy.java
+++ b/coordinator/src/main/java/org/apache/uniffle/coordinator/strategy/assignment/AbstractAssignmentStrategy.java
@@ -21,6 +21,7 @@ import java.util.List;
import java.util.SortedMap;
import org.apache.uniffle.common.PartitionRange;
+import org.apache.uniffle.common.exception.RssException;
import org.apache.uniffle.coordinator.CoordinatorConf;
import org.apache.uniffle.coordinator.ServerNode;
import org.apache.uniffle.coordinator.strategy.host.BasicHostAssignmentStrategy;
@@ -53,7 +54,7 @@ public abstract class AbstractAssignmentStrategy implements AssignmentStrategy {
} else if (selectPartitionStrategyName == SelectPartitionStrategyName.CONTINUOUS) {
selectPartitionStrategy = new ContinuousSelectPartitionStrategy();
} else {
- throw new RuntimeException("Unsupported partition assignment strategy:" + selectPartitionStrategyName);
+ throw new RssException("Unsupported partition assignment strategy:" + selectPartitionStrategyName);
}
}
@@ -66,7 +67,7 @@ public abstract class AbstractAssignmentStrategy implements AssignmentStrategy {
} else if (hostAssignmentStrategyName == HostAssignmentStrategyName.NONE) {
hostAssignmentStrategy = new BasicHostAssignmentStrategy();
} else {
- throw new RuntimeException("Unsupported partition assignment strategy:" + hostAssignmentStrategyName);
+ throw new RssException("Unsupported partition assignment strategy:" + hostAssignmentStrategyName);
}
}
diff --git a/coordinator/src/main/java/org/apache/uniffle/coordinator/strategy/assignment/PartitionBalanceAssignmentStrategy.java b/coordinator/src/main/java/org/apache/uniffle/coordinator/strategy/assignment/PartitionBalanceAssignmentStrategy.java
index 0b8964a4..ce6a577e 100644
--- a/coordinator/src/main/java/org/apache/uniffle/coordinator/strategy/assignment/PartitionBalanceAssignmentStrategy.java
+++ b/coordinator/src/main/java/org/apache/uniffle/coordinator/strategy/assignment/PartitionBalanceAssignmentStrategy.java
@@ -30,6 +30,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.uniffle.common.PartitionRange;
+import org.apache.uniffle.common.exception.RssException;
import org.apache.uniffle.coordinator.ClusterManager;
import org.apache.uniffle.coordinator.CoordinatorConf;
import org.apache.uniffle.coordinator.ServerNode;
@@ -74,7 +75,7 @@ public class PartitionBalanceAssignmentStrategy extends AbstractAssignmentStrate
int estimateTaskConcurrency) {
if (partitionNumPerRange != 1) {
- throw new RuntimeException("PartitionNumPerRange must be one");
+ throw new RssException("PartitionNumPerRange must be one");
}
SortedMap<PartitionRange, List<ServerNode>> assignments;
@@ -109,7 +110,7 @@ public class PartitionBalanceAssignmentStrategy extends AbstractAssignmentStrate
});
if (nodes.isEmpty() || nodes.size() < replica) {
- throw new RuntimeException("There isn't enough shuffle servers");
+ throw new RssException("There isn't enough shuffle servers");
}
final int assignmentMaxNum = clusterManager.getShuffleNodesMax();
diff --git a/internal-client/src/main/java/org/apache/uniffle/client/factory/CoordinatorClientFactory.java b/internal-client/src/main/java/org/apache/uniffle/client/factory/CoordinatorClientFactory.java
index fab74160..68fd78c3 100644
--- a/internal-client/src/main/java/org/apache/uniffle/client/factory/CoordinatorClientFactory.java
+++ b/internal-client/src/main/java/org/apache/uniffle/client/factory/CoordinatorClientFactory.java
@@ -28,6 +28,7 @@ import org.slf4j.LoggerFactory;
import org.apache.uniffle.client.api.CoordinatorClient;
import org.apache.uniffle.client.impl.grpc.CoordinatorGrpcClient;
import org.apache.uniffle.common.ClientType;
+import org.apache.uniffle.common.exception.RssException;
public class CoordinatorClientFactory {
private static final Logger LOG = LoggerFactory.getLogger(CoordinatorClientFactory.class);
@@ -53,7 +54,7 @@ public class CoordinatorClientFactory {
if (coordinatorList.length == 0) {
String msg = "Invalid " + coordinators;
LOG.error(msg);
- throw new RuntimeException(msg);
+ throw new RssException(msg);
}
for (String coordinator: coordinatorList) {
@@ -61,7 +62,7 @@ public class CoordinatorClientFactory {
if (ipPort.length != 2) {
String msg = "Invalid coordinator format " + Arrays.toString(ipPort);
LOG.error(msg);
- throw new RuntimeException(msg);
+ throw new RssException(msg);
}
String host = ipPort[0];
diff --git a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java
index d9208f02..f2ed1e87 100644
--- a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java
+++ b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java
@@ -515,7 +515,7 @@ public class ShuffleServerGrpcClient extends GrpcClient implements ShuffleServer
response = new RssGetShuffleResultResponse(StatusCode.SUCCESS,
rpcResponse.getSerializedBitmap().toByteArray());
} catch (Exception e) {
- throw new RuntimeException(e);
+ throw new RssException(e);
}
break;
default:
@@ -547,7 +547,7 @@ public class ShuffleServerGrpcClient extends GrpcClient implements ShuffleServer
response = new RssGetShuffleResultResponse(StatusCode.SUCCESS,
rpcResponse.getSerializedBitmap().toByteArray());
} catch (Exception e) {
- throw new RuntimeException(e);
+ throw new RssException(e);
}
break;
default:
diff --git a/server/src/main/java/org/apache/uniffle/server/ShuffleServer.java b/server/src/main/java/org/apache/uniffle/server/ShuffleServer.java
index 263884c1..43e4feab 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleServer.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleServer.java
@@ -38,6 +38,7 @@ import org.apache.uniffle.common.Arguments;
import org.apache.uniffle.common.ClientType;
import org.apache.uniffle.common.ServerStatus;
import org.apache.uniffle.common.exception.InvalidRequestException;
+import org.apache.uniffle.common.exception.RssException;
import org.apache.uniffle.common.metrics.GRPCMetrics;
import org.apache.uniffle.common.metrics.JvmMetrics;
import org.apache.uniffle.common.metrics.MetricReporter;
@@ -191,7 +192,7 @@ public class ShuffleServer {
}
ip = RssUtils.getHostIp();
if (ip == null) {
- throw new RuntimeException("Couldn't acquire host Ip");
+ throw new RssException("Couldn't acquire host Ip");
}
grpcPort = shuffleServerConf.getInteger(ShuffleServerConf.RPC_SERVER_PORT);
nettyPort = shuffleServerConf.getInteger(ShuffleServerConf.NETTY_SERVER_PORT);
diff --git a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
index c7a964fd..6718062e 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
@@ -52,6 +52,7 @@ import org.apache.uniffle.common.ShufflePartitionedBlock;
import org.apache.uniffle.common.ShufflePartitionedData;
import org.apache.uniffle.common.config.RssBaseConf;
import org.apache.uniffle.common.exception.FileNotFoundException;
+import org.apache.uniffle.common.exception.RssException;
import org.apache.uniffle.common.rpc.StatusCode;
import org.apache.uniffle.common.util.Constants;
import org.apache.uniffle.common.util.RssUtils;
@@ -237,7 +238,7 @@ public class ShuffleTaskManager {
synchronized (lock) {
long commitTimeout = conf.get(ShuffleServerConf.SERVER_COMMIT_TIMEOUT);
if (System.currentTimeMillis() - start > commitTimeout) {
- throw new RuntimeException("Shuffle data commit timeout for " + commitTimeout + " ms");
+ throw new RssException("Shuffle data commit timeout for " + commitTimeout + " ms");
}
synchronized (cachedBlockIds) {
cloneBlockIds = RssUtils.cloneBitMap(cachedBlockIds);
@@ -258,7 +259,7 @@ public class ShuffleTaskManager {
}
Thread.sleep(checkInterval);
if (System.currentTimeMillis() - start > commitTimeout) {
- throw new RuntimeException("Shuffle data commit timeout for " + commitTimeout + " ms");
+ throw new RssException("Shuffle data commit timeout for " + commitTimeout + " ms");
}
LOG.info("Checking commit result for appId[" + appId + "], shuffleId[" + shuffleId
+ "], expect committed[" + expectedCommitted
@@ -277,7 +278,7 @@ public class ShuffleTaskManager {
refreshAppId(appId);
Map<Integer, Roaring64NavigableMap[]> shuffleIdToPartitions = partitionsToBlockIds.get(appId);
if (shuffleIdToPartitions == null) {
- throw new RuntimeException("appId[" + appId + "] is expired!");
+ throw new RssException("appId[" + appId + "] is expired!");
}
if (!shuffleIdToPartitions.containsKey(shuffleId)) {
Roaring64NavigableMap[] blockIds = new Roaring64NavigableMap[bitmapNum];
diff --git a/server/src/main/java/org/apache/uniffle/server/storage/AbstractStorageManagerFallbackStrategy.java b/server/src/main/java/org/apache/uniffle/server/storage/AbstractStorageManagerFallbackStrategy.java
index e07080bb..9aef86b2 100644
--- a/server/src/main/java/org/apache/uniffle/server/storage/AbstractStorageManagerFallbackStrategy.java
+++ b/server/src/main/java/org/apache/uniffle/server/storage/AbstractStorageManagerFallbackStrategy.java
@@ -22,6 +22,7 @@ import java.util.Set;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.uniffle.common.exception.RssException;
import org.apache.uniffle.server.ShuffleDataFlushEvent;
import org.apache.uniffle.server.ShuffleServerConf;
@@ -47,7 +48,7 @@ public abstract class AbstractStorageManagerFallbackStrategy {
}
}
if (nextIdx == -1) {
- throw new RuntimeException("Current StorageManager is not in candidates");
+ throw new RssException("Current StorageManager is not in candidates");
}
for (int i = 0; i < candidates.length - 1; i++) {
StorageManager storageManager = candidates[(i + nextIdx) % candidates.length];
diff --git a/server/src/main/java/org/apache/uniffle/server/storage/HdfsStorageManager.java b/server/src/main/java/org/apache/uniffle/server/storage/HdfsStorageManager.java
index 1532ff2b..eefc0e5a 100644
--- a/server/src/main/java/org/apache/uniffle/server/storage/HdfsStorageManager.java
+++ b/server/src/main/java/org/apache/uniffle/server/storage/HdfsStorageManager.java
@@ -32,6 +32,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.uniffle.common.RemoteStorageInfo;
+import org.apache.uniffle.common.exception.RssException;
import org.apache.uniffle.common.filesystem.HadoopFilesystemProvider;
import org.apache.uniffle.common.storage.StorageInfo;
import org.apache.uniffle.common.util.Constants;
@@ -114,7 +115,7 @@ public class HdfsStorageManager extends SingleStorageManager {
@Override
public Checker getStorageChecker() {
- throw new RuntimeException("Not support storage checker");
+ throw new RssException("Not support storage checker");
}
@Override
diff --git a/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java b/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java
index a8403d0b..84ccf173 100644
--- a/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java
+++ b/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java
@@ -47,6 +47,7 @@ import org.slf4j.LoggerFactory;
import org.apache.uniffle.common.RemoteStorageInfo;
import org.apache.uniffle.common.UnionKey;
+import org.apache.uniffle.common.exception.RssException;
import org.apache.uniffle.common.storage.StorageInfo;
import org.apache.uniffle.common.storage.StorageMedia;
import org.apache.uniffle.common.storage.StorageStatus;
@@ -141,7 +142,7 @@ public class LocalStorageManager extends SingleStorageManager {
int failedCount = storageBasePaths.size() - successCount.get();
long maxFailedNumber = conf.getLong(LOCAL_STORAGE_INITIALIZE_MAX_FAIL_NUMBER);
if (failedCount > maxFailedNumber || successCount.get() == 0) {
- throw new RuntimeException(
+ throw new RssException(
String.format("Initialize %s local storage(s) failed, "
+ "specified local storage paths size: %s, the conf of %s size: %s",
failedCount, localStorageArray.length, LOCAL_STORAGE_INITIALIZE_MAX_FAIL_NUMBER.key(), maxFailedNumber)
diff --git a/server/src/main/java/org/apache/uniffle/server/storage/MultiStorageManager.java b/server/src/main/java/org/apache/uniffle/server/storage/MultiStorageManager.java
index a9c4af78..615d0d09 100644
--- a/server/src/main/java/org/apache/uniffle/server/storage/MultiStorageManager.java
+++ b/server/src/main/java/org/apache/uniffle/server/storage/MultiStorageManager.java
@@ -28,6 +28,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.uniffle.common.RemoteStorageInfo;
+import org.apache.uniffle.common.exception.RssException;
import org.apache.uniffle.common.storage.StorageInfo;
import org.apache.uniffle.server.Checker;
import org.apache.uniffle.server.ShuffleDataFlushEvent;
@@ -60,7 +61,7 @@ public class MultiStorageManager implements StorageManager {
coldStorageManager
);
} catch (Exception e) {
- throw new RuntimeException("Errors on loading selector manager.", e);
+ throw new RssException("Errors on loading selector manager.", e);
}
long cacheTimeout = conf.getLong(ShuffleServerConf.STORAGEMANAGER_CACHE_TIMEOUT);
diff --git a/spotbugs-exclude.xml b/spotbugs-exclude.xml
index 01e57ee3..9231414f 100644
--- a/spotbugs-exclude.xml
+++ b/spotbugs-exclude.xml
@@ -20,6 +20,6 @@
<Package name="~org\.apache\.uniffle\.proto.*"/>
</Match>
<Match>
- <Bug pattern="MS_EXPOSE_REP,EI_EXPOSE_REP,EI_EXPOSE_REP2,EI_EXPOSE_STATIC_REP2,THROWS_METHOD_THROWS_RUNTIMEEXCEPTION,THROWS_METHOD_THROWS_CLAUSE_BASIC_EXCEPTION,THROWS_METHOD_THROWS_CLAUSE_THROWABLE,MS_PKGPROTECT,MS_CANNOT_BE_FINAL,SE_BAD_FIELD,SWL_SLEEP_WITH_LOCK_HELD,IS2_INCONSISTENT_SYNC" />
+ <Bug pattern="MS_EXPOSE_REP,EI_EXPOSE_REP,EI_EXPOSE_REP2,EI_EXPOSE_STATIC_REP2,THROWS_METHOD_THROWS_CLAUSE_BASIC_EXCEPTION,THROWS_METHOD_THROWS_CLAUSE_THROWABLE,MS_PKGPROTECT,MS_CANNOT_BE_FINAL,SE_BAD_FIELD,SWL_SLEEP_WITH_LOCK_HELD,IS2_INCONSISTENT_SYNC" />
</Match>
</FindBugsFilter>
diff --git a/storage/src/main/java/org/apache/uniffle/storage/common/HdfsStorage.java b/storage/src/main/java/org/apache/uniffle/storage/common/HdfsStorage.java
index 42c93c03..4b023d1e 100644
--- a/storage/src/main/java/org/apache/uniffle/storage/common/HdfsStorage.java
+++ b/storage/src/main/java/org/apache/uniffle/storage/common/HdfsStorage.java
@@ -24,6 +24,7 @@ import org.apache.hadoop.conf.Configuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.uniffle.common.exception.RssException;
import org.apache.uniffle.storage.handler.api.ServerReadHandler;
import org.apache.uniffle.storage.handler.api.ShuffleWriteHandler;
import org.apache.uniffle.storage.handler.impl.HdfsShuffleWriteHandler;
@@ -115,13 +116,13 @@ public class HdfsStorage extends AbstractStorage {
);
}
} catch (Exception e) {
- throw new RuntimeException(e);
+ throw new RssException(e);
}
}
@Override
protected ServerReadHandler newReadHandler(CreateShuffleReadHandlerRequest request) {
- throw new RuntimeException("Hdfs storage don't support to read from sever");
+ throw new RssException("Hdfs storage don't support to read from sever");
}
@Override
diff --git a/storage/src/main/java/org/apache/uniffle/storage/common/LocalStorage.java b/storage/src/main/java/org/apache/uniffle/storage/common/LocalStorage.java
index 90a147f0..b5ca09e8 100644
--- a/storage/src/main/java/org/apache/uniffle/storage/common/LocalStorage.java
+++ b/storage/src/main/java/org/apache/uniffle/storage/common/LocalStorage.java
@@ -32,6 +32,7 @@ import org.roaringbitmap.RoaringBitmap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.uniffle.common.exception.RssException;
import org.apache.uniffle.common.storage.StorageMedia;
import org.apache.uniffle.common.util.RssUtils;
import org.apache.uniffle.storage.handler.api.ServerReadHandler;
@@ -74,7 +75,7 @@ public class LocalStorage extends AbstractStorage {
this.mountPoint = store.name();
} catch (IOException ioe) {
LOG.warn("Init base directory " + basePath + " fail, the disk should be corrupted", ioe);
- throw new RuntimeException(ioe);
+ throw new RssException(ioe);
}
if (capacity < 0L) {
long totalSpace = baseFolder.getTotalSpace();
diff --git a/storage/src/main/java/org/apache/uniffle/storage/factory/ShuffleHandlerFactory.java b/storage/src/main/java/org/apache/uniffle/storage/factory/ShuffleHandlerFactory.java
index a588809e..08dd548c 100644
--- a/storage/src/main/java/org/apache/uniffle/storage/factory/ShuffleHandlerFactory.java
+++ b/storage/src/main/java/org/apache/uniffle/storage/factory/ShuffleHandlerFactory.java
@@ -61,7 +61,7 @@ public class ShuffleHandlerFactory {
public ClientReadHandler createShuffleReadHandler(CreateShuffleReadHandlerRequest request) {
if (CollectionUtils.isEmpty(request.getShuffleServerInfoList())) {
- throw new RuntimeException("Shuffle servers should not be empty!");
+ throw new RssException("Shuffle servers should not be empty!");
}
if (request.getShuffleServerInfoList().size() > 1) {
List<ClientReadHandler> handlers = Lists.newArrayList();
diff --git a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HdfsClientReadHandler.java b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HdfsClientReadHandler.java
index 8c76b368..a8ac6425 100644
--- a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HdfsClientReadHandler.java
+++ b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HdfsClientReadHandler.java
@@ -33,6 +33,7 @@ import org.slf4j.LoggerFactory;
import org.apache.uniffle.common.ShuffleDataDistributionType;
import org.apache.uniffle.common.ShuffleDataResult;
+import org.apache.uniffle.common.exception.RssException;
import org.apache.uniffle.common.filesystem.HadoopFilesystemProvider;
import org.apache.uniffle.common.util.Constants;
import org.apache.uniffle.storage.util.ShuffleStorageUtils;
@@ -109,7 +110,7 @@ public class HdfsClientReadHandler extends AbstractClientReadHandler {
try {
fs = HadoopFilesystemProvider.getFilesystem(baseFolder, hadoopConf);
} catch (Exception ioe) {
- throw new RuntimeException("Can't get FileSystem for " + baseFolder);
+ throw new RssException("Can't get FileSystem for " + baseFolder);
}
FileStatus[] indexFiles = null;
diff --git a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HdfsShuffleWriteHandler.java b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HdfsShuffleWriteHandler.java
index dc4b94f0..da701702 100644
--- a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HdfsShuffleWriteHandler.java
+++ b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HdfsShuffleWriteHandler.java
@@ -30,6 +30,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.uniffle.common.ShufflePartitionedBlock;
+import org.apache.uniffle.common.exception.RssException;
import org.apache.uniffle.common.filesystem.HadoopFilesystemProvider;
import org.apache.uniffle.storage.common.FileBasedShuffleSegment;
import org.apache.uniffle.storage.handler.api.ShuffleWriteHandler;
@@ -130,7 +131,7 @@ public class HdfsShuffleWriteHandler implements ShuffleWriteHandler {
} catch (IOException e) {
LOG.warn("Write failed with " + shuffleBlocks.size() + " blocks for " + fileNamePrefix + "_" + failTimes, e);
failTimes++;
- throw new RuntimeException(e);
+ throw new RssException(e);
}
} finally {
writeLock.unlock();
diff --git a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/LocalFileReader.java b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/LocalFileReader.java
index 52d0c8bb..99eb0cd0 100644
--- a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/LocalFileReader.java
+++ b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/LocalFileReader.java
@@ -26,6 +26,7 @@ import org.apache.commons.io.IOUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.uniffle.common.exception.RssException;
import org.apache.uniffle.storage.api.FileReader;
public class LocalFileReader implements FileReader, Closeable {
@@ -49,7 +50,7 @@ public class LocalFileReader implements FileReader, Closeable {
while (targetSkip > 0) {
long realSkip = dataInputStream.skip(targetSkip);
if (realSkip == -1) {
- throw new RuntimeException("Unexpected EOF when skip bytes");
+ throw new RssException("Unexpected EOF when skip bytes");
}
targetSkip -= realSkip;
if (targetSkip > 0) {
diff --git a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/LocalFileServerReadHandler.java b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/LocalFileServerReadHandler.java
index b786cfc7..5df89b9f 100644
--- a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/LocalFileServerReadHandler.java
+++ b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/LocalFileServerReadHandler.java
@@ -26,6 +26,7 @@ import org.slf4j.LoggerFactory;
import org.apache.uniffle.common.ShuffleDataResult;
import org.apache.uniffle.common.ShuffleIndexResult;
import org.apache.uniffle.common.exception.FileNotFoundException;
+import org.apache.uniffle.common.exception.RssException;
import org.apache.uniffle.common.util.Constants;
import org.apache.uniffle.storage.common.FileBasedShuffleSegment;
import org.apache.uniffle.storage.handler.api.ServerReadHandler;
@@ -99,7 +100,7 @@ public class LocalFileServerReadHandler implements ServerReadHandler {
if (indexFiles != null && indexFiles.length > 0) {
if (indexFiles.length != 1) {
- throw new RuntimeException("More index file than expected: " + indexFiles.length);
+ throw new RssException("More index file than expected: " + indexFiles.length);
}
String fileNamePrefix = getFileNamePrefix(indexFiles[0].getName());
indexFileName = fullShufflePath + "/" + ShuffleStorageUtils.generateIndexFileName(fileNamePrefix);
diff --git a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/LocalFileWriteHandler.java b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/LocalFileWriteHandler.java
index 1eb1a885..00e43927 100644
--- a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/LocalFileWriteHandler.java
+++ b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/LocalFileWriteHandler.java
@@ -71,7 +71,7 @@ public class LocalFileWriteHandler implements ShuffleWriteHandler {
int shuffleId,
int startPartition) {
if (storageBasePaths == null || storageBasePaths.length == 0) {
- throw new RuntimeException("Base path can't be empty, please check rss.storage.localFile.basePaths");
+ throw new RssException("Base path can't be empty, please check rss.storage.localFile.basePaths");
}
int index = ShuffleStorageUtils.getStorageIndex(
storageBasePaths.length,
diff --git a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/PooledHdfsShuffleWriteHandler.java b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/PooledHdfsShuffleWriteHandler.java
index d225448d..915285cf 100644
--- a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/PooledHdfsShuffleWriteHandler.java
+++ b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/PooledHdfsShuffleWriteHandler.java
@@ -27,6 +27,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.uniffle.common.ShufflePartitionedBlock;
+import org.apache.uniffle.common.exception.RssException;
import org.apache.uniffle.storage.handler.api.ShuffleWriteHandler;
import org.apache.uniffle.storage.util.ShuffleStorageUtils;
@@ -88,7 +89,7 @@ public class PooledHdfsShuffleWriteHandler implements ShuffleWriteHandler {
);
}
} catch (Exception e) {
- throw new RuntimeException("Errors on initializing Hdfs writer handler.", e);
+ throw new RssException("Errors on initializing Hdfs writer handler.", e);
}
}
diff --git a/storage/src/main/java/org/apache/uniffle/storage/util/ShuffleStorageUtils.java b/storage/src/main/java/org/apache/uniffle/storage/util/ShuffleStorageUtils.java
index a34daf76..4b19408f 100644
--- a/storage/src/main/java/org/apache/uniffle/storage/util/ShuffleStorageUtils.java
+++ b/storage/src/main/java/org/apache/uniffle/storage/util/ShuffleStorageUtils.java
@@ -33,6 +33,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.uniffle.common.BufferSegment;
+import org.apache.uniffle.common.exception.RssException;
import org.apache.uniffle.common.util.Constants;
import org.apache.uniffle.storage.common.FileBasedShuffleSegment;
import org.apache.uniffle.storage.handler.impl.DataFileSegment;
@@ -160,7 +161,7 @@ public class ShuffleStorageUtils {
return getShuffleDataPath(appId, shuffleId, start, end);
}
}
- throw new RuntimeException("Can't generate ShuffleData Path for appId[" + appId + "], shuffleId["
+ throw new RssException("Can't generate ShuffleData Path for appId[" + appId + "], shuffleId["
+ shuffleId + "], partitionId[" + partitionId + "], partitionNumPerRange[" + partitionNumPerRange
+ "], partitionNum[" + partitionNum + "]");
}