You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uniffle.apache.org by ro...@apache.org on 2023/05/28 09:25:51 UTC
[incubator-uniffle] branch master updated: [#417] refactor: Eliminate raw use of parameterized class (#891)
This is an automated email from the ASF dual-hosted git repository.
roryqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new 753f426c [#417] refactor: Eliminate raw use of parameterized class (#891)
753f426c is described below
commit 753f426c6b4b8225d4ebd60a001d8883a55f4093
Author: Neo Chien <cc...@cs.ccu.edu.tw>
AuthorDate: Sun May 28 17:25:46 2023 +0800
[#417] refactor: Eliminate raw use of parameterized class (#891)
### What changes were proposed in this pull request?
Eliminate `raw use of parameterized class`
### Why are the changes needed?
Fix: #417
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
current UT
---
.../java/org/apache/uniffle/client/util/ClientUtils.java | 1 +
.../java/org/apache/uniffle/common/util/JavaUtils.java | 4 ++--
.../java/org/apache/uniffle/common/util/RetryUtils.java | 8 ++++----
.../java/org/apache/uniffle/common/KerberizedHadoop.java | 4 ++--
.../uniffle/coordinator/web/servlet/BaseServlet.java | 12 ++++++------
.../coordinator/web/servlet/CancelDecommissionServlet.java | 4 ++--
.../coordinator/web/servlet/DecommissionServlet.java | 4 ++--
.../uniffle/coordinator/web/servlet/NodesServlet.java | 4 ++--
.../java/org/apache/uniffle/test/CoordinatorGrpcTest.java | 3 ++-
.../src/test/java/org/apache/uniffle/test/ServletTest.java | 14 +++++++-------
.../handler/impl/PooledHadoopShuffleWriteHandlerTest.java | 6 +++---
11 files changed, 33 insertions(+), 31 deletions(-)
diff --git a/client/src/main/java/org/apache/uniffle/client/util/ClientUtils.java b/client/src/main/java/org/apache/uniffle/client/util/ClientUtils.java
index 7787c957..75c0cae1 100644
--- a/client/src/main/java/org/apache/uniffle/client/util/ClientUtils.java
+++ b/client/src/main/java/org/apache/uniffle/client/util/ClientUtils.java
@@ -81,6 +81,7 @@ public class ClientUtils {
|| StorageType.LOCALFILE_HDFS.name().equals(storageType);
}
+ @SuppressWarnings("rawtypes")
public static boolean waitUntilDoneOrFail(List<CompletableFuture<Boolean>> futures, boolean allowFastFail) {
int expected = futures.size();
int failed = 0;
diff --git a/common/src/main/java/org/apache/uniffle/common/util/JavaUtils.java b/common/src/main/java/org/apache/uniffle/common/util/JavaUtils.java
index ebbe34c5..ba28aedc 100644
--- a/common/src/main/java/org/apache/uniffle/common/util/JavaUtils.java
+++ b/common/src/main/java/org/apache/uniffle/common/util/JavaUtils.java
@@ -43,9 +43,9 @@ public class JavaUtils {
public static <K, V> ConcurrentHashMap<K, V> newConcurrentMap() {
if (SystemUtils.isJavaVersionAtLeast(JavaVersion.JAVA_9)) {
- return new ConcurrentHashMap();
+ return new ConcurrentHashMap<>();
} else {
- return new ConcurrentHashMapForJDK8();
+ return new ConcurrentHashMapForJDK8<>();
}
}
diff --git a/common/src/main/java/org/apache/uniffle/common/util/RetryUtils.java b/common/src/main/java/org/apache/uniffle/common/util/RetryUtils.java
index 6a4c1ae9..4610338b 100644
--- a/common/src/main/java/org/apache/uniffle/common/util/RetryUtils.java
+++ b/common/src/main/java/org/apache/uniffle/common/util/RetryUtils.java
@@ -32,7 +32,7 @@ public class RetryUtils {
}
public static <T> T retry(RetryCmd<T> cmd, long intervalMs, int retryTimes,
- Set<Class> exceptionClasses) throws Throwable {
+ Set<Class<? extends Throwable>> exceptionClasses) throws Throwable {
return retry(cmd, null, intervalMs, retryTimes, exceptionClasses);
}
@@ -47,7 +47,7 @@ public class RetryUtils {
* @throws Throwable
*/
public static <T> T retry(RetryCmd<T> cmd, RetryCallBack callBack, long intervalMs,
- int retryTimes, Set<Class> exceptionClasses) throws Throwable {
+ int retryTimes, Set<Class<? extends Throwable>> exceptionClasses) throws Throwable {
int retry = 0;
while (true) {
try {
@@ -69,8 +69,8 @@ public class RetryUtils {
}
}
- private static boolean isInstanceOf(Set<Class> classes, Throwable t) {
- for (Class c : classes) {
+ private static boolean isInstanceOf(Set<Class<? extends Throwable>> classes, Throwable t) {
+ for (Class<? extends Throwable> c : classes) {
if (c.isInstance(t)) {
return true;
}
diff --git a/common/src/test/java/org/apache/uniffle/common/KerberizedHadoop.java b/common/src/test/java/org/apache/uniffle/common/KerberizedHadoop.java
index 935f17dc..19364bfb 100644
--- a/common/src/test/java/org/apache/uniffle/common/KerberizedHadoop.java
+++ b/common/src/test/java/org/apache/uniffle/common/KerberizedHadoop.java
@@ -80,7 +80,7 @@ public class KerberizedHadoop implements Serializable {
private MiniDFSCluster kerberizedDfsCluster;
- private Class testRunnerCls = KerberizedHadoop.class;
+ private Class<?> testRunnerCls = KerberizedHadoop.class;
// The superuser for accessing HDFS
private String hdfsKeytab;
@@ -301,7 +301,7 @@ public class KerberizedHadoop implements Serializable {
* Should be invoked by extending class to solve the NPE.
* refer to: https://github.com/apache/hbase/pull/1207
*/
- public void setTestRunner(Class cls) {
+ public void setTestRunner(Class<?> cls) {
this.testRunnerCls = cls;
}
diff --git a/coordinator/src/main/java/org/apache/uniffle/coordinator/web/servlet/BaseServlet.java b/coordinator/src/main/java/org/apache/uniffle/coordinator/web/servlet/BaseServlet.java
index a67701f2..99948dae 100644
--- a/coordinator/src/main/java/org/apache/uniffle/coordinator/web/servlet/BaseServlet.java
+++ b/coordinator/src/main/java/org/apache/uniffle/coordinator/web/servlet/BaseServlet.java
@@ -30,7 +30,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.uniffle.coordinator.web.Response;
-public abstract class BaseServlet extends HttpServlet {
+public abstract class BaseServlet<T> extends HttpServlet {
public static final String JSON_MIME_TYPE = "application/json";
final ObjectMapper mapper = new ObjectMapper().setSerializationInclusion(JsonInclude.Include.NON_NULL);
@@ -44,9 +44,9 @@ public abstract class BaseServlet extends HttpServlet {
writeJSON(resp, handlerRequest(() -> handlePost(req, resp)));
}
- private Response handlerRequest(
- Callable<Response> function) {
- Response response;
+ private Response<T> handlerRequest(
+ Callable<Response<T>> function) {
+ Response<T> response;
try {
// todo: Do something for authentication
response = function.call();
@@ -56,13 +56,13 @@ public abstract class BaseServlet extends HttpServlet {
return response;
}
- protected Response handleGet(
+ protected Response<T> handleGet(
HttpServletRequest req,
HttpServletResponse resp) throws ServletException, IOException {
throw new IOException("Method not support!");
}
- protected Response handlePost(
+ protected Response<T> handlePost(
HttpServletRequest req,
HttpServletResponse resp) throws ServletException, IOException {
throw new IOException("Method not support!");
diff --git a/coordinator/src/main/java/org/apache/uniffle/coordinator/web/servlet/CancelDecommissionServlet.java b/coordinator/src/main/java/org/apache/uniffle/coordinator/web/servlet/CancelDecommissionServlet.java
index b7411d4e..24c77f8c 100644
--- a/coordinator/src/main/java/org/apache/uniffle/coordinator/web/servlet/CancelDecommissionServlet.java
+++ b/coordinator/src/main/java/org/apache/uniffle/coordinator/web/servlet/CancelDecommissionServlet.java
@@ -28,7 +28,7 @@ import org.apache.uniffle.coordinator.CoordinatorServer;
import org.apache.uniffle.coordinator.web.Response;
import org.apache.uniffle.coordinator.web.request.CancelDecommissionRequest;
-public class CancelDecommissionServlet extends BaseServlet {
+public class CancelDecommissionServlet extends BaseServlet<Object> {
private final CoordinatorServer coordinator;
public CancelDecommissionServlet(CoordinatorServer coordinator) {
@@ -36,7 +36,7 @@ public class CancelDecommissionServlet extends BaseServlet {
}
@Override
- protected Response handlePost(HttpServletRequest req, HttpServletResponse resp) throws IOException {
+ protected Response<Object> handlePost(HttpServletRequest req, HttpServletResponse resp) throws IOException {
CancelDecommissionRequest params = parseParamsFromJson(req, CancelDecommissionRequest.class);
if (CollectionUtils.isEmpty(params.getServerIds())) {
return Response.fail("Parameter[serverIds] should not be null!");
diff --git a/coordinator/src/main/java/org/apache/uniffle/coordinator/web/servlet/DecommissionServlet.java b/coordinator/src/main/java/org/apache/uniffle/coordinator/web/servlet/DecommissionServlet.java
index 96f06dd3..3f3ab1ef 100644
--- a/coordinator/src/main/java/org/apache/uniffle/coordinator/web/servlet/DecommissionServlet.java
+++ b/coordinator/src/main/java/org/apache/uniffle/coordinator/web/servlet/DecommissionServlet.java
@@ -28,7 +28,7 @@ import org.apache.uniffle.coordinator.CoordinatorServer;
import org.apache.uniffle.coordinator.web.Response;
import org.apache.uniffle.coordinator.web.request.DecommissionRequest;
-public class DecommissionServlet extends BaseServlet {
+public class DecommissionServlet extends BaseServlet<Object> {
private final CoordinatorServer coordinator;
public DecommissionServlet(CoordinatorServer coordinator) {
@@ -36,7 +36,7 @@ public class DecommissionServlet extends BaseServlet {
}
@Override
- protected Response handlePost(HttpServletRequest req, HttpServletResponse resp) throws IOException {
+ protected Response<Object> handlePost(HttpServletRequest req, HttpServletResponse resp) throws IOException {
DecommissionRequest params = parseParamsFromJson(req, DecommissionRequest.class);
if (CollectionUtils.isEmpty(params.getServerIds())) {
return Response.fail("Parameter[serverIds] should not be null!");
diff --git a/coordinator/src/main/java/org/apache/uniffle/coordinator/web/servlet/NodesServlet.java b/coordinator/src/main/java/org/apache/uniffle/coordinator/web/servlet/NodesServlet.java
index 04850235..788f7f0a 100644
--- a/coordinator/src/main/java/org/apache/uniffle/coordinator/web/servlet/NodesServlet.java
+++ b/coordinator/src/main/java/org/apache/uniffle/coordinator/web/servlet/NodesServlet.java
@@ -29,7 +29,7 @@ import org.apache.uniffle.coordinator.ServerNode;
import org.apache.uniffle.coordinator.web.Response;
-public class NodesServlet extends BaseServlet {
+public class NodesServlet extends BaseServlet<List<ServerNode>> {
private final CoordinatorServer coordinator;
public NodesServlet(CoordinatorServer coordinator) {
@@ -37,7 +37,7 @@ public class NodesServlet extends BaseServlet {
}
@Override
- protected Response handleGet(HttpServletRequest req, HttpServletResponse resp) {
+ protected Response<List<ServerNode>> handleGet(HttpServletRequest req, HttpServletResponse resp) {
List<ServerNode> serverList = coordinator.getClusterManager().getServerList(Collections.EMPTY_SET);
String id = req.getParameter("id");
String status = req.getParameter("status");
diff --git a/integration-test/common/src/test/java/org/apache/uniffle/test/CoordinatorGrpcTest.java b/integration-test/common/src/test/java/org/apache/uniffle/test/CoordinatorGrpcTest.java
index 800184b2..ac0c7c9b 100644
--- a/integration-test/common/src/test/java/org/apache/uniffle/test/CoordinatorGrpcTest.java
+++ b/integration-test/common/src/test/java/org/apache/uniffle/test/CoordinatorGrpcTest.java
@@ -135,7 +135,8 @@ public class CoordinatorGrpcTest extends CoordinatorTestBase {
Class<RssConf> clazz = RssConf.class;
Field field = clazz.getDeclaredField("settings");
field.setAccessible(true);
- ((ConcurrentHashMap) field.get(shuffleServerConf)).remove(ShuffleServerConf.NETTY_SERVER_PORT.key());
+ ((ConcurrentHashMap<Object, Object>) field.get(shuffleServerConf)).remove(
+ ShuffleServerConf.NETTY_SERVER_PORT.key());
String storageTypeJsonSource = String.format("{\"%s\": \"ssd\"}", baseDir);
withEnvironmentVariables("RSS_ENV_KEY", storageTypeJsonSource).execute(() -> {
ShuffleServer ss = new ShuffleServer((ShuffleServerConf) shuffleServerConf);
diff --git a/integration-test/common/src/test/java/org/apache/uniffle/test/ServletTest.java b/integration-test/common/src/test/java/org/apache/uniffle/test/ServletTest.java
index 943ecd40..a514660f 100644
--- a/integration-test/common/src/test/java/org/apache/uniffle/test/ServletTest.java
+++ b/integration-test/common/src/test/java/org/apache/uniffle/test/ServletTest.java
@@ -89,9 +89,9 @@ public class ServletTest extends IntegrationTestBase {
@Test
public void testNodesServlet() throws Exception {
String content = TestUtils.httpGet(NODES_URL);
- Response<List<HashMap>> response = objectMapper.readValue(content, new TypeReference<Response<List<HashMap>>>() {
- });
- List<HashMap> serverList = response.getData();
+ Response<List<HashMap<String, Object>>> response = objectMapper.readValue(content,
+ new TypeReference<Response<List<HashMap<String, Object>>>>() {});
+ List<HashMap<String, Object>> serverList = response.getData();
assertEquals(0, response.getCode());
assertEquals(2, serverList.size());
assertEquals(SHUFFLE_SERVER_PORT, Integer.parseInt(serverList.get(0).get("grpcPort").toString()));
@@ -102,18 +102,18 @@ public class ServletTest extends IntegrationTestBase {
// Only fetch one server.
ShuffleServer shuffleServer = shuffleServers.get(0);
content = TestUtils.httpGet(NODES_URL + "?id=" + shuffleServer.getId());
- response = objectMapper.readValue(content, new TypeReference<Response<List<HashMap>>>() {
+ response = objectMapper.readValue(content, new TypeReference<Response<List<HashMap<String, Object>>>>() {
});
serverList = response.getData();
assertEquals(1, serverList.size());
assertEquals(shuffleServer.getId(), serverList.get(0).get("id"));
content = TestUtils.httpGet(NODES_URL + "?status=DECOMMISSIONED");
- response = objectMapper.readValue(content, new TypeReference<Response<List<HashMap>>>() {});
+ response = objectMapper.readValue(content, new TypeReference<Response<List<HashMap<String, Object>>>>() {});
serverList = response.getData();
assertEquals(0, serverList.size());
content = TestUtils.httpGet(NODES_URL + "?status=ACTIVE");
- response = objectMapper.readValue(content, new TypeReference<Response<List<HashMap>>>() {});
+ response = objectMapper.readValue(content, new TypeReference<Response<List<HashMap<String, Object>>>>() {});
serverList = response.getData();
assertEquals(2, serverList.size());
}
@@ -125,7 +125,7 @@ public class ServletTest extends IntegrationTestBase {
DecommissionRequest decommissionRequest = new DecommissionRequest();
decommissionRequest.setServerIds(Sets.newHashSet("not_exist_serverId"));
String content = TestUtils.httpPost(CANCEL_DECOMMISSION_URL, objectMapper.writeValueAsString(decommissionRequest));
- Response response = objectMapper.readValue(content, Response.class);
+ Response<Object> response = objectMapper.readValue(content, Response.class);
assertEquals(-1, response.getCode());
assertNotNull(response.getErrMsg());
CancelDecommissionRequest cancelDecommissionRequest = new CancelDecommissionRequest();
diff --git a/storage/src/test/java/org/apache/uniffle/storage/handler/impl/PooledHadoopShuffleWriteHandlerTest.java b/storage/src/test/java/org/apache/uniffle/storage/handler/impl/PooledHadoopShuffleWriteHandlerTest.java
index 4c01b710..d946a372 100644
--- a/storage/src/test/java/org/apache/uniffle/storage/handler/impl/PooledHadoopShuffleWriteHandlerTest.java
+++ b/storage/src/test/java/org/apache/uniffle/storage/handler/impl/PooledHadoopShuffleWriteHandlerTest.java
@@ -64,7 +64,7 @@ public class PooledHadoopShuffleWriteHandlerTest {
@Test
public void lazyInitializeWriterHandlerTest() throws Exception {
int maxConcurrency = 5;
- LinkedBlockingDeque deque = new LinkedBlockingDeque(maxConcurrency);
+ LinkedBlockingDeque<ShuffleWriteHandler> deque = new LinkedBlockingDeque<>(maxConcurrency);
CopyOnWriteArrayList<Integer> invokedList = new CopyOnWriteArrayList<>();
CopyOnWriteArrayList<Integer> initializedList = new CopyOnWriteArrayList<>();
@@ -111,7 +111,7 @@ public class PooledHadoopShuffleWriteHandlerTest {
public void writeSameFileWhenNoRaceCondition() throws Exception {
int concurrency = 5;
CopyOnWriteArrayList<Integer> invokedIndexes = new CopyOnWriteArrayList<>();
- LinkedBlockingDeque deque = new LinkedBlockingDeque(concurrency);
+ LinkedBlockingDeque<ShuffleWriteHandler> deque = new LinkedBlockingDeque<>(concurrency);
for (int i = 0; i < concurrency; i++) {
deque.addFirst(
new FakedShuffleWriteHandler(invokedIndexes, i, () -> {
@@ -136,7 +136,7 @@ public class PooledHadoopShuffleWriteHandlerTest {
public void concurrentWrite() throws InterruptedException {
int concurrency = 5;
CopyOnWriteArrayList<Integer> invokedIndexes = new CopyOnWriteArrayList<>();
- LinkedBlockingDeque deque = new LinkedBlockingDeque(concurrency);
+ LinkedBlockingDeque<ShuffleWriteHandler> deque = new LinkedBlockingDeque<>(concurrency);
for (int i = 0; i < concurrency; i++) {
deque.addFirst(
new FakedShuffleWriteHandler(invokedIndexes, i, () -> {