You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2021/05/07 07:53:31 UTC

[flink] 01/02: [FLINK-22406][tests] Add RestClusterClient to MiniClusterWithClientResource

This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit a1c184631a6b4d90bddaa352b1bc4a0ac11eb50d
Author: Chesnay Schepler <ch...@apache.org>
AuthorDate: Wed May 5 11:59:55 2021 +0200

    [FLINK-22406][tests] Add RestClusterClient to MiniClusterWithClientResource
---
 .../test/util/MiniClusterWithClientResource.java   | 29 ++++++++++++++++++++++
 1 file changed, 29 insertions(+)

diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterWithClientResource.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterWithClientResource.java
index 62e0318..07ab415 100644
--- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterWithClientResource.java
+++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterWithClientResource.java
@@ -20,9 +20,11 @@ package org.apache.flink.test.util;
 
 import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.client.program.MiniClusterClient;
+import org.apache.flink.client.program.rest.RestClusterClient;
 import org.apache.flink.runtime.testutils.MiniClusterResource;
 import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
 import org.apache.flink.streaming.util.TestStreamEnvironment;
+import org.apache.flink.util.ExceptionUtils;
 
 /**
  * Starts a Flink mini cluster as a resource and registers the respective ExecutionEnvironment and
@@ -31,6 +33,7 @@ import org.apache.flink.streaming.util.TestStreamEnvironment;
 public class MiniClusterWithClientResource extends MiniClusterResource {
 
     private ClusterClient<?> clusterClient;
+    private RestClusterClient<MiniClusterClient.MiniClusterId> restClusterClient;
 
     private TestEnvironment executionEnvironment;
 
@@ -43,6 +46,15 @@ public class MiniClusterWithClientResource extends MiniClusterResource {
         return clusterClient;
     }
 
+    /**
+     * Returns a {@link RestClusterClient} that can be used to communicate with this mini cluster.
+     * Only use this if the client returned via {@link #getClusterClient()} does not fulfill your
+     * needs.
+     */
+    public RestClusterClient<?> getRestClusterClient() throws Exception {
+        return restClusterClient;
+    }
+
     public TestEnvironment getTestEnvironment() {
         return executionEnvironment;
     }
@@ -52,6 +64,7 @@ public class MiniClusterWithClientResource extends MiniClusterResource {
         super.before();
 
         clusterClient = createMiniClusterClient();
+        restClusterClient = createRestClusterClient();
 
         executionEnvironment = new TestEnvironment(getMiniCluster(), getNumberSlots(), false);
         executionEnvironment.setAsContext();
@@ -76,6 +89,16 @@ public class MiniClusterWithClientResource extends MiniClusterResource {
 
         clusterClient = null;
 
+        if (restClusterClient != null) {
+            try {
+                restClusterClient.close();
+            } catch (Exception e) {
+                exception = ExceptionUtils.firstOrSuppressed(e, exception);
+            }
+        }
+
+        restClusterClient = null;
+
         super.after();
 
         if (exception != null) {
@@ -86,4 +109,10 @@ public class MiniClusterWithClientResource extends MiniClusterResource {
     private MiniClusterClient createMiniClusterClient() {
         return new MiniClusterClient(getClientConfiguration(), getMiniCluster());
     }
+
+    private RestClusterClient<MiniClusterClient.MiniClusterId> createRestClusterClient()
+            throws Exception {
+        return new RestClusterClient<>(
+                getClientConfiguration(), MiniClusterClient.MiniClusterId.INSTANCE);
+    }
 }