Posted to issues@flink.apache.org by tillrohrmann <gi...@git.apache.org> on 2017/08/26 15:51:59 UTC
[GitHub] flink pull request #4598: [FLINK-7528] Create DispatcherRestEndpoint and int...
GitHub user tillrohrmann opened a pull request:
https://github.com/apache/flink/pull/4598
[FLINK-7528] Create DispatcherRestEndpoint and integrate with Dispatcher
## What is the purpose of the change
This commit creates the DispatcherRestEndpoint and integrates it with the
Dispatcher. The DispatcherRestEndpoint is created in the SessionClusterEntrypoint
and its address is passed to the Dispatcher such that it can answer the
requestRestAddress RPC.
## Brief change log
- Create the `DispatcherRestEndpoint`
- Start `DispatcherRestEndpoint` in `SessionClusterEntrypoint`
- Pass in REST address to Dispatcher and JobManager
- Resolve the redirection address by directly asking the leader component
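
The last item in the change log can be illustrated with a minimal sketch. It is not the code from this pull request: `RedirectAddressSketch` and `RestAddressProvider` are hypothetical names, and the timeout argument that the real `requestRestAddress` RPC takes is left out. It only assumes the behaviour suggested by the test names in the review below, namely that no redirect is needed when the leader's REST address equals the local one.

```java
import java.util.Optional;
import java.util.concurrent.CompletableFuture;

public class RedirectAddressSketch {

    /** Hypothetical stand-in for a leader gateway that can report its REST address. */
    public interface RestAddressProvider {
        CompletableFuture<String> requestRestAddress();
    }

    /**
     * Asks the leader for its REST address. The returned future completes with an
     * empty Optional if that address equals the local one (no redirect needed),
     * and with the leader's address otherwise.
     */
    public static CompletableFuture<Optional<String>> getRedirectAddress(
            String localRestAddress, RestAddressProvider leader) {
        return leader.requestRestAddress().thenApply(
            leaderAddress -> leaderAddress.equals(localRestAddress)
                ? Optional.<String>empty()
                : Optional.of(leaderAddress));
    }

    public static void main(String[] args) {
        RestAddressProvider leader =
            () -> CompletableFuture.completedFuture("http://127.0.0.2:1234");

        // Local endpoint is not the leader: a redirect address is returned.
        System.out.println(getRedirectAddress("http://127.0.0.1:1234", leader).join());
        // Local endpoint is the leader: the Optional is empty.
        System.out.println(getRedirectAddress("http://127.0.0.2:1234", leader).join());
    }
}
```

Returning a `CompletableFuture<Optional<String>>` keeps the lookup asynchronous, so the caller can decide about the redirect once the leader has answered, without blocking.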
## Verifying this change
Verified manually that the `DispatcherRestEndpoint` is started and reachable (even though it does not serve anything yet).
## Does this pull request potentially affect one of the following parts:
- Dependencies (does it add or upgrade a dependency): (no)
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
- The serializers: (no)
- The runtime per-record code paths (performance sensitive): (no)
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
## Documentation
- Does this pull request introduce a new feature? (no)
- If yes, how is the feature documented? (not applicable)
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/tillrohrmann/flink dispatcherRestEndpoint
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/flink/pull/4598.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #4598
----
commit 7835d0a89c8ecdd5b9661ee8c57a9d63a3ed3742
Author: Till Rohrmann <tr...@apache.org>
Date: 2017-08-14T13:47:04Z
[FLINK-7522] Add termination future to ClusterEntrypoint
The termination future is completed when the ClusterEntrypoint shuts down. This
allows for easier testing.
commit 2cdf97f824bc62a82e65f4c160b9ad64de446de4
Author: Till Rohrmann <tr...@apache.org>
Date: 2017-08-16T12:36:13Z
[FLINK-7457] Make Dispatcher highly available
This commit introduces a dispatcher leader election and retrieval service to the
HighAvailabilityServices. Moreover it adds code such that the Dispatcher now takes
part in the leader election process using the aforementioned services.
Let Dispatcher participate in leader election
Add test for Dispatcher leader election
commit 04caf85d33ddfc3a4a9b788745b8282c3437d8e2
Author: Till Rohrmann <tr...@apache.org>
Date: 2017-08-10T08:56:12Z
[FLINK-7409] [web] Make WebRuntimeMonitor reactive
This commit changes the behaviour of the WebRuntimeMonitor to no longer block serving
threads by waiting on the result of futures. Instead the RequestHandler now returns a
CompletableFuture<FullHttpResponse> which is written out to the Netty channel upon
completion. This will improve the performance of our WebRuntimeMonitor.
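
The non-blocking pattern this commit message describes can be sketched roughly as follows. This is plain Java without Netty and not the actual WebRuntimeMonitor code: `AsyncRequestHandler`, `serve`, and the `channelWriter` callback are hypothetical names, and a `String` stands in for `FullHttpResponse`.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

public class ReactiveHandlerSketch {

    /** Hypothetical handler: returns the response as a future instead of blocking. */
    public interface AsyncRequestHandler {
        CompletableFuture<String> handleRequest(String path);
    }

    /**
     * Registers a completion callback on the handler's future. The calling
     * (serving) thread returns immediately and never waits on the result.
     */
    public static void serve(
            AsyncRequestHandler handler, String path, Consumer<String> channelWriter) {
        handler.handleRequest(path).whenComplete((response, failure) -> {
            if (failure != null) {
                channelWriter.accept("500 " + failure.getMessage());
            } else {
                channelWriter.accept("200 " + response);
            }
        });
    }

    public static void main(String[] args) {
        CompletableFuture<String> pending = new CompletableFuture<>();
        serve(path -> pending, "/overview", System.out::println);
        // Nothing has been written yet; the response is written on completion.
        pending.complete("{\"status\":\"ok\"}");
    }
}
```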
commit 4fa6dedd95555a2d1a91339ff5effda3bc2bd1d5
Author: Till Rohrmann <tr...@apache.org>
Date: 2017-08-15T10:00:58Z
[FLINK-7458] Generalize GatewayRetriever for WebRuntimeMonitor
Introduce a generalized GatewayRetriever replacing the JobManagerRetriever. The
GatewayRetriever fulfills the same purpose as the JobManagerRetriever with the
ability to retrieve the gateway for an arbitrary endpoint type.
commit 0f9b2ce77e20f25fc95ddeba98f863b86450a72c
Author: Till Rohrmann <tr...@apache.org>
Date: 2017-08-15T11:55:47Z
[FLINK-7459] Generalize Flink's redirection logic
Introduce RedirectHandler which can be extended to add redirection functionality to all
SimpleInboundChannelHandlers. This allows sharing the same functionality across the
StaticFileServerHandler and the RuntimeMonitorHandlerBase which could now be removed.
In the future, the AbstractRestHandler will also extend the RedirectHandler.
commit 88aed4f7a198b3994271088b8e19558d399ddd9d
Author: Till Rohrmann <tr...@apache.org>
Date: 2017-08-17T13:04:19Z
[FLINK-7527] [rest] Let AbstractRestHandler extend RedirectHandler
By letting the AbstractRestHandler extend the RedirectHandler, we add redirection
capabilities to the AbstractRestHandler.
commit 9b7de1dc21b771fc10ee1661f34c142e990b424f
Author: Till Rohrmann <tr...@apache.org>
Date: 2017-08-17T13:25:33Z
[FLINK-7528] Create DispatcherRestEndpoint and integrate with Dispatcher
This commit creates the DispatcherRestEndpoint and integrates it with the
Dispatcher. The DispatcherRestEndpoint is created in the SessionClusterEntrypoint
and its address is passed to the Dispatcher such that it can answer the
requestRestAddress RPC.
----
---
[GitHub] flink pull request #4598: [FLINK-7528] Create DispatcherRestEndpoint and int...
Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:
https://github.com/apache/flink/pull/4598
---
[GitHub] flink pull request #4598: [FLINK-7528] Create DispatcherRestEndpoint and int...
Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:
https://github.com/apache/flink/pull/4598#discussion_r139623946
--- Diff: flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java ---
@@ -153,68 +153,62 @@ public void testTaskManagerFailure() {
// ------------------------ Test if JobManager web interface is accessible -------
- YarnClient yc = null;
- try {
- yc = YarnClient.createYarnClient();
- yc.init(YARN_CONFIGURATION);
- yc.start();
+ final YarnClient yc = YarnClient.createYarnClient();
+ yc.init(YARN_CONFIGURATION);
+ yc.start();
- List<ApplicationReport> apps = yc.getApplications(EnumSet.of(YarnApplicationState.RUNNING));
--- End diff --
These changes are not relevant to the issue at hand.
---
[GitHub] flink pull request #4598: [FLINK-7528] Create DispatcherRestEndpoint and int...
Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:
https://github.com/apache/flink/pull/4598#discussion_r139622270
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java ---
@@ -147,40 +170,65 @@ public InetSocketAddress getServerAddress() {
}
/**
+ * Returns the address of the REST server endpoint. Since the address is only known
+ * after the endpoint is started, it is returned as a future which is completed
+ * with the REST address at start up.
+ *
+ * @return REST address of this endpoint
+ */
+ public String getRestAddress() {
--- End diff --
What is the relation of this method to `getServerAddress()`?
---
[GitHub] flink pull request #4598: [FLINK-7528] Create DispatcherRestEndpoint and int...
Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/4598#discussion_r139680028
--- Diff: flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/utils/WebFrontendBootstrap.java ---
@@ -110,10 +114,21 @@ protected void initChannel(SocketChannel ch) {
this.serverChannel = ch.sync().channel();
InetSocketAddress bindAddress = (InetSocketAddress) serverChannel.localAddress();
- String address = bindAddress.getAddress().getHostAddress();
+
+ InetAddress inetAddress = bindAddress.getAddress();
+ final String address;
+
+ if (inetAddress.isAnyLocalAddress()) {
--- End diff --
This is necessary if the server binds to `0.0.0.0`. In that case the bind address is the wildcard address, which clients cannot connect to, so we have to return the configured address instead.
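
As a rough illustration of this case distinction (a sketch, not the code in `WebFrontendBootstrap`; the class name, method name, and `configuredAddress` parameter are assumptions made for the example):

```java
import java.net.InetAddress;
import java.net.InetSocketAddress;

public class AdvertisedAddressSketch {

    /**
     * Returns the address clients should use to reach the server. If the server
     * is bound to the wildcard address (0.0.0.0 or ::), the bind address is not
     * connectable, so the configured address is returned instead.
     */
    public static String resolveAddress(
            InetSocketAddress bindAddress, String configuredAddress) {
        InetAddress inetAddress = bindAddress.getAddress();
        if (inetAddress.isAnyLocalAddress()) {
            return configuredAddress;
        }
        return inetAddress.getHostAddress();
    }

    public static void main(String[] args) {
        // new InetSocketAddress(port) binds to the wildcard address.
        System.out.println(resolveAddress(new InetSocketAddress(8081), "flink-host"));
        // A concrete bind address is returned as is.
        System.out.println(resolveAddress(new InetSocketAddress("127.0.0.1", 8081), "flink-host"));
    }
}
```

The sketch assumes a resolved `InetSocketAddress`; for an unresolved one, `getAddress()` returns `null` and would need separate handling.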
---
[GitHub] flink pull request #4598: [FLINK-7528] Create DispatcherRestEndpoint and int...
Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:
https://github.com/apache/flink/pull/4598#discussion_r139624279
--- Diff: flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/utils/WebFrontendBootstrap.java ---
@@ -110,10 +114,21 @@ protected void initChannel(SocketChannel ch) {
this.serverChannel = ch.sync().channel();
InetSocketAddress bindAddress = (InetSocketAddress) serverChannel.localAddress();
- String address = bindAddress.getAddress().getHostAddress();
+
+ InetAddress inetAddress = bindAddress.getAddress();
+ final String address;
+
+ if (inetAddress.isAnyLocalAddress()) {
--- End diff --
What is this case distinction for?
---
[GitHub] flink pull request #4598: [FLINK-7528] Create DispatcherRestEndpoint and int...
Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:
https://github.com/apache/flink/pull/4598#discussion_r139621374
--- Diff: flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/RedirectHandlerTest.java ---
@@ -108,7 +106,7 @@ public void testRedirectHandler() throws Exception {
HttpTestClient.SimpleHttpResponse response = httpClient.getNextResponse(FutureUtils.toFiniteDuration(timeout));
- Assert.assertEquals(HttpResponseStatus.SERVICE_UNAVAILABLE, response.getStatus());
+ Assert.assertEquals(HttpResponseStatus.INTERNAL_SERVER_ERROR, response.getStatus());
--- End diff --
The comment above is now outdated.
---
[GitHub] flink pull request #4598: [FLINK-7528] Create DispatcherRestEndpoint and int...
Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/4598#discussion_r139678620
--- Diff: flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/HandlerRedirectUtilsTest.java ---
@@ -39,40 +39,36 @@
*/
public class HandlerRedirectUtilsTest extends TestLogger {
- private static final String localJobManagerAddress = "akka.tcp://flink@127.0.0.1:1234/user/foobar";
- private static final String remoteHostname = "127.0.0.2";
- private static final int webPort = 1235;
- private static final String remoteURL = remoteHostname + ':' + webPort;
- private static final String remotePath = "akka.tcp://flink@" + remoteURL + "/user/jobmanager";
+ private static final String localRestAddress = "http://127.0.0.1:1234";
+ private static final String remoteRestAddress = "http://127.0.0.2:1234";
@Test
- public void testGetRedirectAddressWithLocalAkkaPath() throws Exception {
+ public void testGetRedirectAddressWithLocalEqualsRemoteRESTAddress() throws Exception {
JobManagerGateway jobManagerGateway = mock(JobManagerGateway.class);
- when(jobManagerGateway.getAddress()).thenReturn("akka://flink/user/foobar");
+ when(jobManagerGateway.requestRestAddress(any(Time.class))).thenReturn(CompletableFuture.completedFuture(localRestAddress));
- Optional<CompletableFuture<String>> redirectingAddress = HandlerRedirectUtils.getRedirectAddress(
- localJobManagerAddress,
+ CompletableFuture<Optional<String>> redirectingAddressFuture = HandlerRedirectUtils.getRedirectAddress(
--- End diff --
Not strictly, but neither did it fit perfectly into another PR. Given all this rebasing hassle, I would like to simply keep it here.
---
[GitHub] flink pull request #4598: [FLINK-7528] Create DispatcherRestEndpoint and int...
Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/4598#discussion_r139679505
--- Diff: flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java ---
@@ -153,68 +153,62 @@ public void testTaskManagerFailure() {
// ------------------------ Test if JobManager web interface is accessible -------
- YarnClient yc = null;
- try {
- yc = YarnClient.createYarnClient();
- yc.init(YARN_CONFIGURATION);
- yc.start();
+ final YarnClient yc = YarnClient.createYarnClient();
+ yc.init(YARN_CONFIGURATION);
+ yc.start();
- List<ApplicationReport> apps = yc.getApplications(EnumSet.of(YarnApplicationState.RUNNING));
--- End diff --
True, but I came across this while testing the Dispatcher REST endpoint in the YARN case. I think there is some value in letting the test fail with the proper stack trace instead of only failing the test with the failure message.
---
[GitHub] flink pull request #4598: [FLINK-7528] Create DispatcherRestEndpoint and int...
Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/4598#discussion_r139677693
--- Diff: flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/RedirectHandlerTest.java ---
@@ -108,7 +106,7 @@ public void testRedirectHandler() throws Exception {
HttpTestClient.SimpleHttpResponse response = httpClient.getNextResponse(FutureUtils.toFiniteDuration(timeout));
- Assert.assertEquals(HttpResponseStatus.SERVICE_UNAVAILABLE, response.getStatus());
+ Assert.assertEquals(HttpResponseStatus.INTERNAL_SERVER_ERROR, response.getStatus());
--- End diff --
True, will fix it.
---
[GitHub] flink pull request #4598: [FLINK-7528] Create DispatcherRestEndpoint and int...
Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:
https://github.com/apache/flink/pull/4598#discussion_r139621841
--- Diff: flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/HandlerRedirectUtilsTest.java ---
@@ -39,40 +39,36 @@
*/
public class HandlerRedirectUtilsTest extends TestLogger {
- private static final String localJobManagerAddress = "akka.tcp://flink@127.0.0.1:1234/user/foobar";
- private static final String remoteHostname = "127.0.0.2";
- private static final int webPort = 1235;
- private static final String remoteURL = remoteHostname + ':' + webPort;
- private static final String remotePath = "akka.tcp://flink@" + remoteURL + "/user/jobmanager";
+ private static final String localRestAddress = "http://127.0.0.1:1234";
+ private static final String remoteRestAddress = "http://127.0.0.2:1234";
@Test
- public void testGetRedirectAddressWithLocalAkkaPath() throws Exception {
+ public void testGetRedirectAddressWithLocalEqualsRemoteRESTAddress() throws Exception {
JobManagerGateway jobManagerGateway = mock(JobManagerGateway.class);
- when(jobManagerGateway.getAddress()).thenReturn("akka://flink/user/foobar");
+ when(jobManagerGateway.requestRestAddress(any(Time.class))).thenReturn(CompletableFuture.completedFuture(localRestAddress));
- Optional<CompletableFuture<String>> redirectingAddress = HandlerRedirectUtils.getRedirectAddress(
- localJobManagerAddress,
+ CompletableFuture<Optional<String>> redirectingAddressFuture = HandlerRedirectUtils.getRedirectAddress(
--- End diff --
Does this actually belong in this commit?
---
[GitHub] flink pull request #4598: [FLINK-7528] Create DispatcherRestEndpoint and int...
Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/4598#discussion_r139678977
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java ---
@@ -147,40 +170,65 @@ public InetSocketAddress getServerAddress() {
}
/**
+ * Returns the address of the REST server endpoint. Since the address is only known
+ * after the endpoint is started, it is returned as a future which is completed
+ * with the REST address at start up.
+ *
+ * @return REST address of this endpoint
+ */
+ public String getRestAddress() {
--- End diff --
This will return the fully qualified URL including the protocol, whereas `getServerAddress` only returns the address to which the server is bound.
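
To make the difference concrete, here is a minimal sketch of how a fully qualified REST address could be derived from a bound socket address. It is not the implementation in `RestServerEndpoint`: `toRestAddress` and the `sslEnabled` flag are hypothetical, and IPv6 literals (which need brackets in a URL) are not handled.

```java
import java.net.InetSocketAddress;

public class RestAddressSketch {

    /**
     * Builds a URL of the form protocol://host:port from the socket address
     * the server is bound to.
     */
    public static String toRestAddress(InetSocketAddress serverAddress, boolean sslEnabled) {
        String protocol = sslEnabled ? "https" : "http";
        String host = serverAddress.getAddress().getHostAddress();
        return protocol + "://" + host + ':' + serverAddress.getPort();
    }

    public static void main(String[] args) {
        InetSocketAddress serverAddress = new InetSocketAddress("127.0.0.1", 8081);
        // The socket address carries only host and port ...
        System.out.println(serverAddress.getAddress().getHostAddress() + " / " + serverAddress.getPort());
        // ... while the REST address also includes the protocol.
        System.out.println(toRestAddress(serverAddress, false));
    }
}
```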
---