Posted to issues@flink.apache.org by tillrohrmann <gi...@git.apache.org> on 2017/08/26 15:51:59 UTC

[GitHub] flink pull request #4598: [FLINK-7528] Create DispatcherRestEndpoint and int...

GitHub user tillrohrmann opened a pull request:

    https://github.com/apache/flink/pull/4598

    [FLINK-7528] Create DispatcherRestEndpoint and integrate with Dispatcher

    ## What is the purpose of the change
    
    This commit creates the DispatcherRestEndpoint and integrates it with the
    Dispatcher. The DispatcherRestEndpoint is created in the SessionClusterEntrypoint
    and its address is passed to the Dispatcher such that it can answer the
    requestRestAddress RPC.
    
    ## Brief change log
    
    - Create the `DispatcherRestEndpoint`
    - Start `DispatcherRestEndpoint` in `SessionClusterEntrypoint`
    - Pass in REST address to Dispatcher and JobManager
    - Resolve the redirection address by directly asking the leader component
    
    ## Verifying this change
    
    Verified manually that the `DispatcherRestEndpoint` is started and reachable (even though it does not serve anything yet).
    
    ## Does this pull request potentially affect one of the following parts:
    
      - Dependencies (does it add or upgrade a dependency): (no)
      - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
      - The serializers: (no)
      - The runtime per-record code paths (performance sensitive): (no)
      - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
    
    ## Documentation
    
      - Does this pull request introduce a new feature? (no)
      - If yes, how is the feature documented? (not applicable)
    


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/tillrohrmann/flink dispatcherRestEndpoint

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/4598.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #4598
    
----
commit 7835d0a89c8ecdd5b9661ee8c57a9d63a3ed3742
Author: Till Rohrmann <tr...@apache.org>
Date:   2017-08-14T13:47:04Z

    [FLINK-7522] Add termination future to ClusterEntrypoint
    
    The termination future is completed when the ClusterEntrypoint shuts down. This
    allows for easier testing.

commit 2cdf97f824bc62a82e65f4c160b9ad64de446de4
Author: Till Rohrmann <tr...@apache.org>
Date:   2017-08-16T12:36:13Z

    [FLINK-7457] Make Dispatcher highly available
    
    This commit introduces a dispatcher leader election and retrieval service to the
    HighAvailabilityServices. Moreover it adds code such that the Dispatcher now takes
    part in the leader election process using the aforementioned services.
    
    Let Dispatcher participate in leader election
    
    Add test for Dispatcher leader election

commit 04caf85d33ddfc3a4a9b788745b8282c3437d8e2
Author: Till Rohrmann <tr...@apache.org>
Date:   2017-08-10T08:56:12Z

    [FLINK-7409] [web] Make WebRuntimeMonitor reactive
    
    This commit changes the behaviour of the WebRuntimeMonitor to no longer block serving
    threads by waiting on the result of futures. Instead the RequestHandler now returns a
    CompletableFuture<FullHttpResponse> which is written out to the Netty channel upon
    completion. This will improve the performance of our WebRuntimeMonitor.

commit 4fa6dedd95555a2d1a91339ff5effda3bc2bd1d5
Author: Till Rohrmann <tr...@apache.org>
Date:   2017-08-15T10:00:58Z

    [FLINK-7458] Generalize GatewayRetriever for WebRuntimeMonitor
    
    Introduce a generalized GatewayRetriever replacing the JobManagerRetriever. The
    GatewayRetriever fulfills the same purpose as the JobManagerRetriever with the
    ability to retrieve the gateway for an arbitrary endpoint type.

commit 0f9b2ce77e20f25fc95ddeba98f863b86450a72c
Author: Till Rohrmann <tr...@apache.org>
Date:   2017-08-15T11:55:47Z

    [FLINK-7459] Generalize Flink's redirection logic
    
    Introduce RedirectHandler which can be extended to add redirection functionality to all
    SimpleInboundChannelHandlers. This allows sharing the same functionality across the
    StaticFileServerHandler and the RuntimeMonitorHandlerBase which could now be removed.
    In the future, the AbstractRestHandler will also extend the RedirectHandler.

commit 88aed4f7a198b3994271088b8e19558d399ddd9d
Author: Till Rohrmann <tr...@apache.org>
Date:   2017-08-17T13:04:19Z

    [FLINK-7527] [rest] Let AbstractRestHandler extend RedirectHandler
    
    By letting the AbstractRestHandler extend the RedirectHandler, we add redirection
    capabilities to the AbstractRestHandler.

commit 9b7de1dc21b771fc10ee1661f34c142e990b424f
Author: Till Rohrmann <tr...@apache.org>
Date:   2017-08-17T13:25:33Z

    [FLINK-7528] Create DispatcherRestEndpoint and integrate with Dispatcher
    
    This commit creates the DispatcherRestEndpoint and integrates it with the
    Dispatcher. The DispatcherRestEndpoint is created in the SessionClusterEntrypoint
    and its address is passed to the Dispatcher such that it can answer the
    requestRestAddress RPC.

----


---

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/4598


---

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4598#discussion_r139623946
  
    --- Diff: flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java ---
    @@ -153,68 +153,62 @@ public void testTaskManagerFailure() {
     
     		// ------------------------ Test if JobManager web interface is accessible -------
     
    -		YarnClient yc = null;
    -		try {
    -			yc = YarnClient.createYarnClient();
    -			yc.init(YARN_CONFIGURATION);
    -			yc.start();
    +		final YarnClient yc = YarnClient.createYarnClient();
    +		yc.init(YARN_CONFIGURATION);
    +		yc.start();
     
    -			List<ApplicationReport> apps = yc.getApplications(EnumSet.of(YarnApplicationState.RUNNING));
    --- End diff --
    
    These changes are not relevant to the issue at hand.


---

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4598#discussion_r139622270
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java ---
    @@ -147,40 +170,65 @@ public InetSocketAddress getServerAddress() {
     	}
     
     	/**
    +	 * Returns the address of the REST server endpoint. Since the address is only known
    +	 * after the endpoint is started, it is returned as a future which is completed
    +	 * with the REST address at start up.
    +	 *
    +	 * @return REST address of this endpoint
    +	 */
    +	public String getRestAddress() {
    --- End diff --
    
    What is the relation of this method to `getServerAddress()`?


---

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4598#discussion_r139680028
  
    --- Diff: flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/utils/WebFrontendBootstrap.java ---
    @@ -110,10 +114,21 @@ protected void initChannel(SocketChannel ch) {
     		this.serverChannel = ch.sync().channel();
     
     		InetSocketAddress bindAddress = (InetSocketAddress) serverChannel.localAddress();
    -		String address = bindAddress.getAddress().getHostAddress();
    +
    +		InetAddress inetAddress = bindAddress.getAddress();
    +		final String address;
    +
    +		if (inetAddress.isAnyLocalAddress()) {
    --- End diff --
    
    This is necessary if the server binds to the wildcard address `0.0.0.0`. In that case the bound address is not one a client can connect to, so we have to return the configured address instead.
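
The case distinction can be illustrated with a minimal, standalone sketch. It is not the code from `WebFrontendBootstrap`; the class name `AdvertisedAddressSketch`, the method `resolveHost`, and the `configuredAddress` parameter are hypothetical and only show the idea of falling back to a configured host when the socket is bound to the wildcard address.

```java
import java.net.InetAddress;
import java.net.InetSocketAddress;

// Hypothetical helper, not part of Flink: picks the host that clients should use.
final class AdvertisedAddressSketch {

    static String resolveHost(InetSocketAddress bindAddress, String configuredAddress) {
        InetAddress inetAddress = bindAddress.getAddress();

        if (inetAddress == null) {
            // Unresolved socket address: only the host string is available.
            return bindAddress.getHostString();
        }

        if (inetAddress.isAnyLocalAddress()) {
            // Bound to 0.0.0.0 (or ::): the wildcard is not connectable,
            // so advertise the configured address instead (assumption of this sketch).
            return configuredAddress;
        }

        // Bound to a concrete interface: advertise that address.
        return inetAddress.getHostAddress();
    }

    public static void main(String[] args) {
        // new InetSocketAddress(port) binds to the wildcard address.
        System.out.println(resolveHost(new InetSocketAddress(8081), "jobmanager.example.org"));
        System.out.println(resolveHost(new InetSocketAddress("127.0.0.1", 8081), "jobmanager.example.org"));
    }
}
```

The sketch keeps the concrete address whenever one is available and only substitutes the configured value for the wildcard case.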


---

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4598#discussion_r139624279
  
    --- Diff: flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/utils/WebFrontendBootstrap.java ---
    @@ -110,10 +114,21 @@ protected void initChannel(SocketChannel ch) {
     		this.serverChannel = ch.sync().channel();
     
     		InetSocketAddress bindAddress = (InetSocketAddress) serverChannel.localAddress();
    -		String address = bindAddress.getAddress().getHostAddress();
    +
    +		InetAddress inetAddress = bindAddress.getAddress();
    +		final String address;
    +
    +		if (inetAddress.isAnyLocalAddress()) {
    --- End diff --
    
    What is this case distinction for?


---

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4598#discussion_r139621374
  
    --- Diff: flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/RedirectHandlerTest.java ---
    @@ -108,7 +106,7 @@ public void testRedirectHandler() throws Exception {
     
     			HttpTestClient.SimpleHttpResponse response = httpClient.getNextResponse(FutureUtils.toFiniteDuration(timeout));
     
    -			Assert.assertEquals(HttpResponseStatus.SERVICE_UNAVAILABLE, response.getStatus());
    +			Assert.assertEquals(HttpResponseStatus.INTERNAL_SERVER_ERROR, response.getStatus());
    --- End diff --
    
    The comment above is now outdated.


---

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4598#discussion_r139678620
  
    --- Diff: flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/HandlerRedirectUtilsTest.java ---
    @@ -39,40 +39,36 @@
      */
     public class HandlerRedirectUtilsTest extends TestLogger {
     
    -	private static final String localJobManagerAddress = "akka.tcp://flink@127.0.0.1:1234/user/foobar";
    -	private static final String remoteHostname = "127.0.0.2";
    -	private static final int webPort = 1235;
    -	private static final String remoteURL = remoteHostname + ':' + webPort;
    -	private static final String remotePath = "akka.tcp://flink@" + remoteURL + "/user/jobmanager";
    +	private static final String localRestAddress = "http://127.0.0.1:1234";
    +	private static final String remoteRestAddress = "http://127.0.0.2:1234";
     
     	@Test
    -	public void testGetRedirectAddressWithLocalAkkaPath() throws Exception {
    +	public void testGetRedirectAddressWithLocalEqualsRemoteRESTAddress() throws Exception {
     		JobManagerGateway jobManagerGateway = mock(JobManagerGateway.class);
    -		when(jobManagerGateway.getAddress()).thenReturn("akka://flink/user/foobar");
    +		when(jobManagerGateway.requestRestAddress(any(Time.class))).thenReturn(CompletableFuture.completedFuture(localRestAddress));
     
    -		Optional<CompletableFuture<String>> redirectingAddress = HandlerRedirectUtils.getRedirectAddress(
    -			localJobManagerAddress,
    +		CompletableFuture<Optional<String>> redirectingAddressFuture = HandlerRedirectUtils.getRedirectAddress(
    --- End diff --
    
    Not strictly, but neither did it fit perfectly into another PR. Given all this rebasing hassle, I would like to simply keep it here.


---

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4598#discussion_r139679505
  
    --- Diff: flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java ---
    @@ -153,68 +153,62 @@ public void testTaskManagerFailure() {
     
     		// ------------------------ Test if JobManager web interface is accessible -------
     
    -		YarnClient yc = null;
    -		try {
    -			yc = YarnClient.createYarnClient();
    -			yc.init(YARN_CONFIGURATION);
    -			yc.start();
    +		final YarnClient yc = YarnClient.createYarnClient();
    +		yc.init(YARN_CONFIGURATION);
    +		yc.start();
     
    -			List<ApplicationReport> apps = yc.getApplications(EnumSet.of(YarnApplicationState.RUNNING));
    --- End diff --
    
    True, but I came across it while testing the Dispatcher REST endpoint in the YARN case. I think there is some value in letting the test fail with the proper stack trace instead of only failing the test with the failure message.
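
The difference between the two failure styles can be sketched in plain Java, without JUnit or YARN. Everything here is hypothetical (`FailureReportingSketch`, `startClient`, and the exception message are made up for illustration); the removed test code is only partially visible in the diff, so this shows the general pattern rather than the original lines.

```java
// Hypothetical illustration, not taken from YARNSessionCapacitySchedulerITCase.
final class FailureReportingSketch {

    // Stand-in for a setup step that fails.
    static void startClient() throws Exception {
        throw new IllegalStateException("client could not be started");
    }

    // Message-only style: the original exception is caught and replaced,
    // so its stack trace and type are lost.
    static void messageOnly() {
        try {
            startClient();
        } catch (Exception e) {
            throw new AssertionError("Test failed: " + e.getMessage());
        }
    }

    // Propagating style: the original exception reaches the test runner
    // with its full stack trace intact.
    static void propagating() throws Exception {
        startClient();
    }

    public static void main(String[] args) {
        try {
            messageOnly();
        } catch (AssertionError e) {
            System.out.println("message only, cause = " + e.getCause());
        }
        try {
            propagating();
        } catch (Exception e) {
            System.out.println("propagated from " + e.getStackTrace()[0].getMethodName());
        }
    }
}
```

With the propagating style, the top stack frame still points at the method that actually failed, which is what makes the failure easier to diagnose.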


---

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4598#discussion_r139677693
  
    --- Diff: flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/RedirectHandlerTest.java ---
    @@ -108,7 +106,7 @@ public void testRedirectHandler() throws Exception {
     
     			HttpTestClient.SimpleHttpResponse response = httpClient.getNextResponse(FutureUtils.toFiniteDuration(timeout));
     
    -			Assert.assertEquals(HttpResponseStatus.SERVICE_UNAVAILABLE, response.getStatus());
    +			Assert.assertEquals(HttpResponseStatus.INTERNAL_SERVER_ERROR, response.getStatus());
    --- End diff --
    
    True, will fix it.


---

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4598#discussion_r139621841
  
    --- Diff: flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/HandlerRedirectUtilsTest.java ---
    @@ -39,40 +39,36 @@
      */
     public class HandlerRedirectUtilsTest extends TestLogger {
     
    -	private static final String localJobManagerAddress = "akka.tcp://flink@127.0.0.1:1234/user/foobar";
    -	private static final String remoteHostname = "127.0.0.2";
    -	private static final int webPort = 1235;
    -	private static final String remoteURL = remoteHostname + ':' + webPort;
    -	private static final String remotePath = "akka.tcp://flink@" + remoteURL + "/user/jobmanager";
    +	private static final String localRestAddress = "http://127.0.0.1:1234";
    +	private static final String remoteRestAddress = "http://127.0.0.2:1234";
     
     	@Test
    -	public void testGetRedirectAddressWithLocalAkkaPath() throws Exception {
    +	public void testGetRedirectAddressWithLocalEqualsRemoteRESTAddress() throws Exception {
     		JobManagerGateway jobManagerGateway = mock(JobManagerGateway.class);
    -		when(jobManagerGateway.getAddress()).thenReturn("akka://flink/user/foobar");
    +		when(jobManagerGateway.requestRestAddress(any(Time.class))).thenReturn(CompletableFuture.completedFuture(localRestAddress));
     
    -		Optional<CompletableFuture<String>> redirectingAddress = HandlerRedirectUtils.getRedirectAddress(
    -			localJobManagerAddress,
    +		CompletableFuture<Optional<String>> redirectingAddressFuture = HandlerRedirectUtils.getRedirectAddress(
    --- End diff --
    
    Does this actually belong in this commit?


---

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4598#discussion_r139678977
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java ---
    @@ -147,40 +170,65 @@ public InetSocketAddress getServerAddress() {
     	}
     
     	/**
    +	 * Returns the address of the REST server endpoint. Since the address is only known
    +	 * after the endpoint is started, it is returned as a future which is completed
    +	 * with the REST address at start up.
    +	 *
    +	 * @return REST address of this endpoint
    +	 */
    +	public String getRestAddress() {
    --- End diff --
    
    This will return the fully qualified URL including the protocol, whereas `getServerAddress` only returns the address to which the server is bound.
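
As a rough sketch of that relationship, a REST address can be thought of as the bound socket address plus a protocol prefix. The class `RestAddressSketch`, the method `toRestAddress`, and the `protocol` parameter are hypothetical and not the `RestServerEndpoint` implementation; the sketch also assumes an IPv4 address or host name, since an IPv6 literal would need brackets.

```java
import java.net.InetSocketAddress;

// Hypothetical helper, not part of Flink: derives a URL-style address
// from a bound socket address.
final class RestAddressSketch {

    static String toRestAddress(String protocol, InetSocketAddress serverAddress) {
        // getHostString() avoids a reverse DNS lookup and returns the
        // host name or address literal the socket address was created with.
        return protocol + "://" + serverAddress.getHostString() + ':' + serverAddress.getPort();
    }

    public static void main(String[] args) {
        InetSocketAddress serverAddress = new InetSocketAddress("127.0.0.1", 1234);
        // Server address: host and port only. REST address: protocol included.
        System.out.println(serverAddress.getHostString() + ':' + serverAddress.getPort());
        System.out.println(toRestAddress("http", serverAddress));
    }
}
```

The resulting string has the same shape as the `localRestAddress` constant used in `HandlerRedirectUtilsTest`.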


---