You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2021/05/07 07:53:31 UTC
[flink] 01/02: [FLINK-22406][tests] Add RestClusterClient to
MiniClusterWithClientResource
This is an automated email from the ASF dual-hosted git repository.
chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit a1c184631a6b4d90bddaa352b1bc4a0ac11eb50d
Author: Chesnay Schepler <ch...@apache.org>
AuthorDate: Wed May 5 11:59:55 2021 +0200
[FLINK-22406][tests] Add RestClusterClient to MiniClusterWithClientResource
---
.../test/util/MiniClusterWithClientResource.java | 29 ++++++++++++++++++++++
1 file changed, 29 insertions(+)
diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterWithClientResource.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterWithClientResource.java
index 62e0318..07ab415 100644
--- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterWithClientResource.java
+++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterWithClientResource.java
@@ -20,9 +20,11 @@ package org.apache.flink.test.util;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.client.program.MiniClusterClient;
+import org.apache.flink.client.program.rest.RestClusterClient;
import org.apache.flink.runtime.testutils.MiniClusterResource;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.util.TestStreamEnvironment;
+import org.apache.flink.util.ExceptionUtils;
/**
* Starts a Flink mini cluster as a resource and registers the respective ExecutionEnvironment and
@@ -31,6 +33,7 @@ import org.apache.flink.streaming.util.TestStreamEnvironment;
public class MiniClusterWithClientResource extends MiniClusterResource {
private ClusterClient<?> clusterClient;
+ private RestClusterClient<MiniClusterClient.MiniClusterId> restClusterClient;
private TestEnvironment executionEnvironment;
@@ -43,6 +46,15 @@ public class MiniClusterWithClientResource extends MiniClusterResource {
return clusterClient;
}
+ /**
+ * Returns a {@link RestClusterClient} that can be used to communicate with this mini cluster.
+ * Only use this if the client returned via {@link #getClusterClient()} does not fulfill your
+ * needs.
+ */
+ public RestClusterClient<?> getRestClusterClient() throws Exception {
+ return restClusterClient;
+ }
+
public TestEnvironment getTestEnvironment() {
return executionEnvironment;
}
@@ -52,6 +64,7 @@ public class MiniClusterWithClientResource extends MiniClusterResource {
super.before();
clusterClient = createMiniClusterClient();
+ restClusterClient = createRestClusterClient();
executionEnvironment = new TestEnvironment(getMiniCluster(), getNumberSlots(), false);
executionEnvironment.setAsContext();
@@ -76,6 +89,16 @@ public class MiniClusterWithClientResource extends MiniClusterResource {
clusterClient = null;
+ if (restClusterClient != null) {
+ try {
+ restClusterClient.close();
+ } catch (Exception e) {
+ exception = ExceptionUtils.firstOrSuppressed(e, exception);
+ }
+ }
+
+ restClusterClient = null;
+
super.after();
if (exception != null) {
@@ -86,4 +109,10 @@ public class MiniClusterWithClientResource extends MiniClusterResource {
private MiniClusterClient createMiniClusterClient() {
return new MiniClusterClient(getClientConfiguration(), getMiniCluster());
}
+
+ private RestClusterClient<MiniClusterClient.MiniClusterId> createRestClusterClient()
+ throws Exception {
+ return new RestClusterClient<>(
+ getClientConfiguration(), MiniClusterClient.MiniClusterId.INSTANCE);
+ }
}