You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by walterddr <gi...@git.apache.org> on 2018/04/15 15:13:08 UTC
[GitHub] flink pull request #5849: [FLINK-8986][e2e-test] Flink end to end test REST ...
GitHub user walterddr opened a pull request:
https://github.com/apache/flink/pull/5849
[FLINK-8986][e2e-test] Flink end to end test REST API
## What is the purpose of the change
Adding end to end test for REST APIs for FLIP-6 generated endpoints
## Brief change log
- Added `flink-rest-api-test` module in `flink-end-to-end-test` with a periodic stream job and a test suite that runs the REST API tests.
- Added test script to run the REST API test.
## Verifying this change
N/A, this is a test
## Does this pull request potentially affect one of the following parts:
No
## Documentation
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/walterddr/flink FLINK-8986
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/flink/pull/5849.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #5849
----
commit 526e5ac1f9758451417972143d7117095b18f2ab
Author: Rong Rong <wa...@...>
Date: 2018-04-15T15:05:08Z
Flink end to end test REST API with generated endpoints only
----
---
[GitHub] flink pull request #5849: [FLINK-8986][e2e-test] Flink end to end test REST ...
Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:
https://github.com/apache/flink/pull/5849#discussion_r181688947
--- Diff: flink-end-to-end-tests/flink-rest-api-test/src/main/java/org/apache/flink/runtime/rest/tests/RestApiTestSuite.java ---
@@ -0,0 +1,363 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.tests;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.runtime.blob.TransientBlobKey;
+import org.apache.flink.runtime.blob.TransientBlobService;
+import org.apache.flink.runtime.concurrent.Executors;
+import org.apache.flink.runtime.dispatcher.DispatcherGateway;
+import org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.leaderelection.LeaderContender;
+import org.apache.flink.runtime.leaderelection.LeaderElectionService;
+import org.apache.flink.runtime.messages.webmonitor.JobIdsWithStatusOverview;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
+import org.apache.flink.runtime.rest.HttpMethodWrapper;
+import org.apache.flink.runtime.rest.RestServerEndpoint;
+import org.apache.flink.runtime.rest.RestServerEndpointConfiguration;
+import org.apache.flink.runtime.rest.handler.RestHandlerConfiguration;
+import org.apache.flink.runtime.rest.handler.RestHandlerSpecification;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.rest.messages.job.JobDetailsInfo;
+import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagersInfo;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever;
+import org.apache.flink.runtime.webmonitor.testutils.HttpTestClient;
+import org.apache.flink.util.ConfigurationException;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandler;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+import scala.concurrent.duration.FiniteDuration;
+
+/**
+ * Rest API test suite.
+ */
+public class RestApiTestSuite {
+ private static final FiniteDuration TEST_TIMEOUT = new FiniteDuration(10L, TimeUnit.SECONDS);
+ private static final ObjectMapper MAPPER = new ObjectMapper();
+
+ private static int testSuccessCount = 0;
+ private static int testFailureCount = 0;
+ private static int testSkipCount = 0;
+
+ public static void main(String[] args) throws Exception {
+
+ ParameterTool params = ParameterTool.fromArgs(args);
+ final String host = params.get("host", "localhost");
+ final int port = params.getInt("port", 8081);
+ final HttpTestClient httpClient = new HttpTestClient(host, port);
+
+ // Validate Flink cluster is running
+ JobIdsWithStatusOverview jobOverview = getJobOverview(httpClient);
+
+ // Get necessary parameters for testing GET endpoints
+ Map<String, String> parameterMap = getParameterMaps(httpClient, jobOverview);
+
+ // Get list of endpoints
+ List<MessageHeaders> specs = new E2ETestDispatcherRestEndpoint().getSpecs();
+ specs.forEach(spec -> testMonitoringEndpointSpecs(httpClient, spec, parameterMap));
+
+ if (testFailureCount != 0) {
+ throw new RuntimeException("There are test failures. Success: " + testSuccessCount +
+ " Failures: " + testFailureCount + " Skipped: " + testSkipCount);
+ }
+ }
+
+ @SuppressWarnings("ConstantConditions")
+ private static Map<String, String> getParameterMaps(HttpTestClient httpClient,
+ JobIdsWithStatusOverview jobOverview) throws InterruptedException, IOException, TimeoutException {
+ // Get necessary parameters used for all REST API testings.
+ final Map<String, String> parameterMap = new HashMap<>();
+ Preconditions.checkState(jobOverview.getJobsWithStatus().stream()
+ .filter(jobIdWithStatus -> jobIdWithStatus.getJobStatus() == JobStatus.RUNNING)
+ .count() >= 1, "Cannot found active running jobs, discontinuing test!");
+ String jobId = jobOverview.getJobsWithStatus().stream()
+ .filter(jobIdWithStatus -> jobIdWithStatus.getJobStatus() == JobStatus.RUNNING)
--- End diff --
replace with `firstMatch`
---
[GitHub] flink pull request #5849: [FLINK-8986][e2e-test] Flink end to end test REST ...
Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:
https://github.com/apache/flink/pull/5849#discussion_r181669868
--- Diff: flink-end-to-end-tests/flink-rest-api-test/src/main/java/org/apache/flink/runtime/rest/tests/RestApiTestSuite.java ---
@@ -0,0 +1,363 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.tests;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.runtime.blob.TransientBlobKey;
+import org.apache.flink.runtime.blob.TransientBlobService;
+import org.apache.flink.runtime.concurrent.Executors;
+import org.apache.flink.runtime.dispatcher.DispatcherGateway;
+import org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.leaderelection.LeaderContender;
+import org.apache.flink.runtime.leaderelection.LeaderElectionService;
+import org.apache.flink.runtime.messages.webmonitor.JobIdsWithStatusOverview;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
+import org.apache.flink.runtime.rest.HttpMethodWrapper;
+import org.apache.flink.runtime.rest.RestServerEndpoint;
+import org.apache.flink.runtime.rest.RestServerEndpointConfiguration;
+import org.apache.flink.runtime.rest.handler.RestHandlerConfiguration;
+import org.apache.flink.runtime.rest.handler.RestHandlerSpecification;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.rest.messages.job.JobDetailsInfo;
+import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagersInfo;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever;
+import org.apache.flink.runtime.webmonitor.testutils.HttpTestClient;
+import org.apache.flink.util.ConfigurationException;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandler;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+import scala.concurrent.duration.FiniteDuration;
+
+/**
+ * Rest API test suite.
+ */
+public class RestApiTestSuite {
+ private static final FiniteDuration TEST_TIMEOUT = new FiniteDuration(10L, TimeUnit.SECONDS);
+ private static final ObjectMapper MAPPER = new ObjectMapper();
+
+ private static int testSuccessCount = 0;
+ private static int testFailureCount = 0;
+ private static int testSkipCount = 0;
+
+ public static void main(String[] args) throws Exception {
+
+ ParameterTool params = ParameterTool.fromArgs(args);
+ final String host = params.get("host", "localhost");
+ final int port = params.getInt("port", 8081);
+ final HttpTestClient httpClient = new HttpTestClient(host, port);
+
+ // Validate Flink cluster is running
+ JobIdsWithStatusOverview jobOverview = getJobOverview(httpClient);
+
+ // Get necessary parameters for testing GET endpoints
+ Map<String, String> parameterMap = getParameterMaps(httpClient, jobOverview);
+
+ // Get list of endpoints
+ List<MessageHeaders> specs = new E2ETestDispatcherRestEndpoint().getSpecs();
+ specs.forEach(spec -> testMonitoringEndpointSpecs(httpClient, spec, parameterMap));
+
+ if (testFailureCount != 0) {
+ throw new RuntimeException("There are test failures. Success: " + testSuccessCount +
+ " Failures: " + testFailureCount + " Skipped: " + testSkipCount);
+ }
+ }
+
+ @SuppressWarnings("ConstantConditions")
+ private static Map<String, String> getParameterMaps(HttpTestClient httpClient,
+ JobIdsWithStatusOverview jobOverview) throws InterruptedException, IOException, TimeoutException {
+ // Get necessary parameters used for all REST API testings.
+ final Map<String, String> parameterMap = new HashMap<>();
+ Preconditions.checkState(jobOverview.getJobsWithStatus().stream()
+ .filter(jobIdWithStatus -> jobIdWithStatus.getJobStatus() == JobStatus.RUNNING)
+ .count() >= 1, "Cannot found active running jobs, discontinuing test!");
+ String jobId = jobOverview.getJobsWithStatus().stream()
+ .filter(jobIdWithStatus -> jobIdWithStatus.getJobStatus() == JobStatus.RUNNING)
+ .findFirst().get().getJobId().toString();
+ parameterMap.put(":jobid", jobId);
+
+ JobDetailsInfo jobDetailsInfo = getJobDetailInfo(httpClient, jobId);
+ String vertexId = jobDetailsInfo.getJobVertexInfos().stream()
+ .findFirst().get().getJobVertexID().toString();
+ parameterMap.put(":vertexid", vertexId);
+ parameterMap.put(":checkpointid", "1"); // test first checkpoint
+ parameterMap.put(":subtaskindex", "0"); // test first subtask
+
+ TaskManagersInfo taskManagersInfo = getTaskManagers(httpClient);
+ String taskMgrId = taskManagersInfo.getTaskManagerInfos().stream().findFirst().get().getResourceId().toString();
+ parameterMap.put(":taskmanagerid", taskMgrId);
+ parameterMap.put(":triggerid", "");
+
+ return parameterMap;
+ }
+
+ private static JobIdsWithStatusOverview getJobOverview(HttpTestClient httpClient)
+ throws TimeoutException, InterruptedException, IOException {
+ httpClient.sendGetRequest("/jobs", TEST_TIMEOUT);
+ HttpTestClient.SimpleHttpResponse resp = httpClient.getNextResponse();
+ Preconditions.checkState(resp.getStatus().code() == 200,
+ "Cannot fetch Flink cluster status!");
+ return MAPPER.readValue(resp.getContent(), JobIdsWithStatusOverview.class);
+ }
+
+ private static JobDetailsInfo getJobDetailInfo(HttpTestClient httpClient, String jobId)
+ throws TimeoutException, InterruptedException, IOException {
+ httpClient.sendGetRequest("/jobs/" + jobId, TEST_TIMEOUT);
+ HttpTestClient.SimpleHttpResponse resp = httpClient.getNextResponse();
+ Preconditions.checkState(resp.getStatus().code() == 200,
+ "Cannot fetch job detail information for job " + jobId);
+ return MAPPER.readValue(resp.getContent(), JobDetailsInfo.class);
+ }
+
+ private static TaskManagersInfo getTaskManagers(HttpTestClient httpClient)
+ throws TimeoutException, InterruptedException, IOException {
+ httpClient.sendGetRequest("/taskmanagers", TEST_TIMEOUT);
+ HttpTestClient.SimpleHttpResponse resp = httpClient.getNextResponse();
+ Preconditions.checkState(resp.getStatus().code() == 200,
+ "Cannot fetch task manager status!");
+ return MAPPER.readValue(resp.getContent(), TaskManagersInfo.class);
+ }
+
+ private static void testMonitoringEndpointSpecs(HttpTestClient httpClient, MessageHeaders spec,
+ Map<String, String> parameterMap) {
+ try {
+ HttpMethodWrapper method = spec.getHttpMethod();
+ String path = getRestEndpointPath(spec.getTargetRestEndpointURL(), parameterMap);
+ if (spec.getRequestClass() == EmptyRequestBody.class) {
+ switch (method) {
+ case GET:
+ httpClient.sendGetRequest(path, TEST_TIMEOUT);
+ break;
+ case DELETE:
+ httpClient.sendDeleteRequest(path, TEST_TIMEOUT);
+ break;
+ default:
+ throw new UnsupportedOperationException("Cannot handle REST Test for " + path +
+ " with method " + method + ". Only GET and DELETE requests are supported!");
+ }
+
+ HttpTestClient.SimpleHttpResponse resp = httpClient.getNextResponse();
+
+ Preconditions.checkState(resp.getStatus().code() == spec.getResponseStatusCode().code(),
--- End diff --
this should be an assertion, and you can compare the status directly.
---
[GitHub] flink pull request #5849: [FLINK-8986][e2e-test] Flink end to end test REST ...
Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:
https://github.com/apache/flink/pull/5849#discussion_r181685888
--- Diff: flink-end-to-end-tests/flink-rest-api-test/src/main/java/org/apache/flink/runtime/rest/tests/RestApiTestSuite.java ---
@@ -0,0 +1,363 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.tests;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.runtime.blob.TransientBlobKey;
+import org.apache.flink.runtime.blob.TransientBlobService;
+import org.apache.flink.runtime.concurrent.Executors;
+import org.apache.flink.runtime.dispatcher.DispatcherGateway;
+import org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.leaderelection.LeaderContender;
+import org.apache.flink.runtime.leaderelection.LeaderElectionService;
+import org.apache.flink.runtime.messages.webmonitor.JobIdsWithStatusOverview;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
+import org.apache.flink.runtime.rest.HttpMethodWrapper;
+import org.apache.flink.runtime.rest.RestServerEndpoint;
+import org.apache.flink.runtime.rest.RestServerEndpointConfiguration;
+import org.apache.flink.runtime.rest.handler.RestHandlerConfiguration;
+import org.apache.flink.runtime.rest.handler.RestHandlerSpecification;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.rest.messages.job.JobDetailsInfo;
+import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagersInfo;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever;
+import org.apache.flink.runtime.webmonitor.testutils.HttpTestClient;
+import org.apache.flink.util.ConfigurationException;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandler;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+import scala.concurrent.duration.FiniteDuration;
+
+/**
+ * Rest API test suite.
+ */
+public class RestApiTestSuite {
+ private static final FiniteDuration TEST_TIMEOUT = new FiniteDuration(10L, TimeUnit.SECONDS);
+ private static final ObjectMapper MAPPER = new ObjectMapper();
+
+ private static int testSuccessCount = 0;
+ private static int testFailureCount = 0;
+ private static int testSkipCount = 0;
+
+ public static void main(String[] args) throws Exception {
+
+ ParameterTool params = ParameterTool.fromArgs(args);
+ final String host = params.get("host", "localhost");
+ final int port = params.getInt("port", 8081);
+ final HttpTestClient httpClient = new HttpTestClient(host, port);
+
+ // Validate Flink cluster is running
+ JobIdsWithStatusOverview jobOverview = getJobOverview(httpClient);
+
+ // Get necessary parameters for testing GET endpoints
+ Map<String, String> parameterMap = getParameterMaps(httpClient, jobOverview);
+
+ // Get list of endpoints
+ List<MessageHeaders> specs = new E2ETestDispatcherRestEndpoint().getSpecs();
--- End diff --
raw parameter usage
---
[GitHub] flink pull request #5849: [FLINK-8986][e2e-test][WIP] Flink end to end test ...
Posted by walterddr <gi...@git.apache.org>.
Github user walterddr closed the pull request at:
https://github.com/apache/flink/pull/5849
---
[GitHub] flink issue #5849: [FLINK-8986][e2e-test][WIP] Flink end to end test REST AP...
Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on the issue:
https://github.com/apache/flink/pull/5849
@walterddr yes, you can do that. In the description, just leave some notice that the PR is based on another, and which of the commits are relevant.
---
[GitHub] flink pull request #5849: [FLINK-8986][e2e-test] Flink end to end test REST ...
Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:
https://github.com/apache/flink/pull/5849#discussion_r181688583
--- Diff: flink-end-to-end-tests/test-scripts/test_rest_api.sh ---
@@ -0,0 +1,44 @@
+#!/usr/bin/env bash
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+source "$(dirname "$0")"/common.sh
+
+TEST_JAR_JAR=$TEST_INFRA_DIR/../../flink-end-to-end-tests/flink-rest-api-test/target/PeriodicStreamingJob.jar
+TEST_PROGRAM_JAR=$TEST_INFRA_DIR/../../flink-end-to-end-tests/flink-rest-api-test/target/RestApiTestSuite.jar
+
+echo "Run Rest-Api-Test Program"
+
+start_cluster
+$FLINK_DIR/bin/taskmanager.sh start
--- End diff --
what are the extra taskmanagers for?
---
[GitHub] flink issue #5849: [FLINK-8986][e2e-test] Flink end to end test REST API
Posted by walterddr <gi...@git.apache.org>.
Github user walterddr commented on the issue:
https://github.com/apache/flink/pull/5849
At this moment it skips 8 of the 39 tests (PATCH & POST methods). And there's 2 test failures and I think should be fix in another JIRA. Namely,
`/jobs/:jobid/vertices/:vertexid/subtasks/:subtaskid/attempts/attempt/accumulators` returns a 500
`/jobs/:jobid/vertices/:vertexid/subtasks/:subtaskid/attempts/attempt/` returns a 500
---
[GitHub] flink pull request #5849: [FLINK-8986][e2e-test] Flink end to end test REST ...
Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:
https://github.com/apache/flink/pull/5849#discussion_r181688557
--- Diff: flink-end-to-end-tests/test-scripts/test_rest_api.sh ---
@@ -0,0 +1,44 @@
+#!/usr/bin/env bash
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+source "$(dirname "$0")"/common.sh
+
+TEST_JAR_JAR=$TEST_INFRA_DIR/../../flink-end-to-end-tests/flink-rest-api-test/target/PeriodicStreamingJob.jar
+TEST_PROGRAM_JAR=$TEST_INFRA_DIR/../../flink-end-to-end-tests/flink-rest-api-test/target/RestApiTestSuite.jar
+
+echo "Run Rest-Api-Test Program"
+
+start_cluster
+$FLINK_DIR/bin/taskmanager.sh start
+$FLINK_DIR/bin/taskmanager.sh start
+$FLINK_DIR/bin/taskmanager.sh start
+
+# Start periodic streaming job to test against
+$FLINK_DIR/bin/flink run -p 4 $TEST_JAR_JAR -outputPath file://${TEST_DATA_DIR}/out/result &
--- End diff --
there's no benefit to running this with a higher parallelism
---
[GitHub] flink issue #5849: [FLINK-8986][e2e-test][WIP] Flink end to end test REST AP...
Posted by walterddr <gi...@git.apache.org>.
Github user walterddr commented on the issue:
https://github.com/apache/flink/pull/5849
Hi @tzulitai . I've actually created a new version of the test based on @zentol 's comment on this PR: https://github.com/walterddr/flink/compare/FLINK-8985...walterddr:FLINK-8986-test
But it actually depends on https://github.com/apache/flink/pull/5863 as I reused the periodic stream job for testing. Is it possible to create a PR on top of another currently pending PR?
---
[GitHub] flink pull request #5849: [FLINK-8986][e2e-test] Flink end to end test REST ...
Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:
https://github.com/apache/flink/pull/5849#discussion_r181683094
--- Diff: flink-end-to-end-tests/flink-rest-api-test/src/main/java/org/apache/flink/runtime/rest/tests/RestApiTestSuite.java ---
@@ -0,0 +1,363 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.tests;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.runtime.blob.TransientBlobKey;
+import org.apache.flink.runtime.blob.TransientBlobService;
+import org.apache.flink.runtime.concurrent.Executors;
+import org.apache.flink.runtime.dispatcher.DispatcherGateway;
+import org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.leaderelection.LeaderContender;
+import org.apache.flink.runtime.leaderelection.LeaderElectionService;
+import org.apache.flink.runtime.messages.webmonitor.JobIdsWithStatusOverview;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
+import org.apache.flink.runtime.rest.HttpMethodWrapper;
+import org.apache.flink.runtime.rest.RestServerEndpoint;
+import org.apache.flink.runtime.rest.RestServerEndpointConfiguration;
+import org.apache.flink.runtime.rest.handler.RestHandlerConfiguration;
+import org.apache.flink.runtime.rest.handler.RestHandlerSpecification;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.rest.messages.job.JobDetailsInfo;
+import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagersInfo;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever;
+import org.apache.flink.runtime.webmonitor.testutils.HttpTestClient;
+import org.apache.flink.util.ConfigurationException;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandler;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+import scala.concurrent.duration.FiniteDuration;
+
+/**
+ * Rest API test suite.
+ */
+public class RestApiTestSuite {
+ private static final FiniteDuration TEST_TIMEOUT = new FiniteDuration(10L, TimeUnit.SECONDS);
+ private static final ObjectMapper MAPPER = new ObjectMapper();
+
+ private static int testSuccessCount = 0;
+ private static int testFailureCount = 0;
+ private static int testSkipCount = 0;
+
+ public static void main(String[] args) throws Exception {
+
+ ParameterTool params = ParameterTool.fromArgs(args);
+ final String host = params.get("host", "localhost");
+ final int port = params.getInt("port", 8081);
+ final HttpTestClient httpClient = new HttpTestClient(host, port);
+
+ // Validate Flink cluster is running
+ JobIdsWithStatusOverview jobOverview = getJobOverview(httpClient);
+
+ // Get necessary parameters for testing GET endpoints
+ Map<String, String> parameterMap = getParameterMaps(httpClient, jobOverview);
+
+ // Get list of endpoints
+ List<MessageHeaders> specs = new E2ETestDispatcherRestEndpoint().getSpecs();
+ specs.forEach(spec -> testMonitoringEndpointSpecs(httpClient, spec, parameterMap));
+
+ if (testFailureCount != 0) {
+ throw new RuntimeException("There are test failures. Success: " + testSuccessCount +
+ " Failures: " + testFailureCount + " Skipped: " + testSkipCount);
+ }
+ }
+
+ @SuppressWarnings("ConstantConditions")
+ private static Map<String, String> getParameterMaps(HttpTestClient httpClient,
+ JobIdsWithStatusOverview jobOverview) throws InterruptedException, IOException, TimeoutException {
+ // Get necessary parameters used for all REST API testings.
+ final Map<String, String> parameterMap = new HashMap<>();
+ Preconditions.checkState(jobOverview.getJobsWithStatus().stream()
+ .filter(jobIdWithStatus -> jobIdWithStatus.getJobStatus() == JobStatus.RUNNING)
+ .count() >= 1, "Cannot found active running jobs, discontinuing test!");
+ String jobId = jobOverview.getJobsWithStatus().stream()
+ .filter(jobIdWithStatus -> jobIdWithStatus.getJobStatus() == JobStatus.RUNNING)
+ .findFirst().get().getJobId().toString();
+ parameterMap.put(":jobid", jobId);
+
+ JobDetailsInfo jobDetailsInfo = getJobDetailInfo(httpClient, jobId);
+ String vertexId = jobDetailsInfo.getJobVertexInfos().stream()
+ .findFirst().get().getJobVertexID().toString();
+ parameterMap.put(":vertexid", vertexId);
+ parameterMap.put(":checkpointid", "1"); // test first checkpoint
+ parameterMap.put(":subtaskindex", "0"); // test first subtask
+
+ TaskManagersInfo taskManagersInfo = getTaskManagers(httpClient);
+ String taskMgrId = taskManagersInfo.getTaskManagerInfos().stream().findFirst().get().getResourceId().toString();
+ parameterMap.put(":taskmanagerid", taskMgrId);
+ parameterMap.put(":triggerid", "");
+
+ return parameterMap;
+ }
+
+ private static JobIdsWithStatusOverview getJobOverview(HttpTestClient httpClient)
+ throws TimeoutException, InterruptedException, IOException {
+ httpClient.sendGetRequest("/jobs", TEST_TIMEOUT);
+ HttpTestClient.SimpleHttpResponse resp = httpClient.getNextResponse();
+ Preconditions.checkState(resp.getStatus().code() == 200,
+ "Cannot fetch Flink cluster status!");
+ return MAPPER.readValue(resp.getContent(), JobIdsWithStatusOverview.class);
+ }
+
+ private static JobDetailsInfo getJobDetailInfo(HttpTestClient httpClient, String jobId)
+ throws TimeoutException, InterruptedException, IOException {
+ httpClient.sendGetRequest("/jobs/" + jobId, TEST_TIMEOUT);
+ HttpTestClient.SimpleHttpResponse resp = httpClient.getNextResponse();
+ Preconditions.checkState(resp.getStatus().code() == 200,
+ "Cannot fetch job detail information for job " + jobId);
+ return MAPPER.readValue(resp.getContent(), JobDetailsInfo.class);
+ }
+
+ private static TaskManagersInfo getTaskManagers(HttpTestClient httpClient)
+ throws TimeoutException, InterruptedException, IOException {
+ httpClient.sendGetRequest("/taskmanagers", TEST_TIMEOUT);
+ HttpTestClient.SimpleHttpResponse resp = httpClient.getNextResponse();
+ Preconditions.checkState(resp.getStatus().code() == 200,
+ "Cannot fetch task manager status!");
+ return MAPPER.readValue(resp.getContent(), TaskManagersInfo.class);
+ }
+
+ private static void testMonitoringEndpointSpecs(HttpTestClient httpClient, MessageHeaders spec,
+ Map<String, String> parameterMap) {
+ try {
+ HttpMethodWrapper method = spec.getHttpMethod();
+ String path = getRestEndpointPath(spec.getTargetRestEndpointURL(), parameterMap);
+ if (spec.getRequestClass() == EmptyRequestBody.class) {
--- End diff --
I would filter out unsupported requests before calling `testMonitoringEndpointSpecs`.
---
[GitHub] flink pull request #5849: [FLINK-8986][e2e-test] Flink end to end test REST ...
Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:
https://github.com/apache/flink/pull/5849#discussion_r181681917
--- Diff: flink-end-to-end-tests/flink-rest-api-test/src/main/java/org/apache/flink/runtime/rest/tests/PeriodicStreamingJob.java ---
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.tests;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.types.Row;
+
+import java.util.Collections;
+import java.util.List;
+
+/**
+ */
--- End diff --
proper javadoc missing
---
[GitHub] flink pull request #5849: [FLINK-8986][e2e-test] Flink end to end test REST ...
Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:
https://github.com/apache/flink/pull/5849#discussion_r181685328
--- Diff: flink-end-to-end-tests/flink-rest-api-test/src/main/java/org/apache/flink/runtime/rest/tests/RestApiTestSuite.java ---
@@ -0,0 +1,363 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.tests;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.runtime.blob.TransientBlobKey;
+import org.apache.flink.runtime.blob.TransientBlobService;
+import org.apache.flink.runtime.concurrent.Executors;
+import org.apache.flink.runtime.dispatcher.DispatcherGateway;
+import org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.leaderelection.LeaderContender;
+import org.apache.flink.runtime.leaderelection.LeaderElectionService;
+import org.apache.flink.runtime.messages.webmonitor.JobIdsWithStatusOverview;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
+import org.apache.flink.runtime.rest.HttpMethodWrapper;
+import org.apache.flink.runtime.rest.RestServerEndpoint;
+import org.apache.flink.runtime.rest.RestServerEndpointConfiguration;
+import org.apache.flink.runtime.rest.handler.RestHandlerConfiguration;
+import org.apache.flink.runtime.rest.handler.RestHandlerSpecification;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.rest.messages.job.JobDetailsInfo;
+import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagersInfo;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever;
+import org.apache.flink.runtime.webmonitor.testutils.HttpTestClient;
+import org.apache.flink.util.ConfigurationException;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandler;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+import scala.concurrent.duration.FiniteDuration;
+
+/**
+ * Rest API test suite.
+ */
+public class RestApiTestSuite {
+ private static final FiniteDuration TEST_TIMEOUT = new FiniteDuration(10L, TimeUnit.SECONDS);
+ private static final ObjectMapper MAPPER = new ObjectMapper();
+
+ private static int testSuccessCount = 0;
+ private static int testFailureCount = 0;
+ private static int testSkipCount = 0;
+
+ public static void main(String[] args) throws Exception {
+
+ ParameterTool params = ParameterTool.fromArgs(args);
+ final String host = params.get("host", "localhost");
+ final int port = params.getInt("port", 8081);
+ final HttpTestClient httpClient = new HttpTestClient(host, port);
+
+ // Validate Flink cluster is running
+ JobIdsWithStatusOverview jobOverview = getJobOverview(httpClient);
+
+ // Get necessary parameters for testing GET endpoints
+ Map<String, String> parameterMap = getParameterMaps(httpClient, jobOverview);
+
+ // Get list of endpoints
+ List<MessageHeaders> specs = new E2ETestDispatcherRestEndpoint().getSpecs();
+ specs.forEach(spec -> testMonitoringEndpointSpecs(httpClient, spec, parameterMap));
+
+ if (testFailureCount != 0) {
+ throw new RuntimeException("There are test failures. Success: " + testSuccessCount +
+ " Failures: " + testFailureCount + " Skipped: " + testSkipCount);
+ }
+ }
+
+ @SuppressWarnings("ConstantConditions")
+ private static Map<String, String> getParameterMaps(HttpTestClient httpClient,
+ JobIdsWithStatusOverview jobOverview) throws InterruptedException, IOException, TimeoutException {
+ // Get necessary parameters used for all REST API testings.
+ final Map<String, String> parameterMap = new HashMap<>();
+ Preconditions.checkState(jobOverview.getJobsWithStatus().stream()
+ .filter(jobIdWithStatus -> jobIdWithStatus.getJobStatus() == JobStatus.RUNNING)
+ .count() >= 1, "Cannot found active running jobs, discontinuing test!");
+ String jobId = jobOverview.getJobsWithStatus().stream()
+ .filter(jobIdWithStatus -> jobIdWithStatus.getJobStatus() == JobStatus.RUNNING)
+ .findFirst().get().getJobId().toString();
+ parameterMap.put(":jobid", jobId);
+
+ JobDetailsInfo jobDetailsInfo = getJobDetailInfo(httpClient, jobId);
+ String vertexId = jobDetailsInfo.getJobVertexInfos().stream()
+ .findFirst().get().getJobVertexID().toString();
+ parameterMap.put(":vertexid", vertexId);
+ parameterMap.put(":checkpointid", "1"); // test first checkpoint
+ parameterMap.put(":subtaskindex", "0"); // test first subtask
+
+ TaskManagersInfo taskManagersInfo = getTaskManagers(httpClient);
+ String taskMgrId = taskManagersInfo.getTaskManagerInfos().stream().findFirst().get().getResourceId().toString();
+ parameterMap.put(":taskmanagerid", taskMgrId);
+ parameterMap.put(":triggerid", "");
--- End diff --
this should probably be removed.
---
[GitHub] flink issue #5849: [FLINK-8986][e2e-test][WIP] Flink end to end test REST AP...
Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on the issue:
https://github.com/apache/flink/pull/5849
Hi @walterddr, what is the status of this PR? Would be nice if we can move forward with this PR (and also the CLI e2e test PR that also you opened.)
---
[GitHub] flink pull request #5849: [FLINK-8986][e2e-test] Flink end to end test REST ...
Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:
https://github.com/apache/flink/pull/5849#discussion_r181669761
--- Diff: flink-end-to-end-tests/flink-rest-api-test/src/main/java/org/apache/flink/runtime/rest/tests/RestApiTestSuite.java ---
@@ -0,0 +1,363 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.tests;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.runtime.blob.TransientBlobKey;
+import org.apache.flink.runtime.blob.TransientBlobService;
+import org.apache.flink.runtime.concurrent.Executors;
+import org.apache.flink.runtime.dispatcher.DispatcherGateway;
+import org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.leaderelection.LeaderContender;
+import org.apache.flink.runtime.leaderelection.LeaderElectionService;
+import org.apache.flink.runtime.messages.webmonitor.JobIdsWithStatusOverview;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
+import org.apache.flink.runtime.rest.HttpMethodWrapper;
+import org.apache.flink.runtime.rest.RestServerEndpoint;
+import org.apache.flink.runtime.rest.RestServerEndpointConfiguration;
+import org.apache.flink.runtime.rest.handler.RestHandlerConfiguration;
+import org.apache.flink.runtime.rest.handler.RestHandlerSpecification;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.rest.messages.job.JobDetailsInfo;
+import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagersInfo;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever;
+import org.apache.flink.runtime.webmonitor.testutils.HttpTestClient;
+import org.apache.flink.util.ConfigurationException;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandler;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+import scala.concurrent.duration.FiniteDuration;
+
+/**
+ * Rest API test suite.
+ */
+public class RestApiTestSuite {
+ private static final FiniteDuration TEST_TIMEOUT = new FiniteDuration(10L, TimeUnit.SECONDS);
+ private static final ObjectMapper MAPPER = new ObjectMapper();
+
+ private static int testSuccessCount = 0;
+ private static int testFailureCount = 0;
+ private static int testSkipCount = 0;
+
+ public static void main(String[] args) throws Exception {
+
+ ParameterTool params = ParameterTool.fromArgs(args);
+ final String host = params.get("host", "localhost");
+ final int port = params.getInt("port", 8081);
+ final HttpTestClient httpClient = new HttpTestClient(host, port);
+
+ // Validate Flink cluster is running
+ JobIdsWithStatusOverview jobOverview = getJobOverview(httpClient);
+
+ // Get necessary parameters for testing GET endpoints
+ Map<String, String> parameterMap = getParameterMaps(httpClient, jobOverview);
+
+ // Get list of endpoints
+ List<MessageHeaders> specs = new E2ETestDispatcherRestEndpoint().getSpecs();
+ specs.forEach(spec -> testMonitoringEndpointSpecs(httpClient, spec, parameterMap));
+
+ if (testFailureCount != 0) {
+ throw new RuntimeException("There are test failures. Success: " + testSuccessCount +
+ " Failures: " + testFailureCount + " Skipped: " + testSkipCount);
+ }
+ }
+
+ @SuppressWarnings("ConstantConditions")
+ private static Map<String, String> getParameterMaps(HttpTestClient httpClient,
+ JobIdsWithStatusOverview jobOverview) throws InterruptedException, IOException, TimeoutException {
+ // Get necessary parameters used for all REST API testings.
+ final Map<String, String> parameterMap = new HashMap<>();
+ Preconditions.checkState(jobOverview.getJobsWithStatus().stream()
+ .filter(jobIdWithStatus -> jobIdWithStatus.getJobStatus() == JobStatus.RUNNING)
+ .count() >= 1, "Cannot found active running jobs, discontinuing test!");
+ String jobId = jobOverview.getJobsWithStatus().stream()
+ .filter(jobIdWithStatus -> jobIdWithStatus.getJobStatus() == JobStatus.RUNNING)
+ .findFirst().get().getJobId().toString();
+ parameterMap.put(":jobid", jobId);
+
+ JobDetailsInfo jobDetailsInfo = getJobDetailInfo(httpClient, jobId);
+ String vertexId = jobDetailsInfo.getJobVertexInfos().stream()
+ .findFirst().get().getJobVertexID().toString();
+ parameterMap.put(":vertexid", vertexId);
+ parameterMap.put(":checkpointid", "1"); // test first checkpoint
+ parameterMap.put(":subtaskindex", "0"); // test first subtask
+
+ TaskManagersInfo taskManagersInfo = getTaskManagers(httpClient);
+ String taskMgrId = taskManagersInfo.getTaskManagerInfos().stream().findFirst().get().getResourceId().toString();
+ parameterMap.put(":taskmanagerid", taskMgrId);
+ parameterMap.put(":triggerid", "");
+
+ return parameterMap;
+ }
+
+ private static JobIdsWithStatusOverview getJobOverview(HttpTestClient httpClient)
+ throws TimeoutException, InterruptedException, IOException {
+ httpClient.sendGetRequest("/jobs", TEST_TIMEOUT);
+ HttpTestClient.SimpleHttpResponse resp = httpClient.getNextResponse();
+ Preconditions.checkState(resp.getStatus().code() == 200,
+ "Cannot fetch Flink cluster status!");
+ return MAPPER.readValue(resp.getContent(), JobIdsWithStatusOverview.class);
+ }
+
+ private static JobDetailsInfo getJobDetailInfo(HttpTestClient httpClient, String jobId)
+ throws TimeoutException, InterruptedException, IOException {
+ httpClient.sendGetRequest("/jobs/" + jobId, TEST_TIMEOUT);
+ HttpTestClient.SimpleHttpResponse resp = httpClient.getNextResponse();
+ Preconditions.checkState(resp.getStatus().code() == 200,
+ "Cannot fetch job detail information for job " + jobId);
+ return MAPPER.readValue(resp.getContent(), JobDetailsInfo.class);
+ }
+
+ private static TaskManagersInfo getTaskManagers(HttpTestClient httpClient)
+ throws TimeoutException, InterruptedException, IOException {
+ httpClient.sendGetRequest("/taskmanagers", TEST_TIMEOUT);
+ HttpTestClient.SimpleHttpResponse resp = httpClient.getNextResponse();
+ Preconditions.checkState(resp.getStatus().code() == 200,
+ "Cannot fetch task manager status!");
+ return MAPPER.readValue(resp.getContent(), TaskManagersInfo.class);
+ }
+
+ private static void testMonitoringEndpointSpecs(HttpTestClient httpClient, MessageHeaders spec,
+ Map<String, String> parameterMap) {
+ try {
+ HttpMethodWrapper method = spec.getHttpMethod();
+ String path = getRestEndpointPath(spec.getTargetRestEndpointURL(), parameterMap);
+ if (spec.getRequestClass() == EmptyRequestBody.class) {
+ switch (method) {
+ case GET:
+ httpClient.sendGetRequest(path, TEST_TIMEOUT);
+ break;
+ case DELETE:
+ httpClient.sendDeleteRequest(path, TEST_TIMEOUT);
+ break;
+ default:
+ throw new UnsupportedOperationException("Cannot handle REST Test for " + path +
+ " with method " + method + ". Only GET and DELETE requests are supported!");
+ }
+
+ HttpTestClient.SimpleHttpResponse resp = httpClient.getNextResponse();
+
+ Preconditions.checkState(resp.getStatus().code() == spec.getResponseStatusCode().code(),
+ "Found mismatching status code from endpoint " + path + " with method " + method +
+ ", expecting: " + spec.getResponseStatusCode().code() + ", but was: " + resp.getStatus().code());
+ // System.out.println("Found matching status for endpoint " + path + " with method " + method);
+ @SuppressWarnings("unchecked")
--- End diff --
this is unnecessary. `MessageHeaders` have the response class as a generic argument.
Change the method signature to
```
private static <P extends ResponseBody> void testMonitoringEndpointSpecs(HttpTestClient httpClient, MessageHeaders<?, P, ?> spec, Map<String, String> parameterMap)
```
---
[GitHub] flink pull request #5849: [FLINK-8986][e2e-test] Flink end to end test REST ...
Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:
https://github.com/apache/flink/pull/5849#discussion_r181687398
--- Diff: flink-end-to-end-tests/flink-rest-api-test/src/main/java/org/apache/flink/runtime/rest/tests/RestApiTestSuite.java ---
@@ -0,0 +1,363 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.tests;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.runtime.blob.TransientBlobKey;
+import org.apache.flink.runtime.blob.TransientBlobService;
+import org.apache.flink.runtime.concurrent.Executors;
+import org.apache.flink.runtime.dispatcher.DispatcherGateway;
+import org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.leaderelection.LeaderContender;
+import org.apache.flink.runtime.leaderelection.LeaderElectionService;
+import org.apache.flink.runtime.messages.webmonitor.JobIdsWithStatusOverview;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
+import org.apache.flink.runtime.rest.HttpMethodWrapper;
+import org.apache.flink.runtime.rest.RestServerEndpoint;
+import org.apache.flink.runtime.rest.RestServerEndpointConfiguration;
+import org.apache.flink.runtime.rest.handler.RestHandlerConfiguration;
+import org.apache.flink.runtime.rest.handler.RestHandlerSpecification;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.rest.messages.job.JobDetailsInfo;
+import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagersInfo;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever;
+import org.apache.flink.runtime.webmonitor.testutils.HttpTestClient;
+import org.apache.flink.util.ConfigurationException;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandler;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+import scala.concurrent.duration.FiniteDuration;
+
+/**
+ * Rest API test suite.
+ */
+public class RestApiTestSuite {
+ private static final FiniteDuration TEST_TIMEOUT = new FiniteDuration(10L, TimeUnit.SECONDS);
+ private static final ObjectMapper MAPPER = new ObjectMapper();
+
+ private static int testSuccessCount = 0;
+ private static int testFailureCount = 0;
+ private static int testSkipCount = 0;
+
+ public static void main(String[] args) throws Exception {
+
+ ParameterTool params = ParameterTool.fromArgs(args);
+ final String host = params.get("host", "localhost");
+ final int port = params.getInt("port", 8081);
+ final HttpTestClient httpClient = new HttpTestClient(host, port);
+
+ // Validate Flink cluster is running
+ JobIdsWithStatusOverview jobOverview = getJobOverview(httpClient);
+
+ // Get necessary parameters for testing GET endpoints
+ Map<String, String> parameterMap = getParameterMaps(httpClient, jobOverview);
+
+ // Get list of endpoints
+ List<MessageHeaders> specs = new E2ETestDispatcherRestEndpoint().getSpecs();
+ specs.forEach(spec -> testMonitoringEndpointSpecs(httpClient, spec, parameterMap));
+
+ if (testFailureCount != 0) {
+ throw new RuntimeException("There are test failures. Success: " + testSuccessCount +
+ " Failures: " + testFailureCount + " Skipped: " + testSkipCount);
+ }
+ }
+
+ @SuppressWarnings("ConstantConditions")
+ private static Map<String, String> getParameterMaps(HttpTestClient httpClient,
+ JobIdsWithStatusOverview jobOverview) throws InterruptedException, IOException, TimeoutException {
+ // Get necessary parameters used for all REST API testings.
+ final Map<String, String> parameterMap = new HashMap<>();
+ Preconditions.checkState(jobOverview.getJobsWithStatus().stream()
+ .filter(jobIdWithStatus -> jobIdWithStatus.getJobStatus() == JobStatus.RUNNING)
+ .count() >= 1, "Cannot found active running jobs, discontinuing test!");
+ String jobId = jobOverview.getJobsWithStatus().stream()
+ .filter(jobIdWithStatus -> jobIdWithStatus.getJobStatus() == JobStatus.RUNNING)
+ .findFirst().get().getJobId().toString();
+ parameterMap.put(":jobid", jobId);
+
+ JobDetailsInfo jobDetailsInfo = getJobDetailInfo(httpClient, jobId);
+ String vertexId = jobDetailsInfo.getJobVertexInfos().stream()
+ .findFirst().get().getJobVertexID().toString();
+ parameterMap.put(":vertexid", vertexId);
+ parameterMap.put(":checkpointid", "1"); // test first checkpoint
+ parameterMap.put(":subtaskindex", "0"); // test first subtask
+
+ TaskManagersInfo taskManagersInfo = getTaskManagers(httpClient);
+ String taskMgrId = taskManagersInfo.getTaskManagerInfos().stream().findFirst().get().getResourceId().toString();
+ parameterMap.put(":taskmanagerid", taskMgrId);
+ parameterMap.put(":triggerid", "");
+
+ return parameterMap;
+ }
+
+ private static JobIdsWithStatusOverview getJobOverview(HttpTestClient httpClient)
+ throws TimeoutException, InterruptedException, IOException {
+ httpClient.sendGetRequest("/jobs", TEST_TIMEOUT);
+ HttpTestClient.SimpleHttpResponse resp = httpClient.getNextResponse();
+ Preconditions.checkState(resp.getStatus().code() == 200,
+ "Cannot fetch Flink cluster status!");
+ return MAPPER.readValue(resp.getContent(), JobIdsWithStatusOverview.class);
+ }
+
+ private static JobDetailsInfo getJobDetailInfo(HttpTestClient httpClient, String jobId)
--- End diff --
these methods can be made less brittle my using the corresponding `MessageHeader` objects.
```
JobDetailsHeaders headers = JobDetailsHeaders.getInstance();
httpClient.sendGetRequest(headers.getTargetRestEndpointURL().replace(JobIDPathParameter.KEY, jobId), TEST_TIMEOUT);
HttpTestClient.SimpleHttpResponse resp = httpClient.getNextResponse();
Preconditions.checkState(resp.getStatus() == headers.getResponseStatusCode(),
"Cannot fetch job detail information for job " + jobId);
return MAPPER.readValue(resp.getContent(), headers.getResponseClass());
```
---
[GitHub] flink pull request #5849: [FLINK-8986][e2e-test] Flink end to end test REST ...
Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:
https://github.com/apache/flink/pull/5849#discussion_r181686235
--- Diff: flink-end-to-end-tests/flink-rest-api-test/src/main/java/org/apache/flink/runtime/rest/tests/RestApiTestSuite.java ---
@@ -0,0 +1,363 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.tests;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.runtime.blob.TransientBlobKey;
+import org.apache.flink.runtime.blob.TransientBlobService;
+import org.apache.flink.runtime.concurrent.Executors;
+import org.apache.flink.runtime.dispatcher.DispatcherGateway;
+import org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.leaderelection.LeaderContender;
+import org.apache.flink.runtime.leaderelection.LeaderElectionService;
+import org.apache.flink.runtime.messages.webmonitor.JobIdsWithStatusOverview;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
+import org.apache.flink.runtime.rest.HttpMethodWrapper;
+import org.apache.flink.runtime.rest.RestServerEndpoint;
+import org.apache.flink.runtime.rest.RestServerEndpointConfiguration;
+import org.apache.flink.runtime.rest.handler.RestHandlerConfiguration;
+import org.apache.flink.runtime.rest.handler.RestHandlerSpecification;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.rest.messages.job.JobDetailsInfo;
+import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagersInfo;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever;
+import org.apache.flink.runtime.webmonitor.testutils.HttpTestClient;
+import org.apache.flink.util.ConfigurationException;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandler;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+import scala.concurrent.duration.FiniteDuration;
+
+/**
+ * Rest API test suite.
+ */
+public class RestApiTestSuite {
+ private static final FiniteDuration TEST_TIMEOUT = new FiniteDuration(10L, TimeUnit.SECONDS);
+ private static final ObjectMapper MAPPER = new ObjectMapper();
+
+ private static int testSuccessCount = 0;
+ private static int testFailureCount = 0;
+ private static int testSkipCount = 0;
+
+ public static void main(String[] args) throws Exception {
+
+ ParameterTool params = ParameterTool.fromArgs(args);
+ final String host = params.get("host", "localhost");
+ final int port = params.getInt("port", 8081);
+ final HttpTestClient httpClient = new HttpTestClient(host, port);
+
+ // Validate Flink cluster is running
+ JobIdsWithStatusOverview jobOverview = getJobOverview(httpClient);
+
+ // Get necessary parameters for testing GET endpoints
+ Map<String, String> parameterMap = getParameterMaps(httpClient, jobOverview);
+
+ // Get list of endpoints
+ List<MessageHeaders> specs = new E2ETestDispatcherRestEndpoint().getSpecs();
+ specs.forEach(spec -> testMonitoringEndpointSpecs(httpClient, spec, parameterMap));
+
+ if (testFailureCount != 0) {
+ throw new RuntimeException("There are test failures. Success: " + testSuccessCount +
+ " Failures: " + testFailureCount + " Skipped: " + testSkipCount);
+ }
+ }
+
+ @SuppressWarnings("ConstantConditions")
+ private static Map<String, String> getParameterMaps(HttpTestClient httpClient,
+ JobIdsWithStatusOverview jobOverview) throws InterruptedException, IOException, TimeoutException {
+ // Get necessary parameters used for all REST API testings.
+ final Map<String, String> parameterMap = new HashMap<>();
+ Preconditions.checkState(jobOverview.getJobsWithStatus().stream()
--- End diff --
May be better to make this an assertion, effectively we are relying on the REST API functionality here already.
---
[GitHub] flink pull request #5849: [FLINK-8986][e2e-test] Flink end to end test REST ...
Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:
https://github.com/apache/flink/pull/5849#discussion_r181669905
--- Diff: flink-end-to-end-tests/flink-rest-api-test/src/main/java/org/apache/flink/runtime/rest/tests/RestApiTestSuite.java ---
@@ -0,0 +1,363 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.tests;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.runtime.blob.TransientBlobKey;
+import org.apache.flink.runtime.blob.TransientBlobService;
+import org.apache.flink.runtime.concurrent.Executors;
+import org.apache.flink.runtime.dispatcher.DispatcherGateway;
+import org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.leaderelection.LeaderContender;
+import org.apache.flink.runtime.leaderelection.LeaderElectionService;
+import org.apache.flink.runtime.messages.webmonitor.JobIdsWithStatusOverview;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
+import org.apache.flink.runtime.rest.HttpMethodWrapper;
+import org.apache.flink.runtime.rest.RestServerEndpoint;
+import org.apache.flink.runtime.rest.RestServerEndpointConfiguration;
+import org.apache.flink.runtime.rest.handler.RestHandlerConfiguration;
+import org.apache.flink.runtime.rest.handler.RestHandlerSpecification;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.rest.messages.job.JobDetailsInfo;
+import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagersInfo;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever;
+import org.apache.flink.runtime.webmonitor.testutils.HttpTestClient;
+import org.apache.flink.util.ConfigurationException;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandler;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+import scala.concurrent.duration.FiniteDuration;
+
+/**
+ * Rest API test suite.
+ */
+public class RestApiTestSuite {
+ private static final FiniteDuration TEST_TIMEOUT = new FiniteDuration(10L, TimeUnit.SECONDS);
+ private static final ObjectMapper MAPPER = new ObjectMapper();
+
+ private static int testSuccessCount = 0;
+ private static int testFailureCount = 0;
+ private static int testSkipCount = 0;
+
+ public static void main(String[] args) throws Exception {
+
+ ParameterTool params = ParameterTool.fromArgs(args);
+ final String host = params.get("host", "localhost");
+ final int port = params.getInt("port", 8081);
+ final HttpTestClient httpClient = new HttpTestClient(host, port);
+
+ // Validate Flink cluster is running
+ JobIdsWithStatusOverview jobOverview = getJobOverview(httpClient);
+
+ // Get necessary parameters for testing GET endpoints
+ Map<String, String> parameterMap = getParameterMaps(httpClient, jobOverview);
+
+ // Get list of endpoints
+ List<MessageHeaders> specs = new E2ETestDispatcherRestEndpoint().getSpecs();
+ specs.forEach(spec -> testMonitoringEndpointSpecs(httpClient, spec, parameterMap));
+
+ if (testFailureCount != 0) {
+ throw new RuntimeException("There are test failures. Success: " + testSuccessCount +
+ " Failures: " + testFailureCount + " Skipped: " + testSkipCount);
+ }
+ }
+
+ @SuppressWarnings("ConstantConditions")
+ private static Map<String, String> getParameterMaps(HttpTestClient httpClient,
+ JobIdsWithStatusOverview jobOverview) throws InterruptedException, IOException, TimeoutException {
+ // Get necessary parameters used for all REST API testings.
+ final Map<String, String> parameterMap = new HashMap<>();
+ Preconditions.checkState(jobOverview.getJobsWithStatus().stream()
+ .filter(jobIdWithStatus -> jobIdWithStatus.getJobStatus() == JobStatus.RUNNING)
+ .count() >= 1, "Cannot found active running jobs, discontinuing test!");
+ String jobId = jobOverview.getJobsWithStatus().stream()
+ .filter(jobIdWithStatus -> jobIdWithStatus.getJobStatus() == JobStatus.RUNNING)
+ .findFirst().get().getJobId().toString();
+ parameterMap.put(":jobid", jobId);
+
+ JobDetailsInfo jobDetailsInfo = getJobDetailInfo(httpClient, jobId);
+ String vertexId = jobDetailsInfo.getJobVertexInfos().stream()
+ .findFirst().get().getJobVertexID().toString();
+ parameterMap.put(":vertexid", vertexId);
+ parameterMap.put(":checkpointid", "1"); // test first checkpoint
+ parameterMap.put(":subtaskindex", "0"); // test first subtask
+
+ TaskManagersInfo taskManagersInfo = getTaskManagers(httpClient);
+ String taskMgrId = taskManagersInfo.getTaskManagerInfos().stream().findFirst().get().getResourceId().toString();
+ parameterMap.put(":taskmanagerid", taskMgrId);
+ parameterMap.put(":triggerid", "");
+
+ return parameterMap;
+ }
+
+ private static JobIdsWithStatusOverview getJobOverview(HttpTestClient httpClient)
+ throws TimeoutException, InterruptedException, IOException {
+ httpClient.sendGetRequest("/jobs", TEST_TIMEOUT);
+ HttpTestClient.SimpleHttpResponse resp = httpClient.getNextResponse();
+ Preconditions.checkState(resp.getStatus().code() == 200,
+ "Cannot fetch Flink cluster status!");
+ return MAPPER.readValue(resp.getContent(), JobIdsWithStatusOverview.class);
+ }
+
+ private static JobDetailsInfo getJobDetailInfo(HttpTestClient httpClient, String jobId)
+ throws TimeoutException, InterruptedException, IOException {
+ httpClient.sendGetRequest("/jobs/" + jobId, TEST_TIMEOUT);
+ HttpTestClient.SimpleHttpResponse resp = httpClient.getNextResponse();
+ Preconditions.checkState(resp.getStatus().code() == 200,
+ "Cannot fetch job detail information for job " + jobId);
+ return MAPPER.readValue(resp.getContent(), JobDetailsInfo.class);
+ }
+
+ private static TaskManagersInfo getTaskManagers(HttpTestClient httpClient)
+ throws TimeoutException, InterruptedException, IOException {
+ httpClient.sendGetRequest("/taskmanagers", TEST_TIMEOUT);
+ HttpTestClient.SimpleHttpResponse resp = httpClient.getNextResponse();
+ Preconditions.checkState(resp.getStatus().code() == 200,
+ "Cannot fetch task manager status!");
+ return MAPPER.readValue(resp.getContent(), TaskManagersInfo.class);
+ }
+
+ private static void testMonitoringEndpointSpecs(HttpTestClient httpClient, MessageHeaders spec,
+ Map<String, String> parameterMap) {
+ try {
+ HttpMethodWrapper method = spec.getHttpMethod();
+ String path = getRestEndpointPath(spec.getTargetRestEndpointURL(), parameterMap);
+ if (spec.getRequestClass() == EmptyRequestBody.class) {
+ switch (method) {
+ case GET:
+ httpClient.sendGetRequest(path, TEST_TIMEOUT);
+ break;
+ case DELETE:
+ httpClient.sendDeleteRequest(path, TEST_TIMEOUT);
+ break;
+ default:
+ throw new UnsupportedOperationException("Cannot handle REST Test for " + path +
+ " with method " + method + ". Only GET and DELETE requests are supported!");
+ }
+
+ HttpTestClient.SimpleHttpResponse resp = httpClient.getNextResponse();
+
+ Preconditions.checkState(resp.getStatus().code() == spec.getResponseStatusCode().code(),
+ "Found mismatching status code from endpoint " + path + " with method " + method +
+ ", expecting: " + spec.getResponseStatusCode().code() + ", but was: " + resp.getStatus().code());
+ // System.out.println("Found matching status for endpoint " + path + " with method " + method);
--- End diff --
remove
---
[GitHub] flink pull request #5849: [FLINK-8986][e2e-test] Flink end to end test REST ...
Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:
https://github.com/apache/flink/pull/5849#discussion_r181668393
--- Diff: flink-end-to-end-tests/flink-rest-api-test/src/main/java/org/apache/flink/runtime/rest/tests/RestApiTestSuite.java ---
@@ -0,0 +1,363 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.tests;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.runtime.blob.TransientBlobKey;
+import org.apache.flink.runtime.blob.TransientBlobService;
+import org.apache.flink.runtime.concurrent.Executors;
+import org.apache.flink.runtime.dispatcher.DispatcherGateway;
+import org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.leaderelection.LeaderContender;
+import org.apache.flink.runtime.leaderelection.LeaderElectionService;
+import org.apache.flink.runtime.messages.webmonitor.JobIdsWithStatusOverview;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
+import org.apache.flink.runtime.rest.HttpMethodWrapper;
+import org.apache.flink.runtime.rest.RestServerEndpoint;
+import org.apache.flink.runtime.rest.RestServerEndpointConfiguration;
+import org.apache.flink.runtime.rest.handler.RestHandlerConfiguration;
+import org.apache.flink.runtime.rest.handler.RestHandlerSpecification;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.rest.messages.job.JobDetailsInfo;
+import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagersInfo;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever;
+import org.apache.flink.runtime.webmonitor.testutils.HttpTestClient;
+import org.apache.flink.util.ConfigurationException;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandler;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+import scala.concurrent.duration.FiniteDuration;
+
+/**
+ * Rest API test suite.
+ */
+public class RestApiTestSuite {
+ private static final FiniteDuration TEST_TIMEOUT = new FiniteDuration(10L, TimeUnit.SECONDS);
+ private static final ObjectMapper MAPPER = new ObjectMapper();
+
+ private static int testSuccessCount = 0;
+ private static int testFailureCount = 0;
+ private static int testSkipCount = 0;
+
+ public static void main(String[] args) throws Exception {
+
+ ParameterTool params = ParameterTool.fromArgs(args);
+ final String host = params.get("host", "localhost");
+ final int port = params.getInt("port", 8081);
+ final HttpTestClient httpClient = new HttpTestClient(host, port);
+
+ // Validate Flink cluster is running
+ JobIdsWithStatusOverview jobOverview = getJobOverview(httpClient);
+
+ // Get necessary parameters for testing GET endpoints
+ Map<String, String> parameterMap = getParameterMaps(httpClient, jobOverview);
+
+ // Get list of endpoints
+ List<MessageHeaders> specs = new E2ETestDispatcherRestEndpoint().getSpecs();
+ specs.forEach(spec -> testMonitoringEndpointSpecs(httpClient, spec, parameterMap));
+
+ if (testFailureCount != 0) {
+ throw new RuntimeException("There are test failures. Success: " + testSuccessCount +
+ " Failures: " + testFailureCount + " Skipped: " + testSkipCount);
+ }
+ }
+
+ @SuppressWarnings("ConstantConditions")
+ private static Map<String, String> getParameterMaps(HttpTestClient httpClient,
+ JobIdsWithStatusOverview jobOverview) throws InterruptedException, IOException, TimeoutException {
+ // Get necessary parameters used for all REST API testings.
+ final Map<String, String> parameterMap = new HashMap<>();
+ Preconditions.checkState(jobOverview.getJobsWithStatus().stream()
+ .filter(jobIdWithStatus -> jobIdWithStatus.getJobStatus() == JobStatus.RUNNING)
+ .count() >= 1, "Cannot found active running jobs, discontinuing test!");
+ String jobId = jobOverview.getJobsWithStatus().stream()
+ .filter(jobIdWithStatus -> jobIdWithStatus.getJobStatus() == JobStatus.RUNNING)
+ .findFirst().get().getJobId().toString();
--- End diff --
use the Optional returned by `findFirst` instead of checking the condition beforehand.
---
[GitHub] flink pull request #5849: [FLINK-8986][e2e-test] Flink end to end test REST ...
Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:
https://github.com/apache/flink/pull/5849#discussion_r181688711
--- Diff: flink-end-to-end-tests/test-scripts/test_rest_api.sh ---
@@ -0,0 +1,44 @@
+#!/usr/bin/env bash
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+source "$(dirname "$0")"/common.sh
+
+TEST_JAR_JAR=$TEST_INFRA_DIR/../../flink-end-to-end-tests/flink-rest-api-test/target/PeriodicStreamingJob.jar
+TEST_PROGRAM_JAR=$TEST_INFRA_DIR/../../flink-end-to-end-tests/flink-rest-api-test/target/RestApiTestSuite.jar
+
+echo "Run Rest-Api-Test Program"
+
+start_cluster
+$FLINK_DIR/bin/taskmanager.sh start
+$FLINK_DIR/bin/taskmanager.sh start
+$FLINK_DIR/bin/taskmanager.sh start
+
+# Start periodic streaming job to test against
+$FLINK_DIR/bin/flink run -p 4 $TEST_JAR_JAR -outputPath file://${TEST_DATA_DIR}/out/result &
+
+# Wait for the job to come up
+sleep 5s
--- End diff --
we can handle this more efficiently in the java program by polling the REST API
---
[GitHub] flink pull request #5849: [FLINK-8986][e2e-test] Flink end to end test REST ...
Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:
https://github.com/apache/flink/pull/5849#discussion_r181687083
--- Diff: flink-end-to-end-tests/flink-rest-api-test/src/main/java/org/apache/flink/runtime/rest/tests/RestApiTestSuite.java ---
@@ -0,0 +1,363 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.tests;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.runtime.blob.TransientBlobKey;
+import org.apache.flink.runtime.blob.TransientBlobService;
+import org.apache.flink.runtime.concurrent.Executors;
+import org.apache.flink.runtime.dispatcher.DispatcherGateway;
+import org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.leaderelection.LeaderContender;
+import org.apache.flink.runtime.leaderelection.LeaderElectionService;
+import org.apache.flink.runtime.messages.webmonitor.JobIdsWithStatusOverview;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
+import org.apache.flink.runtime.rest.HttpMethodWrapper;
+import org.apache.flink.runtime.rest.RestServerEndpoint;
+import org.apache.flink.runtime.rest.RestServerEndpointConfiguration;
+import org.apache.flink.runtime.rest.handler.RestHandlerConfiguration;
+import org.apache.flink.runtime.rest.handler.RestHandlerSpecification;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.rest.messages.job.JobDetailsInfo;
+import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagersInfo;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever;
+import org.apache.flink.runtime.webmonitor.testutils.HttpTestClient;
+import org.apache.flink.util.ConfigurationException;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandler;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+import scala.concurrent.duration.FiniteDuration;
+
+/**
+ * Rest API test suite.
+ */
+public class RestApiTestSuite {
+ private static final FiniteDuration TEST_TIMEOUT = new FiniteDuration(10L, TimeUnit.SECONDS);
+ private static final ObjectMapper MAPPER = new ObjectMapper();
+
+ private static int testSuccessCount = 0;
+ private static int testFailureCount = 0;
+ private static int testSkipCount = 0;
+
+ public static void main(String[] args) throws Exception {
+
+ ParameterTool params = ParameterTool.fromArgs(args);
+ final String host = params.get("host", "localhost");
+ final int port = params.getInt("port", 8081);
+ final HttpTestClient httpClient = new HttpTestClient(host, port);
+
+ // Validate Flink cluster is running
+ JobIdsWithStatusOverview jobOverview = getJobOverview(httpClient);
+
+ // Get necessary parameters for testing GET endpoints
+ Map<String, String> parameterMap = getParameterMaps(httpClient, jobOverview);
+
+ // Get list of endpoints
+ List<MessageHeaders> specs = new E2ETestDispatcherRestEndpoint().getSpecs();
+ specs.forEach(spec -> testMonitoringEndpointSpecs(httpClient, spec, parameterMap));
+
+ if (testFailureCount != 0) {
+ throw new RuntimeException("There are test failures. Success: " + testSuccessCount +
+ " Failures: " + testFailureCount + " Skipped: " + testSkipCount);
+ }
+ }
+
+ @SuppressWarnings("ConstantConditions")
+ private static Map<String, String> getParameterMaps(HttpTestClient httpClient,
+ JobIdsWithStatusOverview jobOverview) throws InterruptedException, IOException, TimeoutException {
+ // Get necessary parameters used for all REST API testings.
+ final Map<String, String> parameterMap = new HashMap<>();
+ Preconditions.checkState(jobOverview.getJobsWithStatus().stream()
+ .filter(jobIdWithStatus -> jobIdWithStatus.getJobStatus() == JobStatus.RUNNING)
+ .count() >= 1, "Cannot found active running jobs, discontinuing test!");
+ String jobId = jobOverview.getJobsWithStatus().stream()
+ .filter(jobIdWithStatus -> jobIdWithStatus.getJobStatus() == JobStatus.RUNNING)
+ .findFirst().get().getJobId().toString();
+ parameterMap.put(":jobid", jobId);
+
+ JobDetailsInfo jobDetailsInfo = getJobDetailInfo(httpClient, jobId);
+ String vertexId = jobDetailsInfo.getJobVertexInfos().stream()
+ .findFirst().get().getJobVertexID().toString();
+ parameterMap.put(":vertexid", vertexId);
+ parameterMap.put(":checkpointid", "1"); // test first checkpoint
+ parameterMap.put(":subtaskindex", "0"); // test first subtask
+
+ TaskManagersInfo taskManagersInfo = getTaskManagers(httpClient);
+ String taskMgrId = taskManagersInfo.getTaskManagerInfos().stream().findFirst().get().getResourceId().toString();
+ parameterMap.put(":taskmanagerid", taskMgrId);
+ parameterMap.put(":triggerid", "");
+
+ return parameterMap;
+ }
+
+ private static JobIdsWithStatusOverview getJobOverview(HttpTestClient httpClient)
+ throws TimeoutException, InterruptedException, IOException {
+ httpClient.sendGetRequest("/jobs", TEST_TIMEOUT);
+ HttpTestClient.SimpleHttpResponse resp = httpClient.getNextResponse();
+ Preconditions.checkState(resp.getStatus().code() == 200,
+ "Cannot fetch Flink cluster status!");
+ return MAPPER.readValue(resp.getContent(), JobIdsWithStatusOverview.class);
+ }
+
+ private static JobDetailsInfo getJobDetailInfo(HttpTestClient httpClient, String jobId)
+ throws TimeoutException, InterruptedException, IOException {
+ httpClient.sendGetRequest("/jobs/" + jobId, TEST_TIMEOUT);
--- End diff --
indentation is off
---
[GitHub] flink pull request #5849: [FLINK-8986][e2e-test] Flink end to end test REST ...
Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:
https://github.com/apache/flink/pull/5849#discussion_r181687893
--- Diff: flink-end-to-end-tests/flink-rest-api-test/src/main/java/org/apache/flink/runtime/rest/tests/RestApiTestSuite.java ---
@@ -0,0 +1,363 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.tests;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.runtime.blob.TransientBlobKey;
+import org.apache.flink.runtime.blob.TransientBlobService;
+import org.apache.flink.runtime.concurrent.Executors;
+import org.apache.flink.runtime.dispatcher.DispatcherGateway;
+import org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.leaderelection.LeaderContender;
+import org.apache.flink.runtime.leaderelection.LeaderElectionService;
+import org.apache.flink.runtime.messages.webmonitor.JobIdsWithStatusOverview;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
+import org.apache.flink.runtime.rest.HttpMethodWrapper;
+import org.apache.flink.runtime.rest.RestServerEndpoint;
+import org.apache.flink.runtime.rest.RestServerEndpointConfiguration;
+import org.apache.flink.runtime.rest.handler.RestHandlerConfiguration;
+import org.apache.flink.runtime.rest.handler.RestHandlerSpecification;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.rest.messages.job.JobDetailsInfo;
+import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagersInfo;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever;
+import org.apache.flink.runtime.webmonitor.testutils.HttpTestClient;
+import org.apache.flink.util.ConfigurationException;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandler;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+import scala.concurrent.duration.FiniteDuration;
+
+/**
+ * Rest API test suite.
+ */
+public class RestApiTestSuite {
+ private static final FiniteDuration TEST_TIMEOUT = new FiniteDuration(10L, TimeUnit.SECONDS);
+ private static final ObjectMapper MAPPER = new ObjectMapper();
+
+ private static int testSuccessCount = 0;
+ private static int testFailureCount = 0;
+ private static int testSkipCount = 0;
+
+ public static void main(String[] args) throws Exception {
+
+ ParameterTool params = ParameterTool.fromArgs(args);
+ final String host = params.get("host", "localhost");
+ final int port = params.getInt("port", 8081);
+ final HttpTestClient httpClient = new HttpTestClient(host, port);
+
+ // Validate Flink cluster is running
+ JobIdsWithStatusOverview jobOverview = getJobOverview(httpClient);
+
+ // Get necessary parameters for testing GET endpoints
+ Map<String, String> parameterMap = getParameterMaps(httpClient, jobOverview);
+
+ // Get list of endpoints
+ List<MessageHeaders> specs = new E2ETestDispatcherRestEndpoint().getSpecs();
+ specs.forEach(spec -> testMonitoringEndpointSpecs(httpClient, spec, parameterMap));
+
+ if (testFailureCount != 0) {
+ throw new RuntimeException("There are test failures. Success: " + testSuccessCount +
+ " Failures: " + testFailureCount + " Skipped: " + testSkipCount);
+ }
+ }
+
+ @SuppressWarnings("ConstantConditions")
+ private static Map<String, String> getParameterMaps(HttpTestClient httpClient,
+ JobIdsWithStatusOverview jobOverview) throws InterruptedException, IOException, TimeoutException {
+ // Get necessary parameters used for all REST API testings.
+ final Map<String, String> parameterMap = new HashMap<>();
+ Preconditions.checkState(jobOverview.getJobsWithStatus().stream()
+ .filter(jobIdWithStatus -> jobIdWithStatus.getJobStatus() == JobStatus.RUNNING)
+ .count() >= 1, "Cannot found active running jobs, discontinuing test!");
+ String jobId = jobOverview.getJobsWithStatus().stream()
+ .filter(jobIdWithStatus -> jobIdWithStatus.getJobStatus() == JobStatus.RUNNING)
+ .findFirst().get().getJobId().toString();
+ parameterMap.put(":jobid", jobId);
+
+ JobDetailsInfo jobDetailsInfo = getJobDetailInfo(httpClient, jobId);
+ String vertexId = jobDetailsInfo.getJobVertexInfos().stream()
+ .findFirst().get().getJobVertexID().toString();
+ parameterMap.put(":vertexid", vertexId);
+ parameterMap.put(":checkpointid", "1"); // test first checkpoint
+ parameterMap.put(":subtaskindex", "0"); // test first subtask
+
+ TaskManagersInfo taskManagersInfo = getTaskManagers(httpClient);
+ String taskMgrId = taskManagersInfo.getTaskManagerInfos().stream().findFirst().get().getResourceId().toString();
+ parameterMap.put(":taskmanagerid", taskMgrId);
+ parameterMap.put(":triggerid", "");
+
+ return parameterMap;
+ }
+
+ private static JobIdsWithStatusOverview getJobOverview(HttpTestClient httpClient)
+ throws TimeoutException, InterruptedException, IOException {
+ httpClient.sendGetRequest("/jobs", TEST_TIMEOUT);
+ HttpTestClient.SimpleHttpResponse resp = httpClient.getNextResponse();
+ Preconditions.checkState(resp.getStatus().code() == 200,
+ "Cannot fetch Flink cluster status!");
+ return MAPPER.readValue(resp.getContent(), JobIdsWithStatusOverview.class);
+ }
+
+ private static JobDetailsInfo getJobDetailInfo(HttpTestClient httpClient, String jobId)
+ throws TimeoutException, InterruptedException, IOException {
+ httpClient.sendGetRequest("/jobs/" + jobId, TEST_TIMEOUT);
+ HttpTestClient.SimpleHttpResponse resp = httpClient.getNextResponse();
+ Preconditions.checkState(resp.getStatus().code() == 200,
+ "Cannot fetch job detail information for job " + jobId);
+ return MAPPER.readValue(resp.getContent(), JobDetailsInfo.class);
+ }
+
+ private static TaskManagersInfo getTaskManagers(HttpTestClient httpClient)
+ throws TimeoutException, InterruptedException, IOException {
+ httpClient.sendGetRequest("/taskmanagers", TEST_TIMEOUT);
+ HttpTestClient.SimpleHttpResponse resp = httpClient.getNextResponse();
+ Preconditions.checkState(resp.getStatus().code() == 200,
+ "Cannot fetch task manager status!");
+ return MAPPER.readValue(resp.getContent(), TaskManagersInfo.class);
+ }
+
+ private static void testMonitoringEndpointSpecs(HttpTestClient httpClient, MessageHeaders spec,
+ Map<String, String> parameterMap) {
+ try {
+ HttpMethodWrapper method = spec.getHttpMethod();
+ String path = getRestEndpointPath(spec.getTargetRestEndpointURL(), parameterMap);
+ if (spec.getRequestClass() == EmptyRequestBody.class) {
+ switch (method) {
+ case GET:
+ httpClient.sendGetRequest(path, TEST_TIMEOUT);
+ break;
+ case DELETE:
+ httpClient.sendDeleteRequest(path, TEST_TIMEOUT);
+ break;
+ default:
+ throw new UnsupportedOperationException("Cannot handle REST Test for " + path +
+ " with method " + method + ". Only GET and DELETE requests are supported!");
+ }
+
+ HttpTestClient.SimpleHttpResponse resp = httpClient.getNextResponse();
+
+ Preconditions.checkState(resp.getStatus().code() == spec.getResponseStatusCode().code(),
+ "Found mismatching status code from endpoint " + path + " with method " + method +
+ ", expecting: " + spec.getResponseStatusCode().code() + ", but was: " + resp.getStatus().code());
+ // System.out.println("Found matching status for endpoint " + path + " with method " + method);
+ @SuppressWarnings("unchecked")
+ Object responseObject = MAPPER.readValue(resp.getContent(), spec.getResponseClass());
+ Preconditions.checkNotNull(responseObject);
+ testSuccessCount += 1;
+ } else {
+ throw new UnsupportedOperationException("Cannot handle REST Test for " + path +
+ " with method " + method + ". Non-empty payload is not supported!");
+ }
+ } catch (IOException |
--- End diff --
can be simplified to
```
} catch (UnsupportedOperationException e) {
testSkipCount += 1;
e.printStackTrace();
} catch (Exception e) {
testFailureCount += 1;
e.printStackTrace();
}
```
---
[GitHub] flink pull request #5849: [FLINK-8986][e2e-test] Flink end to end test REST ...
Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:
https://github.com/apache/flink/pull/5849#discussion_r181668750
--- Diff: flink-end-to-end-tests/flink-rest-api-test/src/main/java/org/apache/flink/runtime/rest/tests/RestApiTestSuite.java ---
@@ -0,0 +1,363 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.tests;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.runtime.blob.TransientBlobKey;
+import org.apache.flink.runtime.blob.TransientBlobService;
+import org.apache.flink.runtime.concurrent.Executors;
+import org.apache.flink.runtime.dispatcher.DispatcherGateway;
+import org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.leaderelection.LeaderContender;
+import org.apache.flink.runtime.leaderelection.LeaderElectionService;
+import org.apache.flink.runtime.messages.webmonitor.JobIdsWithStatusOverview;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
+import org.apache.flink.runtime.rest.HttpMethodWrapper;
+import org.apache.flink.runtime.rest.RestServerEndpoint;
+import org.apache.flink.runtime.rest.RestServerEndpointConfiguration;
+import org.apache.flink.runtime.rest.handler.RestHandlerConfiguration;
+import org.apache.flink.runtime.rest.handler.RestHandlerSpecification;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.rest.messages.job.JobDetailsInfo;
+import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagersInfo;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever;
+import org.apache.flink.runtime.webmonitor.testutils.HttpTestClient;
+import org.apache.flink.util.ConfigurationException;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandler;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+import scala.concurrent.duration.FiniteDuration;
+
+/**
+ * Rest API test suite.
+ */
+public class RestApiTestSuite {
+ private static final FiniteDuration TEST_TIMEOUT = new FiniteDuration(10L, TimeUnit.SECONDS);
+ private static final ObjectMapper MAPPER = new ObjectMapper();
+
+ private static int testSuccessCount = 0;
+ private static int testFailureCount = 0;
+ private static int testSkipCount = 0;
+
+ public static void main(String[] args) throws Exception {
+
+ ParameterTool params = ParameterTool.fromArgs(args);
+ final String host = params.get("host", "localhost");
+ final int port = params.getInt("port", 8081);
+ final HttpTestClient httpClient = new HttpTestClient(host, port);
+
+ // Validate Flink cluster is running
+ JobIdsWithStatusOverview jobOverview = getJobOverview(httpClient);
+
+ // Get necessary parameters for testing GET endpoints
+ Map<String, String> parameterMap = getParameterMaps(httpClient, jobOverview);
+
+ // Get list of endpoints
+ List<MessageHeaders> specs = new E2ETestDispatcherRestEndpoint().getSpecs();
+ specs.forEach(spec -> testMonitoringEndpointSpecs(httpClient, spec, parameterMap));
+
+ if (testFailureCount != 0) {
+ throw new RuntimeException("There are test failures. Success: " + testSuccessCount +
+ " Failures: " + testFailureCount + " Skipped: " + testSkipCount);
+ }
+ }
+
+ @SuppressWarnings("ConstantConditions")
+ private static Map<String, String> getParameterMaps(HttpTestClient httpClient,
+ JobIdsWithStatusOverview jobOverview) throws InterruptedException, IOException, TimeoutException {
+ // Get necessary parameters used for all REST API testings.
+ final Map<String, String> parameterMap = new HashMap<>();
+ Preconditions.checkState(jobOverview.getJobsWithStatus().stream()
+ .filter(jobIdWithStatus -> jobIdWithStatus.getJobStatus() == JobStatus.RUNNING)
+ .count() >= 1, "Cannot found active running jobs, discontinuing test!");
+ String jobId = jobOverview.getJobsWithStatus().stream()
+ .filter(jobIdWithStatus -> jobIdWithStatus.getJobStatus() == JobStatus.RUNNING)
+ .findFirst().get().getJobId().toString();
+ parameterMap.put(":jobid", jobId);
+
+ JobDetailsInfo jobDetailsInfo = getJobDetailInfo(httpClient, jobId);
+ String vertexId = jobDetailsInfo.getJobVertexInfos().stream()
+ .findFirst().get().getJobVertexID().toString();
+ parameterMap.put(":vertexid", vertexId);
+ parameterMap.put(":checkpointid", "1"); // test first checkpoint
+ parameterMap.put(":subtaskindex", "0"); // test first subtask
+
+ TaskManagersInfo taskManagersInfo = getTaskManagers(httpClient);
+ String taskMgrId = taskManagersInfo.getTaskManagerInfos().stream().findFirst().get().getResourceId().toString();
--- End diff --
let's get this parameter first; if this fails the job parameter retrieval will also fail.
---
[GitHub] flink pull request #5849: [FLINK-8986][e2e-test] Flink end to end test REST ...
Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:
https://github.com/apache/flink/pull/5849#discussion_r181685139
--- Diff: flink-end-to-end-tests/flink-rest-api-test/src/main/java/org/apache/flink/runtime/rest/tests/RestApiTestSuite.java ---
@@ -0,0 +1,363 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.tests;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.runtime.blob.TransientBlobKey;
+import org.apache.flink.runtime.blob.TransientBlobService;
+import org.apache.flink.runtime.concurrent.Executors;
+import org.apache.flink.runtime.dispatcher.DispatcherGateway;
+import org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.leaderelection.LeaderContender;
+import org.apache.flink.runtime.leaderelection.LeaderElectionService;
+import org.apache.flink.runtime.messages.webmonitor.JobIdsWithStatusOverview;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
+import org.apache.flink.runtime.rest.HttpMethodWrapper;
+import org.apache.flink.runtime.rest.RestServerEndpoint;
+import org.apache.flink.runtime.rest.RestServerEndpointConfiguration;
+import org.apache.flink.runtime.rest.handler.RestHandlerConfiguration;
+import org.apache.flink.runtime.rest.handler.RestHandlerSpecification;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.rest.messages.job.JobDetailsInfo;
+import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagersInfo;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever;
+import org.apache.flink.runtime.webmonitor.testutils.HttpTestClient;
+import org.apache.flink.util.ConfigurationException;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandler;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+import scala.concurrent.duration.FiniteDuration;
+
+/**
+ * Rest API test suite.
+ */
+public class RestApiTestSuite {
+ private static final FiniteDuration TEST_TIMEOUT = new FiniteDuration(10L, TimeUnit.SECONDS);
+ private static final ObjectMapper MAPPER = new ObjectMapper();
+
+ private static int testSuccessCount = 0;
+ private static int testFailureCount = 0;
+ private static int testSkipCount = 0;
+
+ public static void main(String[] args) throws Exception {
+
+ ParameterTool params = ParameterTool.fromArgs(args);
+ final String host = params.get("host", "localhost");
+ final int port = params.getInt("port", 8081);
+ final HttpTestClient httpClient = new HttpTestClient(host, port);
+
+ // Validate Flink cluster is running
+ JobIdsWithStatusOverview jobOverview = getJobOverview(httpClient);
+
+ // Get necessary parameters for testing GET endpoints
+ Map<String, String> parameterMap = getParameterMaps(httpClient, jobOverview);
+
+ // Get list of endpoints
+ List<MessageHeaders> specs = new E2ETestDispatcherRestEndpoint().getSpecs();
+ specs.forEach(spec -> testMonitoringEndpointSpecs(httpClient, spec, parameterMap));
+
+ if (testFailureCount != 0) {
+ throw new RuntimeException("There are test failures. Success: " + testSuccessCount +
+ " Failures: " + testFailureCount + " Skipped: " + testSkipCount);
+ }
+ }
+
+ @SuppressWarnings("ConstantConditions")
+ private static Map<String, String> getParameterMaps(HttpTestClient httpClient,
+ JobIdsWithStatusOverview jobOverview) throws InterruptedException, IOException, TimeoutException {
+ // Get necessary parameters used for all REST API testings.
+ final Map<String, String> parameterMap = new HashMap<>();
+ Preconditions.checkState(jobOverview.getJobsWithStatus().stream()
+ .filter(jobIdWithStatus -> jobIdWithStatus.getJobStatus() == JobStatus.RUNNING)
+ .count() >= 1, "Cannot found active running jobs, discontinuing test!");
+ String jobId = jobOverview.getJobsWithStatus().stream()
+ .filter(jobIdWithStatus -> jobIdWithStatus.getJobStatus() == JobStatus.RUNNING)
+ .findFirst().get().getJobId().toString();
+ parameterMap.put(":jobid", jobId);
+
+ JobDetailsInfo jobDetailsInfo = getJobDetailInfo(httpClient, jobId);
+ String vertexId = jobDetailsInfo.getJobVertexInfos().stream()
+ .findFirst().get().getJobVertexID().toString();
+ parameterMap.put(":vertexid", vertexId);
+ parameterMap.put(":checkpointid", "1"); // test first checkpoint
+ parameterMap.put(":subtaskindex", "0"); // test first subtask
+
+ TaskManagersInfo taskManagersInfo = getTaskManagers(httpClient);
+ String taskMgrId = taskManagersInfo.getTaskManagerInfos().stream().findFirst().get().getResourceId().toString();
+ parameterMap.put(":taskmanagerid", taskMgrId);
+ parameterMap.put(":triggerid", "");
+
+ return parameterMap;
+ }
+
+ private static JobIdsWithStatusOverview getJobOverview(HttpTestClient httpClient)
+ throws TimeoutException, InterruptedException, IOException {
+ httpClient.sendGetRequest("/jobs", TEST_TIMEOUT);
+ HttpTestClient.SimpleHttpResponse resp = httpClient.getNextResponse();
+ Preconditions.checkState(resp.getStatus().code() == 200,
+ "Cannot fetch Flink cluster status!");
+ return MAPPER.readValue(resp.getContent(), JobIdsWithStatusOverview.class);
+ }
+
+ private static JobDetailsInfo getJobDetailInfo(HttpTestClient httpClient, String jobId)
+ throws TimeoutException, InterruptedException, IOException {
+ httpClient.sendGetRequest("/jobs/" + jobId, TEST_TIMEOUT);
+ HttpTestClient.SimpleHttpResponse resp = httpClient.getNextResponse();
+ Preconditions.checkState(resp.getStatus().code() == 200,
+ "Cannot fetch job detail information for job " + jobId);
+ return MAPPER.readValue(resp.getContent(), JobDetailsInfo.class);
+ }
+
+ private static TaskManagersInfo getTaskManagers(HttpTestClient httpClient)
+ throws TimeoutException, InterruptedException, IOException {
+ httpClient.sendGetRequest("/taskmanagers", TEST_TIMEOUT);
+ HttpTestClient.SimpleHttpResponse resp = httpClient.getNextResponse();
+ Preconditions.checkState(resp.getStatus().code() == 200,
+ "Cannot fetch task manager status!");
+ return MAPPER.readValue(resp.getContent(), TaskManagersInfo.class);
+ }
+
+ private static void testMonitoringEndpointSpecs(HttpTestClient httpClient, MessageHeaders spec,
+ Map<String, String> parameterMap) {
+ try {
+ HttpMethodWrapper method = spec.getHttpMethod();
+ String path = getRestEndpointPath(spec.getTargetRestEndpointURL(), parameterMap);
+ if (spec.getRequestClass() == EmptyRequestBody.class) {
+ switch (method) {
+ case GET:
+ httpClient.sendGetRequest(path, TEST_TIMEOUT);
+ break;
+ case DELETE:
+ httpClient.sendDeleteRequest(path, TEST_TIMEOUT);
+ break;
+ default:
+ throw new UnsupportedOperationException("Cannot handle REST Test for " + path +
+ " with method " + method + ". Only GET and DELETE requests are supported!");
+ }
+
+ HttpTestClient.SimpleHttpResponse resp = httpClient.getNextResponse();
+
+ Preconditions.checkState(resp.getStatus().code() == spec.getResponseStatusCode().code(),
+ "Found mismatching status code from endpoint " + path + " with method " + method +
+ ", expecting: " + spec.getResponseStatusCode().code() + ", but was: " + resp.getStatus().code());
+ // System.out.println("Found matching status for endpoint " + path + " with method " + method);
+ @SuppressWarnings("unchecked")
+ Object responseObject = MAPPER.readValue(resp.getContent(), spec.getResponseClass());
+ Preconditions.checkNotNull(responseObject);
+ testSuccessCount += 1;
+ } else {
+ throw new UnsupportedOperationException("Cannot handle REST Test for " + path +
+ " with method " + method + ". Non-empty payload is not supported!");
+ }
+ } catch (IOException |
+ TimeoutException |
+ InterruptedException |
+ IllegalStateException |
+ NullPointerException e) {
+ testFailureCount += 1;
+ e.printStackTrace();
+ } catch (UnsupportedOperationException e) {
+ testSkipCount += 1;
+ e.printStackTrace();
+ }
+ }
+
+ /**
+ * Replace target REST endpoint with actual parameters from job launch.
+ * @param url target REST endpoint pattern from {@link MessageHeaders} specs
+ * @param parameterMap parameter replacement map
+ * @return actual REST URL literal
+ */
+ private static String getRestEndpointPath(String url, Map<String, String> parameterMap)
+ throws UnsupportedOperationException {
+ for (Map.Entry<String, String> e : parameterMap.entrySet()) {
+ if (e.getValue().equals("")) {
+ if (Pattern.compile(e.getKey()).matcher(url).find()) {
+ throw new UnsupportedOperationException("No parameter replacement found for " + e.getKey());
--- End diff --
an easier way is to check for `:` in the URL. if it exists there's a parameter that wasn't replaced.
---
[GitHub] flink issue #5849: [FLINK-8986][e2e-test][WIP] Flink end to end test REST AP...
Posted by walterddr <gi...@git.apache.org>.
Github user walterddr commented on the issue:
https://github.com/apache/flink/pull/5849
Thanks @tzulitai for the suggestion. I will close this and continue with the new PR in #6054 .
---