You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2021/11/05 08:45:59 UTC
[flink] 03/04: [FLINK-24208][rest] Consistent nullable getters
This is an automated email from the ASF dual-hosted git repository.
chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 9d707b309dde763c07811d4b9e4a9923c7700741
Author: Chesnay Schepler <ch...@apache.org>
AuthorDate: Thu Nov 4 09:47:35 2021 +0100
[FLINK-24208][rest] Consistent nullable getters
---
.../rest/RestClusterClientSavepointTriggerTest.java | 6 +++---
.../handler/job/savepoints/SavepointHandlers.java | 20 ++++++++------------
.../job/savepoints/SavepointTriggerRequestBody.java | 6 +++---
.../stop/StopWithSavepointRequestBody.java | 6 +++---
.../job/savepoints/SavepointHandlersTest.java | 2 +-
.../savepoints/StopWithSavepointHandlersTest.java | 2 +-
.../messages/SavepointHandlerRequestBodyTest.java | 5 ++---
7 files changed, 21 insertions(+), 26 deletions(-)
diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientSavepointTriggerTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientSavepointTriggerTest.java
index 061abea..5310e85 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientSavepointTriggerTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientSavepointTriggerTest.java
@@ -70,11 +70,11 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
+import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.instanceOf;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
@@ -129,7 +129,7 @@ public class RestClusterClientSavepointTriggerTest extends TestLogger {
try (final RestServerEndpoint restServerEndpoint =
createRestServerEndpoint(
request -> {
- assertNull(request.getTargetDirectory());
+ assertThat(request.getTargetDirectory().isPresent(), is(false));
assertFalse(request.isCancelJob());
return triggerId;
},
@@ -158,7 +158,7 @@ public class RestClusterClientSavepointTriggerTest extends TestLogger {
triggerRequestBody -> {
assertEquals(
expectedSubmittedSavepointDir,
- triggerRequestBody.getTargetDirectory());
+ triggerRequestBody.getTargetDirectory().get());
assertFalse(triggerRequestBody.isCancelJob());
return triggerId;
},
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/savepoints/SavepointHandlers.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/savepoints/SavepointHandlers.java
index 6ce3bb3..97cb1d0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/savepoints/SavepointHandlers.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/savepoints/SavepointHandlers.java
@@ -179,9 +179,10 @@ public class SavepointHandlers {
AsynchronousJobOperationKey operationKey,
final RestfulGateway gateway)
throws RestHandlerException {
- final String requestedTargetDirectory = request.getRequestBody().getTargetDirectory();
+ final Optional<String> requestedTargetDirectory =
+ request.getRequestBody().getTargetDirectory();
- if (requestedTargetDirectory == null && defaultSavepointDir == null) {
+ if (!requestedTargetDirectory.isPresent() && defaultSavepointDir == null) {
throw new RestHandlerException(
String.format(
"Config key [%s] is not set. Property [%s] must be provided.",
@@ -194,10 +195,7 @@ public class SavepointHandlers {
request.getRequestBody().shouldDrain()
? TriggerSavepointMode.TERMINATE_WITH_SAVEPOINT
: TriggerSavepointMode.SUSPEND_WITH_SAVEPOINT;
- final String targetDirectory =
- requestedTargetDirectory != null
- ? requestedTargetDirectory
- : defaultSavepointDir;
+ final String targetDirectory = requestedTargetDirectory.orElse(defaultSavepointDir);
return gateway.stopWithSavepoint(
operationKey, targetDirectory, savepointMode, RpcUtils.INF_TIMEOUT);
}
@@ -224,9 +222,10 @@ public class SavepointHandlers {
AsynchronousJobOperationKey operationKey,
RestfulGateway gateway)
throws RestHandlerException {
- final String requestedTargetDirectory = request.getRequestBody().getTargetDirectory();
+ final Optional<String> requestedTargetDirectory =
+ request.getRequestBody().getTargetDirectory();
- if (requestedTargetDirectory == null && defaultSavepointDir == null) {
+ if (!requestedTargetDirectory.isPresent() && defaultSavepointDir == null) {
throw new RestHandlerException(
String.format(
"Config key [%s] is not set. Property [%s] must be provided.",
@@ -239,10 +238,7 @@ public class SavepointHandlers {
request.getRequestBody().isCancelJob()
? TriggerSavepointMode.CANCEL_WITH_SAVEPOINT
: TriggerSavepointMode.SAVEPOINT;
- final String targetDirectory =
- requestedTargetDirectory != null
- ? requestedTargetDirectory
- : defaultSavepointDir;
+ final String targetDirectory = requestedTargetDirectory.orElse(defaultSavepointDir);
return gateway.triggerSavepoint(
operationKey, targetDirectory, savepointMode, RpcUtils.INF_TIMEOUT);
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/savepoints/SavepointTriggerRequestBody.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/savepoints/SavepointTriggerRequestBody.java
index 6ae61be..ed5ee1e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/savepoints/SavepointTriggerRequestBody.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/savepoints/SavepointTriggerRequestBody.java
@@ -59,9 +59,9 @@ public class SavepointTriggerRequestBody implements RequestBody {
this.triggerId = triggerId;
}
- @Nullable
- public String getTargetDirectory() {
- return targetDirectory;
+ @JsonIgnore
+ public Optional<String> getTargetDirectory() {
+ return Optional.ofNullable(targetDirectory);
}
@JsonIgnore
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/savepoints/stop/StopWithSavepointRequestBody.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/savepoints/stop/StopWithSavepointRequestBody.java
index a75bab7..cf80526 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/savepoints/stop/StopWithSavepointRequestBody.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/savepoints/stop/StopWithSavepointRequestBody.java
@@ -59,9 +59,9 @@ public class StopWithSavepointRequestBody implements RequestBody {
this.triggerId = triggerId;
}
- @Nullable
- public String getTargetDirectory() {
- return targetDirectory;
+ @JsonIgnore
+ public Optional<String> getTargetDirectory() {
+ return Optional.ofNullable(targetDirectory);
}
public boolean shouldDrain() {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/savepoints/SavepointHandlersTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/savepoints/SavepointHandlersTest.java
index 468495f..95b7c36 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/savepoints/SavepointHandlersTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/savepoints/SavepointHandlersTest.java
@@ -262,7 +262,7 @@ public class SavepointHandlersTest extends TestLogger {
}
private static HandlerRequest<SavepointTriggerRequestBody> triggerSavepointRequest(
- final String targetDirectory, @Nullable TriggerId triggerId)
+ @Nullable final String targetDirectory, @Nullable TriggerId triggerId)
throws HandlerRequestException {
return HandlerRequest.resolveParametersAndCreate(
new SavepointTriggerRequestBody(targetDirectory, false, triggerId),
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/savepoints/StopWithSavepointHandlersTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/savepoints/StopWithSavepointHandlersTest.java
index feb7ed8..a2db348 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/savepoints/StopWithSavepointHandlersTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/savepoints/StopWithSavepointHandlersTest.java
@@ -267,7 +267,7 @@ public class StopWithSavepointHandlersTest extends TestLogger {
}
private static HandlerRequest<StopWithSavepointRequestBody> triggerSavepointRequest(
- final String targetDirectory, @Nullable TriggerId triggerId)
+ @Nullable final String targetDirectory, @Nullable TriggerId triggerId)
throws HandlerRequestException {
return HandlerRequest.resolveParametersAndCreate(
new StopWithSavepointRequestBody(targetDirectory, false, triggerId),
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/SavepointHandlerRequestBodyTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/SavepointHandlerRequestBodyTest.java
index 75daf85..40b6eef 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/SavepointHandlerRequestBodyTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/SavepointHandlerRequestBodyTest.java
@@ -27,7 +27,6 @@ import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMap
import org.junit.Test;
import static org.hamcrest.CoreMatchers.is;
-import static org.hamcrest.CoreMatchers.nullValue;
import static org.junit.Assert.assertThat;
/** Tests for the savepoint request bodies. */
@@ -40,7 +39,7 @@ public class SavepointHandlerRequestBodyTest {
assertThat(defaultParseResult.isCancelJob(), is(false));
- assertThat(defaultParseResult.getTargetDirectory(), nullValue());
+ assertThat(defaultParseResult.getTargetDirectory().isPresent(), is(false));
}
@Test
@@ -51,7 +50,7 @@ public class SavepointHandlerRequestBodyTest {
assertThat(defaultParseResult.shouldDrain(), is(false));
- assertThat(defaultParseResult.getTargetDirectory(), nullValue());
+ assertThat(defaultParseResult.getTargetDirectory().isPresent(), is(false));
}
private static <T> T getDefaultParseResult(Class<T> clazz) throws JsonProcessingException {