Posted to issues@flink.apache.org by walterddr <gi...@git.apache.org> on 2018/04/15 15:13:08 UTC

[GitHub] flink pull request #5849: [FLINK-8986][e2e-test] Flink end to end test REST ...

GitHub user walterddr opened a pull request:

    https://github.com/apache/flink/pull/5849

    [FLINK-8986][e2e-test] Flink end to end test REST API 

    ## What is the purpose of the change
    
    Adds an end-to-end test for the REST APIs of the FLIP-6 generated endpoints.
    
    ## Brief change log
    
      - Added a `flink-rest-api-test` module in `flink-end-to-end-tests` with a periodic streaming job and a test suite that runs the REST API tests.
      - Added a test script to run the REST API test.
    
    ## Verifying this change
    
    N/A, this is a test
    
    ## Does this pull request potentially affect one of the following parts:
    
    No
    
    ## Documentation


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/walterddr/flink FLINK-8986

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/5849.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #5849
    
----
commit 526e5ac1f9758451417972143d7117095b18f2ab
Author: Rong Rong <wa...@...>
Date:   2018-04-15T15:05:08Z

    Flink end to end test REST API with generated endpoints only

----


---

[GitHub] flink pull request #5849: [FLINK-8986][e2e-test] Flink end to end test REST ...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5849#discussion_r181688947
  
    --- Diff: flink-end-to-end-tests/flink-rest-api-test/src/main/java/org/apache/flink/runtime/rest/tests/RestApiTestSuite.java ---
    @@ -0,0 +1,363 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.rest.tests;
    +
    +import org.apache.flink.api.common.JobID;
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.api.java.utils.ParameterTool;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.configuration.RestOptions;
    +import org.apache.flink.runtime.blob.TransientBlobKey;
    +import org.apache.flink.runtime.blob.TransientBlobService;
    +import org.apache.flink.runtime.concurrent.Executors;
    +import org.apache.flink.runtime.dispatcher.DispatcherGateway;
    +import org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint;
    +import org.apache.flink.runtime.jobgraph.JobStatus;
    +import org.apache.flink.runtime.leaderelection.LeaderContender;
    +import org.apache.flink.runtime.leaderelection.LeaderElectionService;
    +import org.apache.flink.runtime.messages.webmonitor.JobIdsWithStatusOverview;
    +import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
    +import org.apache.flink.runtime.rest.HttpMethodWrapper;
    +import org.apache.flink.runtime.rest.RestServerEndpoint;
    +import org.apache.flink.runtime.rest.RestServerEndpointConfiguration;
    +import org.apache.flink.runtime.rest.handler.RestHandlerConfiguration;
    +import org.apache.flink.runtime.rest.handler.RestHandlerSpecification;
    +import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
    +import org.apache.flink.runtime.rest.messages.MessageHeaders;
    +import org.apache.flink.runtime.rest.messages.job.JobDetailsInfo;
    +import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagersInfo;
    +import org.apache.flink.runtime.rpc.FatalErrorHandler;
    +import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
    +import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever;
    +import org.apache.flink.runtime.webmonitor.testutils.HttpTestClient;
    +import org.apache.flink.util.ConfigurationException;
    +import org.apache.flink.util.Preconditions;
    +
    +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
    +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandler;
    +
    +import java.io.File;
    +import java.io.IOException;
    +import java.io.InputStream;
    +import java.util.Comparator;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.UUID;
    +import java.util.concurrent.CompletableFuture;
    +import java.util.concurrent.Executor;
    +import java.util.concurrent.TimeUnit;
    +import java.util.concurrent.TimeoutException;
    +import java.util.regex.Pattern;
    +import java.util.stream.Collectors;
    +
    +import scala.concurrent.duration.FiniteDuration;
    +
    +/**
    + * Rest API test suite.
    + */
    +public class RestApiTestSuite {
    +	private static final FiniteDuration TEST_TIMEOUT = new FiniteDuration(10L, TimeUnit.SECONDS);
    +	private static final ObjectMapper MAPPER = new ObjectMapper();
    +
    +	private static int testSuccessCount = 0;
    +	private static int testFailureCount = 0;
    +	private static int testSkipCount = 0;
    +
    +	public static void main(String[] args) throws Exception {
    +
    +		ParameterTool params = ParameterTool.fromArgs(args);
    +		final String host = params.get("host", "localhost");
    +		final int port = params.getInt("port", 8081);
    +		final HttpTestClient httpClient = new HttpTestClient(host, port);
    +
    +		// Validate Flink cluster is running
    +		JobIdsWithStatusOverview jobOverview = getJobOverview(httpClient);
    +
    +		// Get necessary parameters for testing GET endpoints
    +		Map<String, String> parameterMap = getParameterMaps(httpClient, jobOverview);
    +
    +		// Get list of endpoints
    +		List<MessageHeaders> specs = new E2ETestDispatcherRestEndpoint().getSpecs();
    +		specs.forEach(spec -> testMonitoringEndpointSpecs(httpClient, spec, parameterMap));
    +
    +		if (testFailureCount != 0) {
    +			throw new RuntimeException("There are test failures. Success: " + testSuccessCount +
    +				" Failures: " + testFailureCount + " Skipped: " + testSkipCount);
    +		}
    +	}
    +
    +	@SuppressWarnings("ConstantConditions")
    +	private static Map<String, String> getParameterMaps(HttpTestClient httpClient,
    +		JobIdsWithStatusOverview jobOverview) throws InterruptedException, IOException, TimeoutException {
    +		// Get necessary parameters used for all REST API testings.
    +		final Map<String, String> parameterMap = new HashMap<>();
    +		Preconditions.checkState(jobOverview.getJobsWithStatus().stream()
    +			.filter(jobIdWithStatus -> jobIdWithStatus.getJobStatus() == JobStatus.RUNNING)
    +			.count() >= 1, "Cannot found active running jobs, discontinuing test!");
    +		String jobId = jobOverview.getJobsWithStatus().stream()
    +			.filter(jobIdWithStatus -> jobIdWithStatus.getJobStatus() == JobStatus.RUNNING)
    --- End diff --
    
    replace with `firstMatch`
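
    A minimal sketch of what that could look like (assuming the intended Stream API method is `anyMatch`, since the Stream interface has no `firstMatch`):

    ```
    Preconditions.checkState(
    	jobOverview.getJobsWithStatus().stream()
    		.anyMatch(jobIdWithStatus -> jobIdWithStatus.getJobStatus() == JobStatus.RUNNING),
    	"Cannot find an active running job, discontinuing test!");
    ```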


---

[GitHub] flink pull request #5849: [FLINK-8986][e2e-test] Flink end to end test REST ...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5849#discussion_r181669868
  
    --- Diff: flink-end-to-end-tests/flink-rest-api-test/src/main/java/org/apache/flink/runtime/rest/tests/RestApiTestSuite.java ---
    @@ -0,0 +1,363 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.rest.tests;
    +
    +import org.apache.flink.api.common.JobID;
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.api.java.utils.ParameterTool;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.configuration.RestOptions;
    +import org.apache.flink.runtime.blob.TransientBlobKey;
    +import org.apache.flink.runtime.blob.TransientBlobService;
    +import org.apache.flink.runtime.concurrent.Executors;
    +import org.apache.flink.runtime.dispatcher.DispatcherGateway;
    +import org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint;
    +import org.apache.flink.runtime.jobgraph.JobStatus;
    +import org.apache.flink.runtime.leaderelection.LeaderContender;
    +import org.apache.flink.runtime.leaderelection.LeaderElectionService;
    +import org.apache.flink.runtime.messages.webmonitor.JobIdsWithStatusOverview;
    +import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
    +import org.apache.flink.runtime.rest.HttpMethodWrapper;
    +import org.apache.flink.runtime.rest.RestServerEndpoint;
    +import org.apache.flink.runtime.rest.RestServerEndpointConfiguration;
    +import org.apache.flink.runtime.rest.handler.RestHandlerConfiguration;
    +import org.apache.flink.runtime.rest.handler.RestHandlerSpecification;
    +import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
    +import org.apache.flink.runtime.rest.messages.MessageHeaders;
    +import org.apache.flink.runtime.rest.messages.job.JobDetailsInfo;
    +import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagersInfo;
    +import org.apache.flink.runtime.rpc.FatalErrorHandler;
    +import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
    +import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever;
    +import org.apache.flink.runtime.webmonitor.testutils.HttpTestClient;
    +import org.apache.flink.util.ConfigurationException;
    +import org.apache.flink.util.Preconditions;
    +
    +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
    +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandler;
    +
    +import java.io.File;
    +import java.io.IOException;
    +import java.io.InputStream;
    +import java.util.Comparator;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.UUID;
    +import java.util.concurrent.CompletableFuture;
    +import java.util.concurrent.Executor;
    +import java.util.concurrent.TimeUnit;
    +import java.util.concurrent.TimeoutException;
    +import java.util.regex.Pattern;
    +import java.util.stream.Collectors;
    +
    +import scala.concurrent.duration.FiniteDuration;
    +
    +/**
    + * Rest API test suite.
    + */
    +public class RestApiTestSuite {
    +	private static final FiniteDuration TEST_TIMEOUT = new FiniteDuration(10L, TimeUnit.SECONDS);
    +	private static final ObjectMapper MAPPER = new ObjectMapper();
    +
    +	private static int testSuccessCount = 0;
    +	private static int testFailureCount = 0;
    +	private static int testSkipCount = 0;
    +
    +	public static void main(String[] args) throws Exception {
    +
    +		ParameterTool params = ParameterTool.fromArgs(args);
    +		final String host = params.get("host", "localhost");
    +		final int port = params.getInt("port", 8081);
    +		final HttpTestClient httpClient = new HttpTestClient(host, port);
    +
    +		// Validate Flink cluster is running
    +		JobIdsWithStatusOverview jobOverview = getJobOverview(httpClient);
    +
    +		// Get necessary parameters for testing GET endpoints
    +		Map<String, String> parameterMap = getParameterMaps(httpClient, jobOverview);
    +
    +		// Get list of endpoints
    +		List<MessageHeaders> specs = new E2ETestDispatcherRestEndpoint().getSpecs();
    +		specs.forEach(spec -> testMonitoringEndpointSpecs(httpClient, spec, parameterMap));
    +
    +		if (testFailureCount != 0) {
    +			throw new RuntimeException("There are test failures. Success: " + testSuccessCount +
    +				" Failures: " + testFailureCount + " Skipped: " + testSkipCount);
    +		}
    +	}
    +
    +	@SuppressWarnings("ConstantConditions")
    +	private static Map<String, String> getParameterMaps(HttpTestClient httpClient,
    +		JobIdsWithStatusOverview jobOverview) throws InterruptedException, IOException, TimeoutException {
    +		// Get necessary parameters used for all REST API testings.
    +		final Map<String, String> parameterMap = new HashMap<>();
    +		Preconditions.checkState(jobOverview.getJobsWithStatus().stream()
    +			.filter(jobIdWithStatus -> jobIdWithStatus.getJobStatus() == JobStatus.RUNNING)
    +			.count() >= 1, "Cannot found active running jobs, discontinuing test!");
    +		String jobId = jobOverview.getJobsWithStatus().stream()
    +			.filter(jobIdWithStatus -> jobIdWithStatus.getJobStatus() == JobStatus.RUNNING)
    +			.findFirst().get().getJobId().toString();
    +		parameterMap.put(":jobid", jobId);
    +
    +		JobDetailsInfo jobDetailsInfo = getJobDetailInfo(httpClient, jobId);
    +		String vertexId = jobDetailsInfo.getJobVertexInfos().stream()
    +			.findFirst().get().getJobVertexID().toString();
    +		parameterMap.put(":vertexid", vertexId);
    +		parameterMap.put(":checkpointid", "1"); // test first checkpoint
    +		parameterMap.put(":subtaskindex", "0"); // test first subtask
    +
    +		TaskManagersInfo taskManagersInfo = getTaskManagers(httpClient);
    +		String taskMgrId = taskManagersInfo.getTaskManagerInfos().stream().findFirst().get().getResourceId().toString();
    +		parameterMap.put(":taskmanagerid", taskMgrId);
    +		parameterMap.put(":triggerid", "");
    +
    +		return parameterMap;
    +	}
    +
    +	private static JobIdsWithStatusOverview getJobOverview(HttpTestClient httpClient)
    +		throws TimeoutException, InterruptedException, IOException {
    +		httpClient.sendGetRequest("/jobs", TEST_TIMEOUT);
    +		HttpTestClient.SimpleHttpResponse resp = httpClient.getNextResponse();
    +		Preconditions.checkState(resp.getStatus().code() == 200,
    +			"Cannot fetch Flink cluster status!");
    +		return MAPPER.readValue(resp.getContent(), JobIdsWithStatusOverview.class);
    +	}
    +
    +	private static JobDetailsInfo getJobDetailInfo(HttpTestClient httpClient, String jobId)
    +		throws TimeoutException, InterruptedException, IOException {
    +			httpClient.sendGetRequest("/jobs/" + jobId, TEST_TIMEOUT);
    +			HttpTestClient.SimpleHttpResponse resp = httpClient.getNextResponse();
    +			Preconditions.checkState(resp.getStatus().code() == 200,
    +				"Cannot fetch job detail information for job " + jobId);
    +			return MAPPER.readValue(resp.getContent(), JobDetailsInfo.class);
    +	}
    +
    +	private static TaskManagersInfo getTaskManagers(HttpTestClient httpClient)
    +		throws TimeoutException, InterruptedException, IOException {
    +		httpClient.sendGetRequest("/taskmanagers", TEST_TIMEOUT);
    +		HttpTestClient.SimpleHttpResponse resp = httpClient.getNextResponse();
    +		Preconditions.checkState(resp.getStatus().code() == 200,
    +			"Cannot fetch task manager status!");
    +		return MAPPER.readValue(resp.getContent(), TaskManagersInfo.class);
    +	}
    +
    +	private static void testMonitoringEndpointSpecs(HttpTestClient httpClient, MessageHeaders spec,
    +		Map<String, String> parameterMap) {
    +		try {
    +			HttpMethodWrapper method = spec.getHttpMethod();
    +			String path = getRestEndpointPath(spec.getTargetRestEndpointURL(), parameterMap);
    +			if (spec.getRequestClass() == EmptyRequestBody.class) {
    +				switch (method) {
    +					case GET:
    +						httpClient.sendGetRequest(path, TEST_TIMEOUT);
    +						break;
    +					case DELETE:
    +						httpClient.sendDeleteRequest(path, TEST_TIMEOUT);
    +						break;
    +					default:
    +						throw new UnsupportedOperationException("Cannot handle REST Test for " + path +
    +						" with method " + method + ". Only GET and DELETE requests are supported!");
    +				}
    +
    +				HttpTestClient.SimpleHttpResponse resp = httpClient.getNextResponse();
    +
    +				Preconditions.checkState(resp.getStatus().code() == spec.getResponseStatusCode().code(),
    --- End diff --
    
    this should be an assertion, and you can compare the status directly.
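
    For illustration, a minimal sketch of that direct comparison (still phrased as a `Preconditions` check here; an assertion utility would work the same way):

    ```
    Preconditions.checkState(
    	spec.getResponseStatusCode().equals(resp.getStatus()),
    	"Found mismatching status from endpoint " + path + " with method " + method +
    		", expecting: " + spec.getResponseStatusCode() + ", but was: " + resp.getStatus());
    ```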


---

[GitHub] flink pull request #5849: [FLINK-8986][e2e-test] Flink end to end test REST ...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5849#discussion_r181685888
  
    --- Diff: flink-end-to-end-tests/flink-rest-api-test/src/main/java/org/apache/flink/runtime/rest/tests/RestApiTestSuite.java ---
    @@ -0,0 +1,363 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.rest.tests;
    +
    +import org.apache.flink.api.common.JobID;
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.api.java.utils.ParameterTool;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.configuration.RestOptions;
    +import org.apache.flink.runtime.blob.TransientBlobKey;
    +import org.apache.flink.runtime.blob.TransientBlobService;
    +import org.apache.flink.runtime.concurrent.Executors;
    +import org.apache.flink.runtime.dispatcher.DispatcherGateway;
    +import org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint;
    +import org.apache.flink.runtime.jobgraph.JobStatus;
    +import org.apache.flink.runtime.leaderelection.LeaderContender;
    +import org.apache.flink.runtime.leaderelection.LeaderElectionService;
    +import org.apache.flink.runtime.messages.webmonitor.JobIdsWithStatusOverview;
    +import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
    +import org.apache.flink.runtime.rest.HttpMethodWrapper;
    +import org.apache.flink.runtime.rest.RestServerEndpoint;
    +import org.apache.flink.runtime.rest.RestServerEndpointConfiguration;
    +import org.apache.flink.runtime.rest.handler.RestHandlerConfiguration;
    +import org.apache.flink.runtime.rest.handler.RestHandlerSpecification;
    +import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
    +import org.apache.flink.runtime.rest.messages.MessageHeaders;
    +import org.apache.flink.runtime.rest.messages.job.JobDetailsInfo;
    +import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagersInfo;
    +import org.apache.flink.runtime.rpc.FatalErrorHandler;
    +import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
    +import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever;
    +import org.apache.flink.runtime.webmonitor.testutils.HttpTestClient;
    +import org.apache.flink.util.ConfigurationException;
    +import org.apache.flink.util.Preconditions;
    +
    +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
    +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandler;
    +
    +import java.io.File;
    +import java.io.IOException;
    +import java.io.InputStream;
    +import java.util.Comparator;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.UUID;
    +import java.util.concurrent.CompletableFuture;
    +import java.util.concurrent.Executor;
    +import java.util.concurrent.TimeUnit;
    +import java.util.concurrent.TimeoutException;
    +import java.util.regex.Pattern;
    +import java.util.stream.Collectors;
    +
    +import scala.concurrent.duration.FiniteDuration;
    +
    +/**
    + * Rest API test suite.
    + */
    +public class RestApiTestSuite {
    +	private static final FiniteDuration TEST_TIMEOUT = new FiniteDuration(10L, TimeUnit.SECONDS);
    +	private static final ObjectMapper MAPPER = new ObjectMapper();
    +
    +	private static int testSuccessCount = 0;
    +	private static int testFailureCount = 0;
    +	private static int testSkipCount = 0;
    +
    +	public static void main(String[] args) throws Exception {
    +
    +		ParameterTool params = ParameterTool.fromArgs(args);
    +		final String host = params.get("host", "localhost");
    +		final int port = params.getInt("port", 8081);
    +		final HttpTestClient httpClient = new HttpTestClient(host, port);
    +
    +		// Validate Flink cluster is running
    +		JobIdsWithStatusOverview jobOverview = getJobOverview(httpClient);
    +
    +		// Get necessary parameters for testing GET endpoints
    +		Map<String, String> parameterMap = getParameterMaps(httpClient, jobOverview);
    +
    +		// Get list of endpoints
    +		List<MessageHeaders> specs = new E2ETestDispatcherRestEndpoint().getSpecs();
    --- End diff --
    
    raw usage of the parameterized type `MessageHeaders`
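
    For reference, a minimal sketch of a parameterized declaration (wildcards assumed, and it presumes `E2ETestDispatcherRestEndpoint#getSpecs()` is typed accordingly):

    ```
    List<MessageHeaders<?, ?, ?>> specs = new E2ETestDispatcherRestEndpoint().getSpecs();
    specs.forEach(spec -> testMonitoringEndpointSpecs(httpClient, spec, parameterMap));
    ```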


---

[GitHub] flink pull request #5849: [FLINK-8986][e2e-test][WIP] Flink end to end test ...

Posted by walterddr <gi...@git.apache.org>.
Github user walterddr closed the pull request at:

    https://github.com/apache/flink/pull/5849


---

[GitHub] flink issue #5849: [FLINK-8986][e2e-test][WIP] Flink end to end test REST AP...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on the issue:

    https://github.com/apache/flink/pull/5849
  
    @walterddr yes, you can do that. In the description, just leave a note that the PR is based on another one, and state which of the commits are relevant.


---

[GitHub] flink pull request #5849: [FLINK-8986][e2e-test] Flink end to end test REST ...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5849#discussion_r181688583
  
    --- Diff: flink-end-to-end-tests/test-scripts/test_rest_api.sh ---
    @@ -0,0 +1,44 @@
    +#!/usr/bin/env bash
    +################################################################################
    +# Licensed to the Apache Software Foundation (ASF) under one
    +# or more contributor license agreements.  See the NOTICE file
    +# distributed with this work for additional information
    +# regarding copyright ownership.  The ASF licenses this file
    +# to you under the Apache License, Version 2.0 (the
    +# "License"); you may not use this file except in compliance
    +# with the License.  You may obtain a copy of the License at
    +#
    +#     http://www.apache.org/licenses/LICENSE-2.0
    +#
    +# Unless required by applicable law or agreed to in writing, software
    +# distributed under the License is distributed on an "AS IS" BASIS,
    +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    +# See the License for the specific language governing permissions and
    +# limitations under the License.
    +################################################################################
    +
    +source "$(dirname "$0")"/common.sh
    +
    +TEST_JAR_JAR=$TEST_INFRA_DIR/../../flink-end-to-end-tests/flink-rest-api-test/target/PeriodicStreamingJob.jar
    +TEST_PROGRAM_JAR=$TEST_INFRA_DIR/../../flink-end-to-end-tests/flink-rest-api-test/target/RestApiTestSuite.jar
    +
    +echo "Run Rest-Api-Test Program"
    +
    +start_cluster
    +$FLINK_DIR/bin/taskmanager.sh start
    --- End diff --
    
    what are the extra taskmanagers for?


---

[GitHub] flink issue #5849: [FLINK-8986][e2e-test] Flink end to end test REST API

Posted by walterddr <gi...@git.apache.org>.
Github user walterddr commented on the issue:

    https://github.com/apache/flink/pull/5849
  
    At this moment it skips 8 of the 39 tests (PATCH & POST methods). There are also 2 test failures, which I think should be fixed in a separate JIRA. Namely,
    `/jobs/:jobid/vertices/:vertexid/subtasks/:subtaskid/attempts/attempt/accumulators` returns a 500
    `/jobs/:jobid/vertices/:vertexid/subtasks/:subtaskid/attempts/attempt/` returns a 500


---

[GitHub] flink pull request #5849: [FLINK-8986][e2e-test] Flink end to end test REST ...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5849#discussion_r181688557
  
    --- Diff: flink-end-to-end-tests/test-scripts/test_rest_api.sh ---
    @@ -0,0 +1,44 @@
    +#!/usr/bin/env bash
    +################################################################################
    +# Licensed to the Apache Software Foundation (ASF) under one
    +# or more contributor license agreements.  See the NOTICE file
    +# distributed with this work for additional information
    +# regarding copyright ownership.  The ASF licenses this file
    +# to you under the Apache License, Version 2.0 (the
    +# "License"); you may not use this file except in compliance
    +# with the License.  You may obtain a copy of the License at
    +#
    +#     http://www.apache.org/licenses/LICENSE-2.0
    +#
    +# Unless required by applicable law or agreed to in writing, software
    +# distributed under the License is distributed on an "AS IS" BASIS,
    +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    +# See the License for the specific language governing permissions and
    +# limitations under the License.
    +################################################################################
    +
    +source "$(dirname "$0")"/common.sh
    +
    +TEST_JAR_JAR=$TEST_INFRA_DIR/../../flink-end-to-end-tests/flink-rest-api-test/target/PeriodicStreamingJob.jar
    +TEST_PROGRAM_JAR=$TEST_INFRA_DIR/../../flink-end-to-end-tests/flink-rest-api-test/target/RestApiTestSuite.jar
    +
    +echo "Run Rest-Api-Test Program"
    +
    +start_cluster
    +$FLINK_DIR/bin/taskmanager.sh start
    +$FLINK_DIR/bin/taskmanager.sh start
    +$FLINK_DIR/bin/taskmanager.sh start
    +
    +# Start periodic streaming job to test against
    +$FLINK_DIR/bin/flink run -p 4 $TEST_JAR_JAR -outputPath file://${TEST_DATA_DIR}/out/result &
    --- End diff --
    
    there's no benefit to running this with a higher parallelism


---

[GitHub] flink issue #5849: [FLINK-8986][e2e-test][WIP] Flink end to end test REST AP...

Posted by walterddr <gi...@git.apache.org>.
Github user walterddr commented on the issue:

    https://github.com/apache/flink/pull/5849
  
    Hi @tzulitai. I've created a new version of the test based on @zentol's comments on this PR: https://github.com/walterddr/flink/compare/FLINK-8985...walterddr:FLINK-8986-test
    
    However, it depends on https://github.com/apache/flink/pull/5863, since I reused the periodic streaming job for testing. Is it possible to create a PR on top of another, currently pending PR?


---

[GitHub] flink pull request #5849: [FLINK-8986][e2e-test] Flink end to end test REST ...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5849#discussion_r181683094
  
    --- Diff: flink-end-to-end-tests/flink-rest-api-test/src/main/java/org/apache/flink/runtime/rest/tests/RestApiTestSuite.java ---
    @@ -0,0 +1,363 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.rest.tests;
    +
    +import org.apache.flink.api.common.JobID;
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.api.java.utils.ParameterTool;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.configuration.RestOptions;
    +import org.apache.flink.runtime.blob.TransientBlobKey;
    +import org.apache.flink.runtime.blob.TransientBlobService;
    +import org.apache.flink.runtime.concurrent.Executors;
    +import org.apache.flink.runtime.dispatcher.DispatcherGateway;
    +import org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint;
    +import org.apache.flink.runtime.jobgraph.JobStatus;
    +import org.apache.flink.runtime.leaderelection.LeaderContender;
    +import org.apache.flink.runtime.leaderelection.LeaderElectionService;
    +import org.apache.flink.runtime.messages.webmonitor.JobIdsWithStatusOverview;
    +import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
    +import org.apache.flink.runtime.rest.HttpMethodWrapper;
    +import org.apache.flink.runtime.rest.RestServerEndpoint;
    +import org.apache.flink.runtime.rest.RestServerEndpointConfiguration;
    +import org.apache.flink.runtime.rest.handler.RestHandlerConfiguration;
    +import org.apache.flink.runtime.rest.handler.RestHandlerSpecification;
    +import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
    +import org.apache.flink.runtime.rest.messages.MessageHeaders;
    +import org.apache.flink.runtime.rest.messages.job.JobDetailsInfo;
    +import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagersInfo;
    +import org.apache.flink.runtime.rpc.FatalErrorHandler;
    +import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
    +import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever;
    +import org.apache.flink.runtime.webmonitor.testutils.HttpTestClient;
    +import org.apache.flink.util.ConfigurationException;
    +import org.apache.flink.util.Preconditions;
    +
    +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
    +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandler;
    +
    +import java.io.File;
    +import java.io.IOException;
    +import java.io.InputStream;
    +import java.util.Comparator;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.UUID;
    +import java.util.concurrent.CompletableFuture;
    +import java.util.concurrent.Executor;
    +import java.util.concurrent.TimeUnit;
    +import java.util.concurrent.TimeoutException;
    +import java.util.regex.Pattern;
    +import java.util.stream.Collectors;
    +
    +import scala.concurrent.duration.FiniteDuration;
    +
    +/**
    + * Rest API test suite.
    + */
    +public class RestApiTestSuite {
    +	private static final FiniteDuration TEST_TIMEOUT = new FiniteDuration(10L, TimeUnit.SECONDS);
    +	private static final ObjectMapper MAPPER = new ObjectMapper();
    +
    +	private static int testSuccessCount = 0;
    +	private static int testFailureCount = 0;
    +	private static int testSkipCount = 0;
    +
    +	public static void main(String[] args) throws Exception {
    +
    +		ParameterTool params = ParameterTool.fromArgs(args);
    +		final String host = params.get("host", "localhost");
    +		final int port = params.getInt("port", 8081);
    +		final HttpTestClient httpClient = new HttpTestClient(host, port);
    +
    +		// Validate Flink cluster is running
    +		JobIdsWithStatusOverview jobOverview = getJobOverview(httpClient);
    +
    +		// Get necessary parameters for testing GET endpoints
    +		Map<String, String> parameterMap = getParameterMaps(httpClient, jobOverview);
    +
    +		// Get list of endpoints
    +		List<MessageHeaders> specs = new E2ETestDispatcherRestEndpoint().getSpecs();
    +		specs.forEach(spec -> testMonitoringEndpointSpecs(httpClient, spec, parameterMap));
    +
    +		if (testFailureCount != 0) {
    +			throw new RuntimeException("There are test failures. Success: " + testSuccessCount +
    +				" Failures: " + testFailureCount + " Skipped: " + testSkipCount);
    +		}
    +	}
    +
    +	@SuppressWarnings("ConstantConditions")
    +	private static Map<String, String> getParameterMaps(HttpTestClient httpClient,
    +		JobIdsWithStatusOverview jobOverview) throws InterruptedException, IOException, TimeoutException {
    +		// Get necessary parameters used for all REST API testings.
    +		final Map<String, String> parameterMap = new HashMap<>();
    +		Preconditions.checkState(jobOverview.getJobsWithStatus().stream()
    +			.filter(jobIdWithStatus -> jobIdWithStatus.getJobStatus() == JobStatus.RUNNING)
    +			.count() >= 1, "Cannot found active running jobs, discontinuing test!");
    +		String jobId = jobOverview.getJobsWithStatus().stream()
    +			.filter(jobIdWithStatus -> jobIdWithStatus.getJobStatus() == JobStatus.RUNNING)
    +			.findFirst().get().getJobId().toString();
    +		parameterMap.put(":jobid", jobId);
    +
    +		JobDetailsInfo jobDetailsInfo = getJobDetailInfo(httpClient, jobId);
    +		String vertexId = jobDetailsInfo.getJobVertexInfos().stream()
    +			.findFirst().get().getJobVertexID().toString();
    +		parameterMap.put(":vertexid", vertexId);
    +		parameterMap.put(":checkpointid", "1"); // test first checkpoint
    +		parameterMap.put(":subtaskindex", "0"); // test first subtask
    +
    +		TaskManagersInfo taskManagersInfo = getTaskManagers(httpClient);
    +		String taskMgrId = taskManagersInfo.getTaskManagerInfos().stream().findFirst().get().getResourceId().toString();
    +		parameterMap.put(":taskmanagerid", taskMgrId);
    +		parameterMap.put(":triggerid", "");
    +
    +		return parameterMap;
    +	}
    +
    +	private static JobIdsWithStatusOverview getJobOverview(HttpTestClient httpClient)
    +		throws TimeoutException, InterruptedException, IOException {
    +		httpClient.sendGetRequest("/jobs", TEST_TIMEOUT);
    +		HttpTestClient.SimpleHttpResponse resp = httpClient.getNextResponse();
    +		Preconditions.checkState(resp.getStatus().code() == 200,
    +			"Cannot fetch Flink cluster status!");
    +		return MAPPER.readValue(resp.getContent(), JobIdsWithStatusOverview.class);
    +	}
    +
    +	private static JobDetailsInfo getJobDetailInfo(HttpTestClient httpClient, String jobId)
    +		throws TimeoutException, InterruptedException, IOException {
    +			httpClient.sendGetRequest("/jobs/" + jobId, TEST_TIMEOUT);
    +			HttpTestClient.SimpleHttpResponse resp = httpClient.getNextResponse();
    +			Preconditions.checkState(resp.getStatus().code() == 200,
    +				"Cannot fetch job detail information for job " + jobId);
    +			return MAPPER.readValue(resp.getContent(), JobDetailsInfo.class);
    +	}
    +
    +	private static TaskManagersInfo getTaskManagers(HttpTestClient httpClient)
    +		throws TimeoutException, InterruptedException, IOException {
    +		httpClient.sendGetRequest("/taskmanagers", TEST_TIMEOUT);
    +		HttpTestClient.SimpleHttpResponse resp = httpClient.getNextResponse();
    +		Preconditions.checkState(resp.getStatus().code() == 200,
    +			"Cannot fetch task manager status!");
    +		return MAPPER.readValue(resp.getContent(), TaskManagersInfo.class);
    +	}
    +
    +	private static void testMonitoringEndpointSpecs(HttpTestClient httpClient, MessageHeaders spec,
    +		Map<String, String> parameterMap) {
    +		try {
    +			HttpMethodWrapper method = spec.getHttpMethod();
    +			String path = getRestEndpointPath(spec.getTargetRestEndpointURL(), parameterMap);
    +			if (spec.getRequestClass() == EmptyRequestBody.class) {
    --- End diff --
    
    I would filter out unsupported requests before calling `testMonitoringEndpointSpecs`.
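
    A rough sketch of how that pre-filtering could look at the call site (hypothetical; it assumes the filtering happens in `main` before the per-endpoint method is invoked):

    ```
    specs.stream()
    	// only endpoints without a request body can be exercised generically
    	.filter(spec -> spec.getRequestClass() == EmptyRequestBody.class)
    	// only GET and DELETE are supported by this test suite
    	.filter(spec -> spec.getHttpMethod() == HttpMethodWrapper.GET
    		|| spec.getHttpMethod() == HttpMethodWrapper.DELETE)
    	.forEach(spec -> testMonitoringEndpointSpecs(httpClient, spec, parameterMap));
    ```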


---

[GitHub] flink pull request #5849: [FLINK-8986][e2e-test] Flink end to end test REST ...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5849#discussion_r181681917
  
    --- Diff: flink-end-to-end-tests/flink-rest-api-test/src/main/java/org/apache/flink/runtime/rest/tests/PeriodicStreamingJob.java ---
    @@ -0,0 +1,111 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.rest.tests;
    +
    +import org.apache.flink.api.common.typeinfo.TypeInformation;
    +import org.apache.flink.api.common.typeinfo.Types;
    +import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
    +import org.apache.flink.api.java.utils.ParameterTool;
    +import org.apache.flink.core.fs.FileSystem;
    +import org.apache.flink.streaming.api.TimeCharacteristic;
    +import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
    +import org.apache.flink.streaming.api.datastream.DataStream;
    +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    +import org.apache.flink.streaming.api.functions.source.SourceFunction;
    +import org.apache.flink.types.Row;
    +
    +import java.util.Collections;
    +import java.util.List;
    +
    +/**
    + */
    --- End diff --
    
    proper javadoc missing
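
    For illustration, a hedged sketch of what the missing class-level Javadoc could say (the wording is a guess based on the class name and how the test script uses the job):

    ```
    /**
     * A simple streaming job that periodically emits records until it is cancelled,
     * serving as a long-running job for the REST API end-to-end test to query.
     */
    public class PeriodicStreamingJob {
    ```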


---

[GitHub] flink pull request #5849: [FLINK-8986][e2e-test] Flink end to end test REST ...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5849#discussion_r181685328
  
    --- Diff: flink-end-to-end-tests/flink-rest-api-test/src/main/java/org/apache/flink/runtime/rest/tests/RestApiTestSuite.java ---
    @@ -0,0 +1,363 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.rest.tests;
    +
    +import org.apache.flink.api.common.JobID;
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.api.java.utils.ParameterTool;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.configuration.RestOptions;
    +import org.apache.flink.runtime.blob.TransientBlobKey;
    +import org.apache.flink.runtime.blob.TransientBlobService;
    +import org.apache.flink.runtime.concurrent.Executors;
    +import org.apache.flink.runtime.dispatcher.DispatcherGateway;
    +import org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint;
    +import org.apache.flink.runtime.jobgraph.JobStatus;
    +import org.apache.flink.runtime.leaderelection.LeaderContender;
    +import org.apache.flink.runtime.leaderelection.LeaderElectionService;
    +import org.apache.flink.runtime.messages.webmonitor.JobIdsWithStatusOverview;
    +import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
    +import org.apache.flink.runtime.rest.HttpMethodWrapper;
    +import org.apache.flink.runtime.rest.RestServerEndpoint;
    +import org.apache.flink.runtime.rest.RestServerEndpointConfiguration;
    +import org.apache.flink.runtime.rest.handler.RestHandlerConfiguration;
    +import org.apache.flink.runtime.rest.handler.RestHandlerSpecification;
    +import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
    +import org.apache.flink.runtime.rest.messages.MessageHeaders;
    +import org.apache.flink.runtime.rest.messages.job.JobDetailsInfo;
    +import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagersInfo;
    +import org.apache.flink.runtime.rpc.FatalErrorHandler;
    +import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
    +import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever;
    +import org.apache.flink.runtime.webmonitor.testutils.HttpTestClient;
    +import org.apache.flink.util.ConfigurationException;
    +import org.apache.flink.util.Preconditions;
    +
    +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
    +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandler;
    +
    +import java.io.File;
    +import java.io.IOException;
    +import java.io.InputStream;
    +import java.util.Comparator;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.UUID;
    +import java.util.concurrent.CompletableFuture;
    +import java.util.concurrent.Executor;
    +import java.util.concurrent.TimeUnit;
    +import java.util.concurrent.TimeoutException;
    +import java.util.regex.Pattern;
    +import java.util.stream.Collectors;
    +
    +import scala.concurrent.duration.FiniteDuration;
    +
    +/**
    + * Rest API test suite.
    + */
    +public class RestApiTestSuite {
    +	private static final FiniteDuration TEST_TIMEOUT = new FiniteDuration(10L, TimeUnit.SECONDS);
    +	private static final ObjectMapper MAPPER = new ObjectMapper();
    +
    +	private static int testSuccessCount = 0;
    +	private static int testFailureCount = 0;
    +	private static int testSkipCount = 0;
    +
    +	public static void main(String[] args) throws Exception {
    +
    +		ParameterTool params = ParameterTool.fromArgs(args);
    +		final String host = params.get("host", "localhost");
    +		final int port = params.getInt("port", 8081);
    +		final HttpTestClient httpClient = new HttpTestClient(host, port);
    +
    +		// Validate Flink cluster is running
    +		JobIdsWithStatusOverview jobOverview = getJobOverview(httpClient);
    +
    +		// Get necessary parameters for testing GET endpoints
    +		Map<String, String> parameterMap = getParameterMaps(httpClient, jobOverview);
    +
    +		// Get list of endpoints
    +		List<MessageHeaders> specs = new E2ETestDispatcherRestEndpoint().getSpecs();
    +		specs.forEach(spec -> testMonitoringEndpointSpecs(httpClient, spec, parameterMap));
    +
    +		if (testFailureCount != 0) {
    +			throw new RuntimeException("There are test failures. Success: " + testSuccessCount +
    +				" Failures: " + testFailureCount + " Skipped: " + testSkipCount);
    +		}
    +	}
    +
    +	@SuppressWarnings("ConstantConditions")
    +	private static Map<String, String> getParameterMaps(HttpTestClient httpClient,
    +		JobIdsWithStatusOverview jobOverview) throws InterruptedException, IOException, TimeoutException {
    +		// Get necessary parameters used for all REST API testings.
    +		final Map<String, String> parameterMap = new HashMap<>();
    +		Preconditions.checkState(jobOverview.getJobsWithStatus().stream()
    +			.filter(jobIdWithStatus -> jobIdWithStatus.getJobStatus() == JobStatus.RUNNING)
    +			.count() >= 1, "Cannot found active running jobs, discontinuing test!");
    +		String jobId = jobOverview.getJobsWithStatus().stream()
    +			.filter(jobIdWithStatus -> jobIdWithStatus.getJobStatus() == JobStatus.RUNNING)
    +			.findFirst().get().getJobId().toString();
    +		parameterMap.put(":jobid", jobId);
    +
    +		JobDetailsInfo jobDetailsInfo = getJobDetailInfo(httpClient, jobId);
    +		String vertexId = jobDetailsInfo.getJobVertexInfos().stream()
    +			.findFirst().get().getJobVertexID().toString();
    +		parameterMap.put(":vertexid", vertexId);
    +		parameterMap.put(":checkpointid", "1"); // test first checkpoint
    +		parameterMap.put(":subtaskindex", "0"); // test first subtask
    +
    +		TaskManagersInfo taskManagersInfo = getTaskManagers(httpClient);
    +		String taskMgrId = taskManagersInfo.getTaskManagerInfos().stream().findFirst().get().getResourceId().toString();
    +		parameterMap.put(":taskmanagerid", taskMgrId);
    +		parameterMap.put(":triggerid", "");
    --- End diff --
    
    this should probably be removed.


---

[GitHub] flink issue #5849: [FLINK-8986][e2e-test][WIP] Flink end to end test REST AP...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on the issue:

    https://github.com/apache/flink/pull/5849
  
    Hi @walterddr, what is the status of this PR? It would be nice if we could move forward with it (and also with the CLI e2e test PR that you also opened).


---

[GitHub] flink pull request #5849: [FLINK-8986][e2e-test] Flink end to end test REST ...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5849#discussion_r181669761
  
    --- Diff: flink-end-to-end-tests/flink-rest-api-test/src/main/java/org/apache/flink/runtime/rest/tests/RestApiTestSuite.java ---
    @@ -0,0 +1,363 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.rest.tests;
    +
    +import org.apache.flink.api.common.JobID;
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.api.java.utils.ParameterTool;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.configuration.RestOptions;
    +import org.apache.flink.runtime.blob.TransientBlobKey;
    +import org.apache.flink.runtime.blob.TransientBlobService;
    +import org.apache.flink.runtime.concurrent.Executors;
    +import org.apache.flink.runtime.dispatcher.DispatcherGateway;
    +import org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint;
    +import org.apache.flink.runtime.jobgraph.JobStatus;
    +import org.apache.flink.runtime.leaderelection.LeaderContender;
    +import org.apache.flink.runtime.leaderelection.LeaderElectionService;
    +import org.apache.flink.runtime.messages.webmonitor.JobIdsWithStatusOverview;
    +import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
    +import org.apache.flink.runtime.rest.HttpMethodWrapper;
    +import org.apache.flink.runtime.rest.RestServerEndpoint;
    +import org.apache.flink.runtime.rest.RestServerEndpointConfiguration;
    +import org.apache.flink.runtime.rest.handler.RestHandlerConfiguration;
    +import org.apache.flink.runtime.rest.handler.RestHandlerSpecification;
    +import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
    +import org.apache.flink.runtime.rest.messages.MessageHeaders;
    +import org.apache.flink.runtime.rest.messages.job.JobDetailsInfo;
    +import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagersInfo;
    +import org.apache.flink.runtime.rpc.FatalErrorHandler;
    +import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
    +import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever;
    +import org.apache.flink.runtime.webmonitor.testutils.HttpTestClient;
    +import org.apache.flink.util.ConfigurationException;
    +import org.apache.flink.util.Preconditions;
    +
    +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
    +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandler;
    +
    +import java.io.File;
    +import java.io.IOException;
    +import java.io.InputStream;
    +import java.util.Comparator;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.UUID;
    +import java.util.concurrent.CompletableFuture;
    +import java.util.concurrent.Executor;
    +import java.util.concurrent.TimeUnit;
    +import java.util.concurrent.TimeoutException;
    +import java.util.regex.Pattern;
    +import java.util.stream.Collectors;
    +
    +import scala.concurrent.duration.FiniteDuration;
    +
    +/**
    + * Rest API test suite.
    + */
    +public class RestApiTestSuite {
    +	private static final FiniteDuration TEST_TIMEOUT = new FiniteDuration(10L, TimeUnit.SECONDS);
    +	private static final ObjectMapper MAPPER = new ObjectMapper();
    +
    +	private static int testSuccessCount = 0;
    +	private static int testFailureCount = 0;
    +	private static int testSkipCount = 0;
    +
    +	public static void main(String[] args) throws Exception {
    +
    +		ParameterTool params = ParameterTool.fromArgs(args);
    +		final String host = params.get("host", "localhost");
    +		final int port = params.getInt("port", 8081);
    +		final HttpTestClient httpClient = new HttpTestClient(host, port);
    +
    +		// Validate Flink cluster is running
    +		JobIdsWithStatusOverview jobOverview = getJobOverview(httpClient);
    +
    +		// Get necessary parameters for testing GET endpoints
    +		Map<String, String> parameterMap = getParameterMaps(httpClient, jobOverview);
    +
    +		// Get list of endpoints
    +		List<MessageHeaders> specs = new E2ETestDispatcherRestEndpoint().getSpecs();
    +		specs.forEach(spec -> testMonitoringEndpointSpecs(httpClient, spec, parameterMap));
    +
    +		if (testFailureCount != 0) {
    +			throw new RuntimeException("There are test failures. Success: " + testSuccessCount +
    +				" Failures: " + testFailureCount + " Skipped: " + testSkipCount);
    +		}
    +	}
    +
    +	@SuppressWarnings("ConstantConditions")
    +	private static Map<String, String> getParameterMaps(HttpTestClient httpClient,
    +		JobIdsWithStatusOverview jobOverview) throws InterruptedException, IOException, TimeoutException {
    +		// Get necessary parameters used for all REST API testings.
    +		final Map<String, String> parameterMap = new HashMap<>();
    +		Preconditions.checkState(jobOverview.getJobsWithStatus().stream()
    +			.filter(jobIdWithStatus -> jobIdWithStatus.getJobStatus() == JobStatus.RUNNING)
    +			.count() >= 1, "Cannot found active running jobs, discontinuing test!");
    +		String jobId = jobOverview.getJobsWithStatus().stream()
    +			.filter(jobIdWithStatus -> jobIdWithStatus.getJobStatus() == JobStatus.RUNNING)
    +			.findFirst().get().getJobId().toString();
    +		parameterMap.put(":jobid", jobId);
    +
    +		JobDetailsInfo jobDetailsInfo = getJobDetailInfo(httpClient, jobId);
    +		String vertexId = jobDetailsInfo.getJobVertexInfos().stream()
    +			.findFirst().get().getJobVertexID().toString();
    +		parameterMap.put(":vertexid", vertexId);
    +		parameterMap.put(":checkpointid", "1"); // test first checkpoint
    +		parameterMap.put(":subtaskindex", "0"); // test first subtask
    +
    +		TaskManagersInfo taskManagersInfo = getTaskManagers(httpClient);
    +		String taskMgrId = taskManagersInfo.getTaskManagerInfos().stream().findFirst().get().getResourceId().toString();
    +		parameterMap.put(":taskmanagerid", taskMgrId);
    +		parameterMap.put(":triggerid", "");
    +
    +		return parameterMap;
    +	}
    +
    +	private static JobIdsWithStatusOverview getJobOverview(HttpTestClient httpClient)
    +		throws TimeoutException, InterruptedException, IOException {
    +		httpClient.sendGetRequest("/jobs", TEST_TIMEOUT);
    +		HttpTestClient.SimpleHttpResponse resp = httpClient.getNextResponse();
    +		Preconditions.checkState(resp.getStatus().code() == 200,
    +			"Cannot fetch Flink cluster status!");
    +		return MAPPER.readValue(resp.getContent(), JobIdsWithStatusOverview.class);
    +	}
    +
    +	private static JobDetailsInfo getJobDetailInfo(HttpTestClient httpClient, String jobId)
    +		throws TimeoutException, InterruptedException, IOException {
    +			httpClient.sendGetRequest("/jobs/" + jobId, TEST_TIMEOUT);
    +			HttpTestClient.SimpleHttpResponse resp = httpClient.getNextResponse();
    +			Preconditions.checkState(resp.getStatus().code() == 200,
    +				"Cannot fetch job detail information for job " + jobId);
    +			return MAPPER.readValue(resp.getContent(), JobDetailsInfo.class);
    +	}
    +
    +	private static TaskManagersInfo getTaskManagers(HttpTestClient httpClient)
    +		throws TimeoutException, InterruptedException, IOException {
    +		httpClient.sendGetRequest("/taskmanagers", TEST_TIMEOUT);
    +		HttpTestClient.SimpleHttpResponse resp = httpClient.getNextResponse();
    +		Preconditions.checkState(resp.getStatus().code() == 200,
    +			"Cannot fetch task manager status!");
    +		return MAPPER.readValue(resp.getContent(), TaskManagersInfo.class);
    +	}
    +
    +	private static void testMonitoringEndpointSpecs(HttpTestClient httpClient, MessageHeaders spec,
    +		Map<String, String> parameterMap) {
    +		try {
    +			HttpMethodWrapper method = spec.getHttpMethod();
    +			String path = getRestEndpointPath(spec.getTargetRestEndpointURL(), parameterMap);
    +			if (spec.getRequestClass() == EmptyRequestBody.class) {
    +				switch (method) {
    +					case GET:
    +						httpClient.sendGetRequest(path, TEST_TIMEOUT);
    +						break;
    +					case DELETE:
    +						httpClient.sendDeleteRequest(path, TEST_TIMEOUT);
    +						break;
    +					default:
    +						throw new UnsupportedOperationException("Cannot handle REST Test for " + path +
    +						" with method " + method + ". Only GET and DELETE requests are supported!");
    +				}
    +
    +				HttpTestClient.SimpleHttpResponse resp = httpClient.getNextResponse();
    +
    +				Preconditions.checkState(resp.getStatus().code() == spec.getResponseStatusCode().code(),
    +					"Found mismatching status code from endpoint " + path + " with method " + method +
    +					", expecting: " + spec.getResponseStatusCode().code() + ", but was: " + resp.getStatus().code());
    +				// System.out.println("Found matching status for endpoint " + path + " with method  " + method);
    +				@SuppressWarnings("unchecked")
    --- End diff --
    
    this is unnecessary. `MessageHeaders` already has the response class as a generic argument.
    
    Change the method signature to 
    ```
    private static <P extends ResponseBody> void testMonitoringEndpointSpecs(HttpTestClient httpClient, MessageHeaders<?, P, ?> spec, Map<String, String> parameterMap)
    ```
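
    With that signature the unchecked cast can be dropped, since `spec.getResponseClass()` is then typed as `Class<P>`. A minimal sketch of the deserialization step under that signature (request sending elided; other names as in the quoted diff):
    ```
    HttpTestClient.SimpleHttpResponse resp = httpClient.getNextResponse();
    // getResponseClass() returns Class<P>, so no @SuppressWarnings("unchecked") cast is needed
    P responseObject = MAPPER.readValue(resp.getContent(), spec.getResponseClass());
    Preconditions.checkNotNull(responseObject);
    ```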


---

[GitHub] flink pull request #5849: [FLINK-8986][e2e-test] Flink end to end test REST ...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5849#discussion_r181687398
  
    --- Diff: flink-end-to-end-tests/flink-rest-api-test/src/main/java/org/apache/flink/runtime/rest/tests/RestApiTestSuite.java ---
    @@ -0,0 +1,363 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.rest.tests;
    +
    +import org.apache.flink.api.common.JobID;
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.api.java.utils.ParameterTool;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.configuration.RestOptions;
    +import org.apache.flink.runtime.blob.TransientBlobKey;
    +import org.apache.flink.runtime.blob.TransientBlobService;
    +import org.apache.flink.runtime.concurrent.Executors;
    +import org.apache.flink.runtime.dispatcher.DispatcherGateway;
    +import org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint;
    +import org.apache.flink.runtime.jobgraph.JobStatus;
    +import org.apache.flink.runtime.leaderelection.LeaderContender;
    +import org.apache.flink.runtime.leaderelection.LeaderElectionService;
    +import org.apache.flink.runtime.messages.webmonitor.JobIdsWithStatusOverview;
    +import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
    +import org.apache.flink.runtime.rest.HttpMethodWrapper;
    +import org.apache.flink.runtime.rest.RestServerEndpoint;
    +import org.apache.flink.runtime.rest.RestServerEndpointConfiguration;
    +import org.apache.flink.runtime.rest.handler.RestHandlerConfiguration;
    +import org.apache.flink.runtime.rest.handler.RestHandlerSpecification;
    +import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
    +import org.apache.flink.runtime.rest.messages.MessageHeaders;
    +import org.apache.flink.runtime.rest.messages.job.JobDetailsInfo;
    +import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagersInfo;
    +import org.apache.flink.runtime.rpc.FatalErrorHandler;
    +import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
    +import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever;
    +import org.apache.flink.runtime.webmonitor.testutils.HttpTestClient;
    +import org.apache.flink.util.ConfigurationException;
    +import org.apache.flink.util.Preconditions;
    +
    +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
    +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandler;
    +
    +import java.io.File;
    +import java.io.IOException;
    +import java.io.InputStream;
    +import java.util.Comparator;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.UUID;
    +import java.util.concurrent.CompletableFuture;
    +import java.util.concurrent.Executor;
    +import java.util.concurrent.TimeUnit;
    +import java.util.concurrent.TimeoutException;
    +import java.util.regex.Pattern;
    +import java.util.stream.Collectors;
    +
    +import scala.concurrent.duration.FiniteDuration;
    +
    +/**
    + * Rest API test suite.
    + */
    +public class RestApiTestSuite {
    +	private static final FiniteDuration TEST_TIMEOUT = new FiniteDuration(10L, TimeUnit.SECONDS);
    +	private static final ObjectMapper MAPPER = new ObjectMapper();
    +
    +	private static int testSuccessCount = 0;
    +	private static int testFailureCount = 0;
    +	private static int testSkipCount = 0;
    +
    +	public static void main(String[] args) throws Exception {
    +
    +		ParameterTool params = ParameterTool.fromArgs(args);
    +		final String host = params.get("host", "localhost");
    +		final int port = params.getInt("port", 8081);
    +		final HttpTestClient httpClient = new HttpTestClient(host, port);
    +
    +		// Validate Flink cluster is running
    +		JobIdsWithStatusOverview jobOverview = getJobOverview(httpClient);
    +
    +		// Get necessary parameters for testing GET endpoints
    +		Map<String, String> parameterMap = getParameterMaps(httpClient, jobOverview);
    +
    +		// Get list of endpoints
    +		List<MessageHeaders> specs = new E2ETestDispatcherRestEndpoint().getSpecs();
    +		specs.forEach(spec -> testMonitoringEndpointSpecs(httpClient, spec, parameterMap));
    +
    +		if (testFailureCount != 0) {
    +			throw new RuntimeException("There are test failures. Success: " + testSuccessCount +
    +				" Failures: " + testFailureCount + " Skipped: " + testSkipCount);
    +		}
    +	}
    +
    +	@SuppressWarnings("ConstantConditions")
    +	private static Map<String, String> getParameterMaps(HttpTestClient httpClient,
    +		JobIdsWithStatusOverview jobOverview) throws InterruptedException, IOException, TimeoutException {
    +		// Get necessary parameters used for all REST API testings.
    +		final Map<String, String> parameterMap = new HashMap<>();
    +		Preconditions.checkState(jobOverview.getJobsWithStatus().stream()
    +			.filter(jobIdWithStatus -> jobIdWithStatus.getJobStatus() == JobStatus.RUNNING)
    +			.count() >= 1, "Cannot found active running jobs, discontinuing test!");
    +		String jobId = jobOverview.getJobsWithStatus().stream()
    +			.filter(jobIdWithStatus -> jobIdWithStatus.getJobStatus() == JobStatus.RUNNING)
    +			.findFirst().get().getJobId().toString();
    +		parameterMap.put(":jobid", jobId);
    +
    +		JobDetailsInfo jobDetailsInfo = getJobDetailInfo(httpClient, jobId);
    +		String vertexId = jobDetailsInfo.getJobVertexInfos().stream()
    +			.findFirst().get().getJobVertexID().toString();
    +		parameterMap.put(":vertexid", vertexId);
    +		parameterMap.put(":checkpointid", "1"); // test first checkpoint
    +		parameterMap.put(":subtaskindex", "0"); // test first subtask
    +
    +		TaskManagersInfo taskManagersInfo = getTaskManagers(httpClient);
    +		String taskMgrId = taskManagersInfo.getTaskManagerInfos().stream().findFirst().get().getResourceId().toString();
    +		parameterMap.put(":taskmanagerid", taskMgrId);
    +		parameterMap.put(":triggerid", "");
    +
    +		return parameterMap;
    +	}
    +
    +	private static JobIdsWithStatusOverview getJobOverview(HttpTestClient httpClient)
    +		throws TimeoutException, InterruptedException, IOException {
    +		httpClient.sendGetRequest("/jobs", TEST_TIMEOUT);
    +		HttpTestClient.SimpleHttpResponse resp = httpClient.getNextResponse();
    +		Preconditions.checkState(resp.getStatus().code() == 200,
    +			"Cannot fetch Flink cluster status!");
    +		return MAPPER.readValue(resp.getContent(), JobIdsWithStatusOverview.class);
    +	}
    +
    +	private static JobDetailsInfo getJobDetailInfo(HttpTestClient httpClient, String jobId)
    --- End diff --
    
    these methods can be made less brittle by using the corresponding `MessageHeaders` objects.
    
    ```
    JobDetailsHeaders headers = JobDetailsHeaders.getInstance();
    httpClient.sendGetRequest(headers.getTargetRestEndpointURL().replace(":" + JobIDPathParameter.KEY, jobId), TEST_TIMEOUT);
    HttpTestClient.SimpleHttpResponse resp = httpClient.getNextResponse();
    Preconditions.checkState(resp.getStatus() == headers.getResponseStatusCode(),
    	"Cannot fetch job detail information for job " + jobId);
    return MAPPER.readValue(resp.getContent(), headers.getResponseClass());
    ```


---

[GitHub] flink pull request #5849: [FLINK-8986][e2e-test] Flink end to end test REST ...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5849#discussion_r181686235
  
    --- Diff: flink-end-to-end-tests/flink-rest-api-test/src/main/java/org/apache/flink/runtime/rest/tests/RestApiTestSuite.java ---
    @@ -0,0 +1,363 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.rest.tests;
    +
    +import org.apache.flink.api.common.JobID;
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.api.java.utils.ParameterTool;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.configuration.RestOptions;
    +import org.apache.flink.runtime.blob.TransientBlobKey;
    +import org.apache.flink.runtime.blob.TransientBlobService;
    +import org.apache.flink.runtime.concurrent.Executors;
    +import org.apache.flink.runtime.dispatcher.DispatcherGateway;
    +import org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint;
    +import org.apache.flink.runtime.jobgraph.JobStatus;
    +import org.apache.flink.runtime.leaderelection.LeaderContender;
    +import org.apache.flink.runtime.leaderelection.LeaderElectionService;
    +import org.apache.flink.runtime.messages.webmonitor.JobIdsWithStatusOverview;
    +import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
    +import org.apache.flink.runtime.rest.HttpMethodWrapper;
    +import org.apache.flink.runtime.rest.RestServerEndpoint;
    +import org.apache.flink.runtime.rest.RestServerEndpointConfiguration;
    +import org.apache.flink.runtime.rest.handler.RestHandlerConfiguration;
    +import org.apache.flink.runtime.rest.handler.RestHandlerSpecification;
    +import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
    +import org.apache.flink.runtime.rest.messages.MessageHeaders;
    +import org.apache.flink.runtime.rest.messages.job.JobDetailsInfo;
    +import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagersInfo;
    +import org.apache.flink.runtime.rpc.FatalErrorHandler;
    +import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
    +import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever;
    +import org.apache.flink.runtime.webmonitor.testutils.HttpTestClient;
    +import org.apache.flink.util.ConfigurationException;
    +import org.apache.flink.util.Preconditions;
    +
    +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
    +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandler;
    +
    +import java.io.File;
    +import java.io.IOException;
    +import java.io.InputStream;
    +import java.util.Comparator;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.UUID;
    +import java.util.concurrent.CompletableFuture;
    +import java.util.concurrent.Executor;
    +import java.util.concurrent.TimeUnit;
    +import java.util.concurrent.TimeoutException;
    +import java.util.regex.Pattern;
    +import java.util.stream.Collectors;
    +
    +import scala.concurrent.duration.FiniteDuration;
    +
    +/**
    + * Rest API test suite.
    + */
    +public class RestApiTestSuite {
    +	private static final FiniteDuration TEST_TIMEOUT = new FiniteDuration(10L, TimeUnit.SECONDS);
    +	private static final ObjectMapper MAPPER = new ObjectMapper();
    +
    +	private static int testSuccessCount = 0;
    +	private static int testFailureCount = 0;
    +	private static int testSkipCount = 0;
    +
    +	public static void main(String[] args) throws Exception {
    +
    +		ParameterTool params = ParameterTool.fromArgs(args);
    +		final String host = params.get("host", "localhost");
    +		final int port = params.getInt("port", 8081);
    +		final HttpTestClient httpClient = new HttpTestClient(host, port);
    +
    +		// Validate Flink cluster is running
    +		JobIdsWithStatusOverview jobOverview = getJobOverview(httpClient);
    +
    +		// Get necessary parameters for testing GET endpoints
    +		Map<String, String> parameterMap = getParameterMaps(httpClient, jobOverview);
    +
    +		// Get list of endpoints
    +		List<MessageHeaders> specs = new E2ETestDispatcherRestEndpoint().getSpecs();
    +		specs.forEach(spec -> testMonitoringEndpointSpecs(httpClient, spec, parameterMap));
    +
    +		if (testFailureCount != 0) {
    +			throw new RuntimeException("There are test failures. Success: " + testSuccessCount +
    +				" Failures: " + testFailureCount + " Skipped: " + testSkipCount);
    +		}
    +	}
    +
    +	@SuppressWarnings("ConstantConditions")
    +	private static Map<String, String> getParameterMaps(HttpTestClient httpClient,
    +		JobIdsWithStatusOverview jobOverview) throws InterruptedException, IOException, TimeoutException {
    +		// Get necessary parameters used for all REST API testings.
    +		final Map<String, String> parameterMap = new HashMap<>();
    +		Preconditions.checkState(jobOverview.getJobsWithStatus().stream()
    --- End diff --
    
    May be better to make this an assertion; effectively we are already relying on the REST API functionality here.
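
    For example (one possible reading of the suggestion, not code from the PR):
    ```
    long runningJobCount = jobOverview.getJobsWithStatus().stream()
    	.filter(jobIdWithStatus -> jobIdWithStatus.getJobStatus() == JobStatus.RUNNING)
    	.count();
    // treat this as a test assertion rather than a precondition, since the /jobs response itself is what we test
    if (runningJobCount < 1) {
    	throw new AssertionError("Cannot find an active running job, discontinuing test!");
    }
    ```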


---

[GitHub] flink pull request #5849: [FLINK-8986][e2e-test] Flink end to end test REST ...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5849#discussion_r181669905
  
    --- Diff: flink-end-to-end-tests/flink-rest-api-test/src/main/java/org/apache/flink/runtime/rest/tests/RestApiTestSuite.java ---
    @@ -0,0 +1,363 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.rest.tests;
    +
    +import org.apache.flink.api.common.JobID;
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.api.java.utils.ParameterTool;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.configuration.RestOptions;
    +import org.apache.flink.runtime.blob.TransientBlobKey;
    +import org.apache.flink.runtime.blob.TransientBlobService;
    +import org.apache.flink.runtime.concurrent.Executors;
    +import org.apache.flink.runtime.dispatcher.DispatcherGateway;
    +import org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint;
    +import org.apache.flink.runtime.jobgraph.JobStatus;
    +import org.apache.flink.runtime.leaderelection.LeaderContender;
    +import org.apache.flink.runtime.leaderelection.LeaderElectionService;
    +import org.apache.flink.runtime.messages.webmonitor.JobIdsWithStatusOverview;
    +import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
    +import org.apache.flink.runtime.rest.HttpMethodWrapper;
    +import org.apache.flink.runtime.rest.RestServerEndpoint;
    +import org.apache.flink.runtime.rest.RestServerEndpointConfiguration;
    +import org.apache.flink.runtime.rest.handler.RestHandlerConfiguration;
    +import org.apache.flink.runtime.rest.handler.RestHandlerSpecification;
    +import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
    +import org.apache.flink.runtime.rest.messages.MessageHeaders;
    +import org.apache.flink.runtime.rest.messages.job.JobDetailsInfo;
    +import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagersInfo;
    +import org.apache.flink.runtime.rpc.FatalErrorHandler;
    +import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
    +import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever;
    +import org.apache.flink.runtime.webmonitor.testutils.HttpTestClient;
    +import org.apache.flink.util.ConfigurationException;
    +import org.apache.flink.util.Preconditions;
    +
    +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
    +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandler;
    +
    +import java.io.File;
    +import java.io.IOException;
    +import java.io.InputStream;
    +import java.util.Comparator;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.UUID;
    +import java.util.concurrent.CompletableFuture;
    +import java.util.concurrent.Executor;
    +import java.util.concurrent.TimeUnit;
    +import java.util.concurrent.TimeoutException;
    +import java.util.regex.Pattern;
    +import java.util.stream.Collectors;
    +
    +import scala.concurrent.duration.FiniteDuration;
    +
    +/**
    + * Rest API test suite.
    + */
    +public class RestApiTestSuite {
    +	private static final FiniteDuration TEST_TIMEOUT = new FiniteDuration(10L, TimeUnit.SECONDS);
    +	private static final ObjectMapper MAPPER = new ObjectMapper();
    +
    +	private static int testSuccessCount = 0;
    +	private static int testFailureCount = 0;
    +	private static int testSkipCount = 0;
    +
    +	public static void main(String[] args) throws Exception {
    +
    +		ParameterTool params = ParameterTool.fromArgs(args);
    +		final String host = params.get("host", "localhost");
    +		final int port = params.getInt("port", 8081);
    +		final HttpTestClient httpClient = new HttpTestClient(host, port);
    +
    +		// Validate Flink cluster is running
    +		JobIdsWithStatusOverview jobOverview = getJobOverview(httpClient);
    +
    +		// Get necessary parameters for testing GET endpoints
    +		Map<String, String> parameterMap = getParameterMaps(httpClient, jobOverview);
    +
    +		// Get list of endpoints
    +		List<MessageHeaders> specs = new E2ETestDispatcherRestEndpoint().getSpecs();
    +		specs.forEach(spec -> testMonitoringEndpointSpecs(httpClient, spec, parameterMap));
    +
    +		if (testFailureCount != 0) {
    +			throw new RuntimeException("There are test failures. Success: " + testSuccessCount +
    +				" Failures: " + testFailureCount + " Skipped: " + testSkipCount);
    +		}
    +	}
    +
    +	@SuppressWarnings("ConstantConditions")
    +	private static Map<String, String> getParameterMaps(HttpTestClient httpClient,
    +		JobIdsWithStatusOverview jobOverview) throws InterruptedException, IOException, TimeoutException {
    +		// Get necessary parameters used for all REST API testings.
    +		final Map<String, String> parameterMap = new HashMap<>();
    +		Preconditions.checkState(jobOverview.getJobsWithStatus().stream()
    +			.filter(jobIdWithStatus -> jobIdWithStatus.getJobStatus() == JobStatus.RUNNING)
    +			.count() >= 1, "Cannot found active running jobs, discontinuing test!");
    +		String jobId = jobOverview.getJobsWithStatus().stream()
    +			.filter(jobIdWithStatus -> jobIdWithStatus.getJobStatus() == JobStatus.RUNNING)
    +			.findFirst().get().getJobId().toString();
    +		parameterMap.put(":jobid", jobId);
    +
    +		JobDetailsInfo jobDetailsInfo = getJobDetailInfo(httpClient, jobId);
    +		String vertexId = jobDetailsInfo.getJobVertexInfos().stream()
    +			.findFirst().get().getJobVertexID().toString();
    +		parameterMap.put(":vertexid", vertexId);
    +		parameterMap.put(":checkpointid", "1"); // test first checkpoint
    +		parameterMap.put(":subtaskindex", "0"); // test first subtask
    +
    +		TaskManagersInfo taskManagersInfo = getTaskManagers(httpClient);
    +		String taskMgrId = taskManagersInfo.getTaskManagerInfos().stream().findFirst().get().getResourceId().toString();
    +		parameterMap.put(":taskmanagerid", taskMgrId);
    +		parameterMap.put(":triggerid", "");
    +
    +		return parameterMap;
    +	}
    +
    +	private static JobIdsWithStatusOverview getJobOverview(HttpTestClient httpClient)
    +		throws TimeoutException, InterruptedException, IOException {
    +		httpClient.sendGetRequest("/jobs", TEST_TIMEOUT);
    +		HttpTestClient.SimpleHttpResponse resp = httpClient.getNextResponse();
    +		Preconditions.checkState(resp.getStatus().code() == 200,
    +			"Cannot fetch Flink cluster status!");
    +		return MAPPER.readValue(resp.getContent(), JobIdsWithStatusOverview.class);
    +	}
    +
    +	private static JobDetailsInfo getJobDetailInfo(HttpTestClient httpClient, String jobId)
    +		throws TimeoutException, InterruptedException, IOException {
    +			httpClient.sendGetRequest("/jobs/" + jobId, TEST_TIMEOUT);
    +			HttpTestClient.SimpleHttpResponse resp = httpClient.getNextResponse();
    +			Preconditions.checkState(resp.getStatus().code() == 200,
    +				"Cannot fetch job detail information for job " + jobId);
    +			return MAPPER.readValue(resp.getContent(), JobDetailsInfo.class);
    +	}
    +
    +	private static TaskManagersInfo getTaskManagers(HttpTestClient httpClient)
    +		throws TimeoutException, InterruptedException, IOException {
    +		httpClient.sendGetRequest("/taskmanagers", TEST_TIMEOUT);
    +		HttpTestClient.SimpleHttpResponse resp = httpClient.getNextResponse();
    +		Preconditions.checkState(resp.getStatus().code() == 200,
    +			"Cannot fetch task manager status!");
    +		return MAPPER.readValue(resp.getContent(), TaskManagersInfo.class);
    +	}
    +
    +	private static void testMonitoringEndpointSpecs(HttpTestClient httpClient, MessageHeaders spec,
    +		Map<String, String> parameterMap) {
    +		try {
    +			HttpMethodWrapper method = spec.getHttpMethod();
    +			String path = getRestEndpointPath(spec.getTargetRestEndpointURL(), parameterMap);
    +			if (spec.getRequestClass() == EmptyRequestBody.class) {
    +				switch (method) {
    +					case GET:
    +						httpClient.sendGetRequest(path, TEST_TIMEOUT);
    +						break;
    +					case DELETE:
    +						httpClient.sendDeleteRequest(path, TEST_TIMEOUT);
    +						break;
    +					default:
    +						throw new UnsupportedOperationException("Cannot handle REST Test for " + path +
    +						" with method " + method + ". Only GET and DELETE requests are supported!");
    +				}
    +
    +				HttpTestClient.SimpleHttpResponse resp = httpClient.getNextResponse();
    +
    +				Preconditions.checkState(resp.getStatus().code() == spec.getResponseStatusCode().code(),
    +					"Found mismatching status code from endpoint " + path + " with method " + method +
    +					", expecting: " + spec.getResponseStatusCode().code() + ", but was: " + resp.getStatus().code());
    +				// System.out.println("Found matching status for endpoint " + path + " with method  " + method);
    --- End diff --
    
    remove


---

[GitHub] flink pull request #5849: [FLINK-8986][e2e-test] Flink end to end test REST ...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5849#discussion_r181668393
  
    --- Diff: flink-end-to-end-tests/flink-rest-api-test/src/main/java/org/apache/flink/runtime/rest/tests/RestApiTestSuite.java ---
    @@ -0,0 +1,363 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.rest.tests;
    +
    +import org.apache.flink.api.common.JobID;
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.api.java.utils.ParameterTool;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.configuration.RestOptions;
    +import org.apache.flink.runtime.blob.TransientBlobKey;
    +import org.apache.flink.runtime.blob.TransientBlobService;
    +import org.apache.flink.runtime.concurrent.Executors;
    +import org.apache.flink.runtime.dispatcher.DispatcherGateway;
    +import org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint;
    +import org.apache.flink.runtime.jobgraph.JobStatus;
    +import org.apache.flink.runtime.leaderelection.LeaderContender;
    +import org.apache.flink.runtime.leaderelection.LeaderElectionService;
    +import org.apache.flink.runtime.messages.webmonitor.JobIdsWithStatusOverview;
    +import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
    +import org.apache.flink.runtime.rest.HttpMethodWrapper;
    +import org.apache.flink.runtime.rest.RestServerEndpoint;
    +import org.apache.flink.runtime.rest.RestServerEndpointConfiguration;
    +import org.apache.flink.runtime.rest.handler.RestHandlerConfiguration;
    +import org.apache.flink.runtime.rest.handler.RestHandlerSpecification;
    +import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
    +import org.apache.flink.runtime.rest.messages.MessageHeaders;
    +import org.apache.flink.runtime.rest.messages.job.JobDetailsInfo;
    +import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagersInfo;
    +import org.apache.flink.runtime.rpc.FatalErrorHandler;
    +import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
    +import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever;
    +import org.apache.flink.runtime.webmonitor.testutils.HttpTestClient;
    +import org.apache.flink.util.ConfigurationException;
    +import org.apache.flink.util.Preconditions;
    +
    +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
    +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandler;
    +
    +import java.io.File;
    +import java.io.IOException;
    +import java.io.InputStream;
    +import java.util.Comparator;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.UUID;
    +import java.util.concurrent.CompletableFuture;
    +import java.util.concurrent.Executor;
    +import java.util.concurrent.TimeUnit;
    +import java.util.concurrent.TimeoutException;
    +import java.util.regex.Pattern;
    +import java.util.stream.Collectors;
    +
    +import scala.concurrent.duration.FiniteDuration;
    +
    +/**
    + * Rest API test suite.
    + */
    +public class RestApiTestSuite {
    +	private static final FiniteDuration TEST_TIMEOUT = new FiniteDuration(10L, TimeUnit.SECONDS);
    +	private static final ObjectMapper MAPPER = new ObjectMapper();
    +
    +	private static int testSuccessCount = 0;
    +	private static int testFailureCount = 0;
    +	private static int testSkipCount = 0;
    +
    +	public static void main(String[] args) throws Exception {
    +
    +		ParameterTool params = ParameterTool.fromArgs(args);
    +		final String host = params.get("host", "localhost");
    +		final int port = params.getInt("port", 8081);
    +		final HttpTestClient httpClient = new HttpTestClient(host, port);
    +
    +		// Validate Flink cluster is running
    +		JobIdsWithStatusOverview jobOverview = getJobOverview(httpClient);
    +
    +		// Get necessary parameters for testing GET endpoints
    +		Map<String, String> parameterMap = getParameterMaps(httpClient, jobOverview);
    +
    +		// Get list of endpoints
    +		List<MessageHeaders> specs = new E2ETestDispatcherRestEndpoint().getSpecs();
    +		specs.forEach(spec -> testMonitoringEndpointSpecs(httpClient, spec, parameterMap));
    +
    +		if (testFailureCount != 0) {
    +			throw new RuntimeException("There are test failures. Success: " + testSuccessCount +
    +				" Failures: " + testFailureCount + " Skipped: " + testSkipCount);
    +		}
    +	}
    +
    +	@SuppressWarnings("ConstantConditions")
    +	private static Map<String, String> getParameterMaps(HttpTestClient httpClient,
    +		JobIdsWithStatusOverview jobOverview) throws InterruptedException, IOException, TimeoutException {
    +		// Get necessary parameters used for all REST API testings.
    +		final Map<String, String> parameterMap = new HashMap<>();
    +		Preconditions.checkState(jobOverview.getJobsWithStatus().stream()
    +			.filter(jobIdWithStatus -> jobIdWithStatus.getJobStatus() == JobStatus.RUNNING)
    +			.count() >= 1, "Cannot found active running jobs, discontinuing test!");
    +		String jobId = jobOverview.getJobsWithStatus().stream()
    +			.filter(jobIdWithStatus -> jobIdWithStatus.getJobStatus() == JobStatus.RUNNING)
    +			.findFirst().get().getJobId().toString();
    --- End diff --
    
    use the Optional returned by `findFirst` instead of checking the condition beforehand.
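    
    A rough sketch of that change (not code from the PR; the exception type mirrors what `Preconditions.checkState` would throw):
    ```
    String jobId = jobOverview.getJobsWithStatus().stream()
    	.filter(jobIdWithStatus -> jobIdWithStatus.getJobStatus() == JobStatus.RUNNING)
    	.findFirst()
    	// fail fast here instead of the separate Preconditions.checkState(...) count check
    	.orElseThrow(() -> new IllegalStateException("Cannot find an active running job, discontinuing test!"))
    	.getJobId().toString();
    ```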


---

[GitHub] flink pull request #5849: [FLINK-8986][e2e-test] Flink end to end test REST ...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5849#discussion_r181688711
  
    --- Diff: flink-end-to-end-tests/test-scripts/test_rest_api.sh ---
    @@ -0,0 +1,44 @@
    +#!/usr/bin/env bash
    +################################################################################
    +# Licensed to the Apache Software Foundation (ASF) under one
    +# or more contributor license agreements.  See the NOTICE file
    +# distributed with this work for additional information
    +# regarding copyright ownership.  The ASF licenses this file
    +# to you under the Apache License, Version 2.0 (the
    +# "License"); you may not use this file except in compliance
    +# with the License.  You may obtain a copy of the License at
    +#
    +#     http://www.apache.org/licenses/LICENSE-2.0
    +#
    +# Unless required by applicable law or agreed to in writing, software
    +# distributed under the License is distributed on an "AS IS" BASIS,
    +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    +# See the License for the specific language governing permissions and
    +# limitations under the License.
    +################################################################################
    +
    +source "$(dirname "$0")"/common.sh
    +
    +TEST_JAR_JAR=$TEST_INFRA_DIR/../../flink-end-to-end-tests/flink-rest-api-test/target/PeriodicStreamingJob.jar
    +TEST_PROGRAM_JAR=$TEST_INFRA_DIR/../../flink-end-to-end-tests/flink-rest-api-test/target/RestApiTestSuite.jar
    +
    +echo "Run Rest-Api-Test Program"
    +
    +start_cluster
    +$FLINK_DIR/bin/taskmanager.sh start
    +$FLINK_DIR/bin/taskmanager.sh start
    +$FLINK_DIR/bin/taskmanager.sh start
    +
    +# Start periodic streaming job to test against
    +$FLINK_DIR/bin/flink run -p 4 $TEST_JAR_JAR -outputPath file://${TEST_DATA_DIR}/out/result &
    +
    +# Wait for the job to come up
    +sleep 5s
    --- End diff --
    
    we can handle this more efficiently in the Java program by polling the REST API.
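    
    A possible sketch of such a poll inside the test suite (the helper name and retry budget are made up for illustration, not part of the PR):
    ```
    private static JobIdsWithStatusOverview waitForRunningJob(HttpTestClient httpClient) throws Exception {
    	// poll /jobs until at least one job reports RUNNING, instead of sleeping in the shell script
    	for (int attempt = 0; attempt < 30; attempt++) {
    		JobIdsWithStatusOverview overview = getJobOverview(httpClient);
    		boolean hasRunningJob = overview.getJobsWithStatus().stream()
    			.anyMatch(job -> job.getJobStatus() == JobStatus.RUNNING);
    		if (hasRunningJob) {
    			return overview;
    		}
    		Thread.sleep(1000L);
    	}
    	throw new IllegalStateException("No job reached RUNNING state in time, discontinuing test!");
    }
    ```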


---

[GitHub] flink pull request #5849: [FLINK-8986][e2e-test] Flink end to end test REST ...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5849#discussion_r181687083
  
    --- Diff: flink-end-to-end-tests/flink-rest-api-test/src/main/java/org/apache/flink/runtime/rest/tests/RestApiTestSuite.java ---
    @@ -0,0 +1,363 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.rest.tests;
    +
    +import org.apache.flink.api.common.JobID;
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.api.java.utils.ParameterTool;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.configuration.RestOptions;
    +import org.apache.flink.runtime.blob.TransientBlobKey;
    +import org.apache.flink.runtime.blob.TransientBlobService;
    +import org.apache.flink.runtime.concurrent.Executors;
    +import org.apache.flink.runtime.dispatcher.DispatcherGateway;
    +import org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint;
    +import org.apache.flink.runtime.jobgraph.JobStatus;
    +import org.apache.flink.runtime.leaderelection.LeaderContender;
    +import org.apache.flink.runtime.leaderelection.LeaderElectionService;
    +import org.apache.flink.runtime.messages.webmonitor.JobIdsWithStatusOverview;
    +import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
    +import org.apache.flink.runtime.rest.HttpMethodWrapper;
    +import org.apache.flink.runtime.rest.RestServerEndpoint;
    +import org.apache.flink.runtime.rest.RestServerEndpointConfiguration;
    +import org.apache.flink.runtime.rest.handler.RestHandlerConfiguration;
    +import org.apache.flink.runtime.rest.handler.RestHandlerSpecification;
    +import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
    +import org.apache.flink.runtime.rest.messages.MessageHeaders;
    +import org.apache.flink.runtime.rest.messages.job.JobDetailsInfo;
    +import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagersInfo;
    +import org.apache.flink.runtime.rpc.FatalErrorHandler;
    +import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
    +import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever;
    +import org.apache.flink.runtime.webmonitor.testutils.HttpTestClient;
    +import org.apache.flink.util.ConfigurationException;
    +import org.apache.flink.util.Preconditions;
    +
    +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
    +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandler;
    +
    +import java.io.File;
    +import java.io.IOException;
    +import java.io.InputStream;
    +import java.util.Comparator;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.UUID;
    +import java.util.concurrent.CompletableFuture;
    +import java.util.concurrent.Executor;
    +import java.util.concurrent.TimeUnit;
    +import java.util.concurrent.TimeoutException;
    +import java.util.regex.Pattern;
    +import java.util.stream.Collectors;
    +
    +import scala.concurrent.duration.FiniteDuration;
    +
    +/**
    + * Rest API test suite.
    + */
    +public class RestApiTestSuite {
    +	private static final FiniteDuration TEST_TIMEOUT = new FiniteDuration(10L, TimeUnit.SECONDS);
    +	private static final ObjectMapper MAPPER = new ObjectMapper();
    +
    +	private static int testSuccessCount = 0;
    +	private static int testFailureCount = 0;
    +	private static int testSkipCount = 0;
    +
    +	public static void main(String[] args) throws Exception {
    +
    +		ParameterTool params = ParameterTool.fromArgs(args);
    +		final String host = params.get("host", "localhost");
    +		final int port = params.getInt("port", 8081);
    +		final HttpTestClient httpClient = new HttpTestClient(host, port);
    +
    +		// Validate Flink cluster is running
    +		JobIdsWithStatusOverview jobOverview = getJobOverview(httpClient);
    +
    +		// Get necessary parameters for testing GET endpoints
    +		Map<String, String> parameterMap = getParameterMaps(httpClient, jobOverview);
    +
    +		// Get list of endpoints
    +		List<MessageHeaders> specs = new E2ETestDispatcherRestEndpoint().getSpecs();
    +		specs.forEach(spec -> testMonitoringEndpointSpecs(httpClient, spec, parameterMap));
    +
    +		if (testFailureCount != 0) {
    +			throw new RuntimeException("There are test failures. Success: " + testSuccessCount +
    +				" Failures: " + testFailureCount + " Skipped: " + testSkipCount);
    +		}
    +	}
    +
    +	@SuppressWarnings("ConstantConditions")
    +	private static Map<String, String> getParameterMaps(HttpTestClient httpClient,
    +		JobIdsWithStatusOverview jobOverview) throws InterruptedException, IOException, TimeoutException {
    +		// Get necessary parameters used for all REST API testings.
    +		final Map<String, String> parameterMap = new HashMap<>();
    +		Preconditions.checkState(jobOverview.getJobsWithStatus().stream()
    +			.filter(jobIdWithStatus -> jobIdWithStatus.getJobStatus() == JobStatus.RUNNING)
    +			.count() >= 1, "Cannot found active running jobs, discontinuing test!");
    +		String jobId = jobOverview.getJobsWithStatus().stream()
    +			.filter(jobIdWithStatus -> jobIdWithStatus.getJobStatus() == JobStatus.RUNNING)
    +			.findFirst().get().getJobId().toString();
    +		parameterMap.put(":jobid", jobId);
    +
    +		JobDetailsInfo jobDetailsInfo = getJobDetailInfo(httpClient, jobId);
    +		String vertexId = jobDetailsInfo.getJobVertexInfos().stream()
    +			.findFirst().get().getJobVertexID().toString();
    +		parameterMap.put(":vertexid", vertexId);
    +		parameterMap.put(":checkpointid", "1"); // test first checkpoint
    +		parameterMap.put(":subtaskindex", "0"); // test first subtask
    +
    +		TaskManagersInfo taskManagersInfo = getTaskManagers(httpClient);
    +		String taskMgrId = taskManagersInfo.getTaskManagerInfos().stream().findFirst().get().getResourceId().toString();
    +		parameterMap.put(":taskmanagerid", taskMgrId);
    +		parameterMap.put(":triggerid", "");
    +
    +		return parameterMap;
    +	}
    +
    +	private static JobIdsWithStatusOverview getJobOverview(HttpTestClient httpClient)
    +		throws TimeoutException, InterruptedException, IOException {
    +		httpClient.sendGetRequest("/jobs", TEST_TIMEOUT);
    +		HttpTestClient.SimpleHttpResponse resp = httpClient.getNextResponse();
    +		Preconditions.checkState(resp.getStatus().code() == 200,
    +			"Cannot fetch Flink cluster status!");
    +		return MAPPER.readValue(resp.getContent(), JobIdsWithStatusOverview.class);
    +	}
    +
    +	private static JobDetailsInfo getJobDetailInfo(HttpTestClient httpClient, String jobId)
    +		throws TimeoutException, InterruptedException, IOException {
    +			httpClient.sendGetRequest("/jobs/" + jobId, TEST_TIMEOUT);
    --- End diff --
    
    indentation is off


---

[GitHub] flink pull request #5849: [FLINK-8986][e2e-test] Flink end to end test REST ...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5849#discussion_r181687893
  
    --- Diff: flink-end-to-end-tests/flink-rest-api-test/src/main/java/org/apache/flink/runtime/rest/tests/RestApiTestSuite.java ---
    @@ -0,0 +1,363 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.rest.tests;
    +
    +import org.apache.flink.api.common.JobID;
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.api.java.utils.ParameterTool;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.configuration.RestOptions;
    +import org.apache.flink.runtime.blob.TransientBlobKey;
    +import org.apache.flink.runtime.blob.TransientBlobService;
    +import org.apache.flink.runtime.concurrent.Executors;
    +import org.apache.flink.runtime.dispatcher.DispatcherGateway;
    +import org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint;
    +import org.apache.flink.runtime.jobgraph.JobStatus;
    +import org.apache.flink.runtime.leaderelection.LeaderContender;
    +import org.apache.flink.runtime.leaderelection.LeaderElectionService;
    +import org.apache.flink.runtime.messages.webmonitor.JobIdsWithStatusOverview;
    +import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
    +import org.apache.flink.runtime.rest.HttpMethodWrapper;
    +import org.apache.flink.runtime.rest.RestServerEndpoint;
    +import org.apache.flink.runtime.rest.RestServerEndpointConfiguration;
    +import org.apache.flink.runtime.rest.handler.RestHandlerConfiguration;
    +import org.apache.flink.runtime.rest.handler.RestHandlerSpecification;
    +import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
    +import org.apache.flink.runtime.rest.messages.MessageHeaders;
    +import org.apache.flink.runtime.rest.messages.job.JobDetailsInfo;
    +import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagersInfo;
    +import org.apache.flink.runtime.rpc.FatalErrorHandler;
    +import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
    +import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever;
    +import org.apache.flink.runtime.webmonitor.testutils.HttpTestClient;
    +import org.apache.flink.util.ConfigurationException;
    +import org.apache.flink.util.Preconditions;
    +
    +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
    +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandler;
    +
    +import java.io.File;
    +import java.io.IOException;
    +import java.io.InputStream;
    +import java.util.Comparator;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.UUID;
    +import java.util.concurrent.CompletableFuture;
    +import java.util.concurrent.Executor;
    +import java.util.concurrent.TimeUnit;
    +import java.util.concurrent.TimeoutException;
    +import java.util.regex.Pattern;
    +import java.util.stream.Collectors;
    +
    +import scala.concurrent.duration.FiniteDuration;
    +
    +/**
    + * Rest API test suite.
    + */
    +public class RestApiTestSuite {
    +	private static final FiniteDuration TEST_TIMEOUT = new FiniteDuration(10L, TimeUnit.SECONDS);
    +	private static final ObjectMapper MAPPER = new ObjectMapper();
    +
    +	private static int testSuccessCount = 0;
    +	private static int testFailureCount = 0;
    +	private static int testSkipCount = 0;
    +
    +	public static void main(String[] args) throws Exception {
    +
    +		ParameterTool params = ParameterTool.fromArgs(args);
    +		final String host = params.get("host", "localhost");
    +		final int port = params.getInt("port", 8081);
    +		final HttpTestClient httpClient = new HttpTestClient(host, port);
    +
    +		// Validate Flink cluster is running
    +		JobIdsWithStatusOverview jobOverview = getJobOverview(httpClient);
    +
    +		// Get necessary parameters for testing GET endpoints
    +		Map<String, String> parameterMap = getParameterMaps(httpClient, jobOverview);
    +
    +		// Get list of endpoints
    +		List<MessageHeaders> specs = new E2ETestDispatcherRestEndpoint().getSpecs();
    +		specs.forEach(spec -> testMonitoringEndpointSpecs(httpClient, spec, parameterMap));
    +
    +		if (testFailureCount != 0) {
    +			throw new RuntimeException("There are test failures. Success: " + testSuccessCount +
    +				" Failures: " + testFailureCount + " Skipped: " + testSkipCount);
    +		}
    +	}
    +
    +	@SuppressWarnings("ConstantConditions")
    +	private static Map<String, String> getParameterMaps(HttpTestClient httpClient,
    +		JobIdsWithStatusOverview jobOverview) throws InterruptedException, IOException, TimeoutException {
    +		// Get necessary parameters used for all REST API testings.
    +		final Map<String, String> parameterMap = new HashMap<>();
    +		Preconditions.checkState(jobOverview.getJobsWithStatus().stream()
    +			.filter(jobIdWithStatus -> jobIdWithStatus.getJobStatus() == JobStatus.RUNNING)
    +			.count() >= 1, "Cannot found active running jobs, discontinuing test!");
    +		String jobId = jobOverview.getJobsWithStatus().stream()
    +			.filter(jobIdWithStatus -> jobIdWithStatus.getJobStatus() == JobStatus.RUNNING)
    +			.findFirst().get().getJobId().toString();
    +		parameterMap.put(":jobid", jobId);
    +
    +		JobDetailsInfo jobDetailsInfo = getJobDetailInfo(httpClient, jobId);
    +		String vertexId = jobDetailsInfo.getJobVertexInfos().stream()
    +			.findFirst().get().getJobVertexID().toString();
    +		parameterMap.put(":vertexid", vertexId);
    +		parameterMap.put(":checkpointid", "1"); // test first checkpoint
    +		parameterMap.put(":subtaskindex", "0"); // test first subtask
    +
    +		TaskManagersInfo taskManagersInfo = getTaskManagers(httpClient);
    +		String taskMgrId = taskManagersInfo.getTaskManagerInfos().stream().findFirst().get().getResourceId().toString();
    +		parameterMap.put(":taskmanagerid", taskMgrId);
    +		parameterMap.put(":triggerid", "");
    +
    +		return parameterMap;
    +	}
    +
    +	private static JobIdsWithStatusOverview getJobOverview(HttpTestClient httpClient)
    +		throws TimeoutException, InterruptedException, IOException {
    +		httpClient.sendGetRequest("/jobs", TEST_TIMEOUT);
    +		HttpTestClient.SimpleHttpResponse resp = httpClient.getNextResponse();
    +		Preconditions.checkState(resp.getStatus().code() == 200,
    +			"Cannot fetch Flink cluster status!");
    +		return MAPPER.readValue(resp.getContent(), JobIdsWithStatusOverview.class);
    +	}
    +
    +	private static JobDetailsInfo getJobDetailInfo(HttpTestClient httpClient, String jobId)
    +		throws TimeoutException, InterruptedException, IOException {
    +			httpClient.sendGetRequest("/jobs/" + jobId, TEST_TIMEOUT);
    +			HttpTestClient.SimpleHttpResponse resp = httpClient.getNextResponse();
    +			Preconditions.checkState(resp.getStatus().code() == 200,
    +				"Cannot fetch job detail information for job " + jobId);
    +			return MAPPER.readValue(resp.getContent(), JobDetailsInfo.class);
    +	}
    +
    +	private static TaskManagersInfo getTaskManagers(HttpTestClient httpClient)
    +		throws TimeoutException, InterruptedException, IOException {
    +		httpClient.sendGetRequest("/taskmanagers", TEST_TIMEOUT);
    +		HttpTestClient.SimpleHttpResponse resp = httpClient.getNextResponse();
    +		Preconditions.checkState(resp.getStatus().code() == 200,
    +			"Cannot fetch task manager status!");
    +		return MAPPER.readValue(resp.getContent(), TaskManagersInfo.class);
    +	}
    +
    +	private static void testMonitoringEndpointSpecs(HttpTestClient httpClient, MessageHeaders spec,
    +		Map<String, String> parameterMap) {
    +		try {
    +			HttpMethodWrapper method = spec.getHttpMethod();
    +			String path = getRestEndpointPath(spec.getTargetRestEndpointURL(), parameterMap);
    +			if (spec.getRequestClass() == EmptyRequestBody.class) {
    +				switch (method) {
    +					case GET:
    +						httpClient.sendGetRequest(path, TEST_TIMEOUT);
    +						break;
    +					case DELETE:
    +						httpClient.sendDeleteRequest(path, TEST_TIMEOUT);
    +						break;
    +					default:
    +						throw new UnsupportedOperationException("Cannot handle REST Test for " + path +
    +						" with method " + method + ". Only GET and DELETE requests are supported!");
    +				}
    +
    +				HttpTestClient.SimpleHttpResponse resp = httpClient.getNextResponse();
    +
    +				Preconditions.checkState(resp.getStatus().code() == spec.getResponseStatusCode().code(),
    +					"Found mismatching status code from endpoint " + path + " with method " + method +
    +					", expecting: " + spec.getResponseStatusCode().code() + ", but was: " + resp.getStatus().code());
    +				// System.out.println("Found matching status for endpoint " + path + " with method  " + method);
    +				@SuppressWarnings("unchecked")
    +				Object responseObject = MAPPER.readValue(resp.getContent(), spec.getResponseClass());
    +				Preconditions.checkNotNull(responseObject);
    +				testSuccessCount += 1;
    +			} else {
    +				throw new UnsupportedOperationException("Cannot handle REST Test for " + path +
    +					" with method " + method + ". Non-empty payload is not supported!");
    +			}
    +		} catch (IOException |
    --- End diff --
    
    can be simplified to
    ```
    } catch (UnsupportedOperationException e) {
    	testSkipCount += 1;
    	e.printStackTrace();
    } catch (Exception e) {
    	testFailureCount += 1;
    	e.printStackTrace();
    }
    ```


---

[GitHub] flink pull request #5849: [FLINK-8986][e2e-test] Flink end to end test REST ...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5849#discussion_r181668750
  
    --- Diff: flink-end-to-end-tests/flink-rest-api-test/src/main/java/org/apache/flink/runtime/rest/tests/RestApiTestSuite.java ---
    @@ -0,0 +1,363 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.rest.tests;
    +
    +import org.apache.flink.api.common.JobID;
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.api.java.utils.ParameterTool;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.configuration.RestOptions;
    +import org.apache.flink.runtime.blob.TransientBlobKey;
    +import org.apache.flink.runtime.blob.TransientBlobService;
    +import org.apache.flink.runtime.concurrent.Executors;
    +import org.apache.flink.runtime.dispatcher.DispatcherGateway;
    +import org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint;
    +import org.apache.flink.runtime.jobgraph.JobStatus;
    +import org.apache.flink.runtime.leaderelection.LeaderContender;
    +import org.apache.flink.runtime.leaderelection.LeaderElectionService;
    +import org.apache.flink.runtime.messages.webmonitor.JobIdsWithStatusOverview;
    +import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
    +import org.apache.flink.runtime.rest.HttpMethodWrapper;
    +import org.apache.flink.runtime.rest.RestServerEndpoint;
    +import org.apache.flink.runtime.rest.RestServerEndpointConfiguration;
    +import org.apache.flink.runtime.rest.handler.RestHandlerConfiguration;
    +import org.apache.flink.runtime.rest.handler.RestHandlerSpecification;
    +import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
    +import org.apache.flink.runtime.rest.messages.MessageHeaders;
    +import org.apache.flink.runtime.rest.messages.job.JobDetailsInfo;
    +import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagersInfo;
    +import org.apache.flink.runtime.rpc.FatalErrorHandler;
    +import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
    +import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever;
    +import org.apache.flink.runtime.webmonitor.testutils.HttpTestClient;
    +import org.apache.flink.util.ConfigurationException;
    +import org.apache.flink.util.Preconditions;
    +
    +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
    +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandler;
    +
    +import java.io.File;
    +import java.io.IOException;
    +import java.io.InputStream;
    +import java.util.Comparator;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.UUID;
    +import java.util.concurrent.CompletableFuture;
    +import java.util.concurrent.Executor;
    +import java.util.concurrent.TimeUnit;
    +import java.util.concurrent.TimeoutException;
    +import java.util.regex.Pattern;
    +import java.util.stream.Collectors;
    +
    +import scala.concurrent.duration.FiniteDuration;
    +
    +/**
    + * Rest API test suite.
    + */
    +public class RestApiTestSuite {
    +	private static final FiniteDuration TEST_TIMEOUT = new FiniteDuration(10L, TimeUnit.SECONDS);
    +	private static final ObjectMapper MAPPER = new ObjectMapper();
    +
    +	private static int testSuccessCount = 0;
    +	private static int testFailureCount = 0;
    +	private static int testSkipCount = 0;
    +
    +	public static void main(String[] args) throws Exception {
    +
    +		ParameterTool params = ParameterTool.fromArgs(args);
    +		final String host = params.get("host", "localhost");
    +		final int port = params.getInt("port", 8081);
    +		final HttpTestClient httpClient = new HttpTestClient(host, port);
    +
    +		// Validate Flink cluster is running
    +		JobIdsWithStatusOverview jobOverview = getJobOverview(httpClient);
    +
    +		// Get necessary parameters for testing GET endpoints
    +		Map<String, String> parameterMap = getParameterMaps(httpClient, jobOverview);
    +
    +		// Get list of endpoints
    +		List<MessageHeaders> specs = new E2ETestDispatcherRestEndpoint().getSpecs();
    +		specs.forEach(spec -> testMonitoringEndpointSpecs(httpClient, spec, parameterMap));
    +
    +		if (testFailureCount != 0) {
    +			throw new RuntimeException("There are test failures. Success: " + testSuccessCount +
    +				" Failures: " + testFailureCount + " Skipped: " + testSkipCount);
    +		}
    +	}
    +
    +	@SuppressWarnings("ConstantConditions")
    +	private static Map<String, String> getParameterMaps(HttpTestClient httpClient,
    +		JobIdsWithStatusOverview jobOverview) throws InterruptedException, IOException, TimeoutException {
    +		// Get necessary parameters used for all REST API tests.
    +		final Map<String, String> parameterMap = new HashMap<>();
    +		Preconditions.checkState(jobOverview.getJobsWithStatus().stream()
    +			.filter(jobIdWithStatus -> jobIdWithStatus.getJobStatus() == JobStatus.RUNNING)
    +			.count() >= 1, "Cannot find active running jobs, discontinuing test!");
    +		String jobId = jobOverview.getJobsWithStatus().stream()
    +			.filter(jobIdWithStatus -> jobIdWithStatus.getJobStatus() == JobStatus.RUNNING)
    +			.findFirst().get().getJobId().toString();
    +		parameterMap.put(":jobid", jobId);
    +
    +		JobDetailsInfo jobDetailsInfo = getJobDetailInfo(httpClient, jobId);
    +		String vertexId = jobDetailsInfo.getJobVertexInfos().stream()
    +			.findFirst().get().getJobVertexID().toString();
    +		parameterMap.put(":vertexid", vertexId);
    +		parameterMap.put(":checkpointid", "1"); // test first checkpoint
    +		parameterMap.put(":subtaskindex", "0"); // test first subtask
    +
    +		TaskManagersInfo taskManagersInfo = getTaskManagers(httpClient);
    +		String taskMgrId = taskManagersInfo.getTaskManagerInfos().stream().findFirst().get().getResourceId().toString();
    --- End diff --
    
    let's get this parameter first; if this fails, the job parameter retrieval will also fail.
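    
    A minimal sketch of that reordering inside `getParameterMaps` (same calls as the current code, only moved up; the job-dependent lookups stay unchanged):
    
    ```java
    // Resolve the taskmanager id before any job-dependent parameters,
    // so an unreachable TaskManager makes the test fail fast.
    TaskManagersInfo taskManagersInfo = getTaskManagers(httpClient);
    String taskMgrId = taskManagersInfo.getTaskManagerInfos().stream()
    	.findFirst().get().getResourceId().toString();
    parameterMap.put(":taskmanagerid", taskMgrId);
    
    // ... then :jobid, :vertexid, :checkpointid and :subtaskindex as before.
    ```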


---

[GitHub] flink pull request #5849: [FLINK-8986][e2e-test] Flink end to end test REST ...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5849#discussion_r181685139
  
    --- Diff: flink-end-to-end-tests/flink-rest-api-test/src/main/java/org/apache/flink/runtime/rest/tests/RestApiTestSuite.java ---
    @@ -0,0 +1,363 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.rest.tests;
    +
    +import org.apache.flink.api.common.JobID;
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.api.java.utils.ParameterTool;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.configuration.RestOptions;
    +import org.apache.flink.runtime.blob.TransientBlobKey;
    +import org.apache.flink.runtime.blob.TransientBlobService;
    +import org.apache.flink.runtime.concurrent.Executors;
    +import org.apache.flink.runtime.dispatcher.DispatcherGateway;
    +import org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint;
    +import org.apache.flink.runtime.jobgraph.JobStatus;
    +import org.apache.flink.runtime.leaderelection.LeaderContender;
    +import org.apache.flink.runtime.leaderelection.LeaderElectionService;
    +import org.apache.flink.runtime.messages.webmonitor.JobIdsWithStatusOverview;
    +import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
    +import org.apache.flink.runtime.rest.HttpMethodWrapper;
    +import org.apache.flink.runtime.rest.RestServerEndpoint;
    +import org.apache.flink.runtime.rest.RestServerEndpointConfiguration;
    +import org.apache.flink.runtime.rest.handler.RestHandlerConfiguration;
    +import org.apache.flink.runtime.rest.handler.RestHandlerSpecification;
    +import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
    +import org.apache.flink.runtime.rest.messages.MessageHeaders;
    +import org.apache.flink.runtime.rest.messages.job.JobDetailsInfo;
    +import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagersInfo;
    +import org.apache.flink.runtime.rpc.FatalErrorHandler;
    +import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
    +import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever;
    +import org.apache.flink.runtime.webmonitor.testutils.HttpTestClient;
    +import org.apache.flink.util.ConfigurationException;
    +import org.apache.flink.util.Preconditions;
    +
    +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
    +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandler;
    +
    +import java.io.File;
    +import java.io.IOException;
    +import java.io.InputStream;
    +import java.util.Comparator;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.UUID;
    +import java.util.concurrent.CompletableFuture;
    +import java.util.concurrent.Executor;
    +import java.util.concurrent.TimeUnit;
    +import java.util.concurrent.TimeoutException;
    +import java.util.regex.Pattern;
    +import java.util.stream.Collectors;
    +
    +import scala.concurrent.duration.FiniteDuration;
    +
    +/**
    + * Rest API test suite.
    + */
    +public class RestApiTestSuite {
    +	private static final FiniteDuration TEST_TIMEOUT = new FiniteDuration(10L, TimeUnit.SECONDS);
    +	private static final ObjectMapper MAPPER = new ObjectMapper();
    +
    +	private static int testSuccessCount = 0;
    +	private static int testFailureCount = 0;
    +	private static int testSkipCount = 0;
    +
    +	public static void main(String[] args) throws Exception {
    +
    +		ParameterTool params = ParameterTool.fromArgs(args);
    +		final String host = params.get("host", "localhost");
    +		final int port = params.getInt("port", 8081);
    +		final HttpTestClient httpClient = new HttpTestClient(host, port);
    +
    +		// Validate Flink cluster is running
    +		JobIdsWithStatusOverview jobOverview = getJobOverview(httpClient);
    +
    +		// Get necessary parameters for testing GET endpoints
    +		Map<String, String> parameterMap = getParameterMaps(httpClient, jobOverview);
    +
    +		// Get list of endpoints
    +		List<MessageHeaders> specs = new E2ETestDispatcherRestEndpoint().getSpecs();
    +		specs.forEach(spec -> testMonitoringEndpointSpecs(httpClient, spec, parameterMap));
    +
    +		if (testFailureCount != 0) {
    +			throw new RuntimeException("There are test failures. Success: " + testSuccessCount +
    +				" Failures: " + testFailureCount + " Skipped: " + testSkipCount);
    +		}
    +	}
    +
    +	@SuppressWarnings("ConstantConditions")
    +	private static Map<String, String> getParameterMaps(HttpTestClient httpClient,
    +		JobIdsWithStatusOverview jobOverview) throws InterruptedException, IOException, TimeoutException {
    +		// Get necessary parameters used for all REST API tests.
    +		final Map<String, String> parameterMap = new HashMap<>();
    +		Preconditions.checkState(jobOverview.getJobsWithStatus().stream()
    +			.filter(jobIdWithStatus -> jobIdWithStatus.getJobStatus() == JobStatus.RUNNING)
    +			.count() >= 1, "Cannot find active running jobs, discontinuing test!");
    +		String jobId = jobOverview.getJobsWithStatus().stream()
    +			.filter(jobIdWithStatus -> jobIdWithStatus.getJobStatus() == JobStatus.RUNNING)
    +			.findFirst().get().getJobId().toString();
    +		parameterMap.put(":jobid", jobId);
    +
    +		JobDetailsInfo jobDetailsInfo = getJobDetailInfo(httpClient, jobId);
    +		String vertexId = jobDetailsInfo.getJobVertexInfos().stream()
    +			.findFirst().get().getJobVertexID().toString();
    +		parameterMap.put(":vertexid", vertexId);
    +		parameterMap.put(":checkpointid", "1"); // test first checkpoint
    +		parameterMap.put(":subtaskindex", "0"); // test first subtask
    +
    +		TaskManagersInfo taskManagersInfo = getTaskManagers(httpClient);
    +		String taskMgrId = taskManagersInfo.getTaskManagerInfos().stream().findFirst().get().getResourceId().toString();
    +		parameterMap.put(":taskmanagerid", taskMgrId);
    +		parameterMap.put(":triggerid", "");
    +
    +		return parameterMap;
    +	}
    +
    +	private static JobIdsWithStatusOverview getJobOverview(HttpTestClient httpClient)
    +		throws TimeoutException, InterruptedException, IOException {
    +		httpClient.sendGetRequest("/jobs", TEST_TIMEOUT);
    +		HttpTestClient.SimpleHttpResponse resp = httpClient.getNextResponse();
    +		Preconditions.checkState(resp.getStatus().code() == 200,
    +			"Cannot fetch Flink cluster status!");
    +		return MAPPER.readValue(resp.getContent(), JobIdsWithStatusOverview.class);
    +	}
    +
    +	private static JobDetailsInfo getJobDetailInfo(HttpTestClient httpClient, String jobId)
    +		throws TimeoutException, InterruptedException, IOException {
    +			httpClient.sendGetRequest("/jobs/" + jobId, TEST_TIMEOUT);
    +			HttpTestClient.SimpleHttpResponse resp = httpClient.getNextResponse();
    +			Preconditions.checkState(resp.getStatus().code() == 200,
    +				"Cannot fetch job detail information for job " + jobId);
    +			return MAPPER.readValue(resp.getContent(), JobDetailsInfo.class);
    +	}
    +
    +	private static TaskManagersInfo getTaskManagers(HttpTestClient httpClient)
    +		throws TimeoutException, InterruptedException, IOException {
    +		httpClient.sendGetRequest("/taskmanagers", TEST_TIMEOUT);
    +		HttpTestClient.SimpleHttpResponse resp = httpClient.getNextResponse();
    +		Preconditions.checkState(resp.getStatus().code() == 200,
    +			"Cannot fetch task manager status!");
    +		return MAPPER.readValue(resp.getContent(), TaskManagersInfo.class);
    +	}
    +
    +	private static void testMonitoringEndpointSpecs(HttpTestClient httpClient, MessageHeaders spec,
    +		Map<String, String> parameterMap) {
    +		try {
    +			HttpMethodWrapper method = spec.getHttpMethod();
    +			String path = getRestEndpointPath(spec.getTargetRestEndpointURL(), parameterMap);
    +			if (spec.getRequestClass() == EmptyRequestBody.class) {
    +				switch (method) {
    +					case GET:
    +						httpClient.sendGetRequest(path, TEST_TIMEOUT);
    +						break;
    +					case DELETE:
    +						httpClient.sendDeleteRequest(path, TEST_TIMEOUT);
    +						break;
    +					default:
    +						throw new UnsupportedOperationException("Cannot handle REST Test for " + path +
    +						" with method " + method + ". Only GET and DELETE requests are supported!");
    +				}
    +
    +				HttpTestClient.SimpleHttpResponse resp = httpClient.getNextResponse();
    +
    +				Preconditions.checkState(resp.getStatus().code() == spec.getResponseStatusCode().code(),
    +					"Found mismatching status code from endpoint " + path + " with method " + method +
    +					", expecting: " + spec.getResponseStatusCode().code() + ", but was: " + resp.getStatus().code());
    +				// System.out.println("Found matching status for endpoint " + path + " with method  " + method);
    +				@SuppressWarnings("unchecked")
    +				Object responseObject = MAPPER.readValue(resp.getContent(), spec.getResponseClass());
    +				Preconditions.checkNotNull(responseObject);
    +				testSuccessCount += 1;
    +			} else {
    +				throw new UnsupportedOperationException("Cannot handle REST Test for " + path +
    +					" with method " + method + ". Non-empty payload is not supported!");
    +			}
    +		} catch (IOException |
    +			TimeoutException |
    +			InterruptedException |
    +			IllegalStateException |
    +			NullPointerException e) {
    +			testFailureCount += 1;
    +			e.printStackTrace();
    +		} catch (UnsupportedOperationException e) {
    +			testSkipCount += 1;
    +			e.printStackTrace();
    +		}
    +	}
    +
    +	/**
    +	 * Replace target REST endpoint with actual parameters from job launch.
    +	 * @param url target REST endpoint pattern from {@link MessageHeaders} specs
    +	 * @param parameterMap parameter replacement map
    +	 * @return actual REST URL literal
    +	 */
    +	private static String getRestEndpointPath(String url, Map<String, String> parameterMap)
    +		throws UnsupportedOperationException {
    +		for (Map.Entry<String, String> e : parameterMap.entrySet()) {
    +			if (e.getValue().equals("")) {
    +				if (Pattern.compile(e.getKey()).matcher(url).find()) {
    +					throw new UnsupportedOperationException("No parameter replacement found for " + e.getKey());
    --- End diff --
    
    An easier way is to check for `:` in the URL; if it is still present, there's a parameter that wasn't replaced.
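    
    A rough sketch of what that could look like (assuming the placeholders keep their leading `:` and an empty map value still means "no replacement available"):
    
    ```java
    private static String getRestEndpointPath(String url, Map<String, String> parameterMap)
    	throws UnsupportedOperationException {
    	String path = url;
    	for (Map.Entry<String, String> e : parameterMap.entrySet()) {
    		// Skip empty values so their placeholders stay in the path and are caught below.
    		if (!e.getValue().isEmpty()) {
    			path = path.replace(e.getKey(), e.getValue());
    		}
    	}
    	if (path.contains(":")) {
    		// A ':' left over means some path parameter was never replaced.
    		throw new UnsupportedOperationException("Unresolved path parameter in " + path);
    	}
    	return path;
    }
    ```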


---

[GitHub] flink issue #5849: [FLINK-8986][e2e-test][WIP] Flink end to end test REST AP...

Posted by walterddr <gi...@git.apache.org>.
Github user walterddr commented on the issue:

    https://github.com/apache/flink/pull/5849
  
    Thanks @tzulitai for the suggestion. I will close this and continue with the new PR in #6054.


---