Posted to issues@flink.apache.org by tillrohrmann <gi...@git.apache.org> on 2017/08/02 12:45:06 UTC

[GitHub] flink pull request #4462: [FLINK-7334] [futures] Replace Flink's futures wit...

GitHub user tillrohrmann opened a pull request:

    https://github.com/apache/flink/pull/4462

    [FLINK-7334] [futures] Replace Flink's futures with Java 8's CompletableFuture in RpcEndpoint, RpcGateways and RpcService

    ## What is the purpose of the change
    
    This PR replaces Flink's futures with Java 8's `CompletableFuture` in `RpcEndpoint`, `RpcGateways` and `RpcService`. It also removes the remaining usages of Flink futures in tests.
    
    ## Brief change log
    
    - Change `RpcCompletenessTest` to only accept `CompletableFuture` as the return type
    - Change signature of `RpcGateways`
    - Adapt all `RpcEndpoint` implementations
    - Remove Flink Future usage from remaining tests
    
    This change is a trivial rework / code cleanup without any test coverage.
    
    ## Does this pull request potentially affect one of the following parts:
    
      - Dependencies (does it add or upgrade a dependency): (no)
      - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
      - The serializers: (no)
      - The runtime per-record code paths (performance sensitive): (no)
      - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes)
    
    ## Documentation
    
      - Does this pull request introduce a new feature? (no)
    


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/tillrohrmann/flink rfRpcGateway

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/4462.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #4462
    
----
commit 0e497367d8666a580d033848977b0639b037e554
Author: Till Rohrmann <tr...@apache.org>
Date:   2017-08-01T09:33:48Z

    [FLINK-7334] [futures] Replace Flink's futures with Java 8's CompletableFuture in RpcEndpoint, RpcGateways and RpcService
    
    Remove Futures from RpcGateways
    
    Remove Future usage

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #4462: [FLINK-7334] [futures] Replace Flink's futures wit...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4462#discussion_r130873991
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingSerialRpcService.java ---
    @@ -178,20 +178,19 @@ public int getPort() {
     	}
     
     	@Override
    -	public <C extends RpcGateway> Future<C> connect(String address, Class<C> clazz) {
    +	public <C extends RpcGateway> CompletableFuture<C> connect(String address, Class<C> clazz) {
     		RpcGateway gateway = registeredConnections.get(address);
     
     		if (gateway != null) {
     			if (clazz.isAssignableFrom(gateway.getClass())) {
     				@SuppressWarnings("unchecked")
     				C typedGateway = (C) gateway;
    -				return FlinkCompletableFuture.completed(typedGateway);
    +				return CompletableFuture.completedFuture(typedGateway);
     			} else {
    -				return FlinkCompletableFuture.completedExceptionally(
    -					new Exception("Gateway registered under " + address + " is not of type " + clazz));
    +				return FutureUtils.completedExceptionally(new Exception("Gateway registered under " + address + " is not of type " + clazz));
     			}
     		} else {
    -			return FlinkCompletableFuture.completedExceptionally(new Exception("No gateway registered under that name"));
    +			return FutureUtils.completedExceptionally(new Exception("No gateway registered under that name"));
    --- End diff --
    
    same as above



[GitHub] flink pull request #4462: [FLINK-7334] [futures] Replace Flink's futures wit...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4462#discussion_r130930727
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingSerialRpcService.java ---
    @@ -178,20 +178,19 @@ public int getPort() {
     	}
     
     	@Override
    -	public <C extends RpcGateway> Future<C> connect(String address, Class<C> clazz) {
    +	public <C extends RpcGateway> CompletableFuture<C> connect(String address, Class<C> clazz) {
     		RpcGateway gateway = registeredConnections.get(address);
     
     		if (gateway != null) {
     			if (clazz.isAssignableFrom(gateway.getClass())) {
     				@SuppressWarnings("unchecked")
     				C typedGateway = (C) gateway;
    -				return FlinkCompletableFuture.completed(typedGateway);
    +				return CompletableFuture.completedFuture(typedGateway);
     			} else {
    -				return FlinkCompletableFuture.completedExceptionally(
    -					new Exception("Gateway registered under " + address + " is not of type " + clazz));
    +				return FutureUtils.completedExceptionally(new Exception("Gateway registered under " + address + " is not of type " + clazz));
     			}
     		} else {
    -			return FlinkCompletableFuture.completedExceptionally(new Exception("No gateway registered under that name"));
    +			return FutureUtils.completedExceptionally(new Exception("No gateway registered under that name"));
    --- End diff --
    
    will add it.



[GitHub] flink pull request #4462: [FLINK-7334] [futures] Replace Flink's futures wit...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4462#discussion_r130868023
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java ---
    @@ -755,48 +751,42 @@ public void failSlot(final ResourceID taskManagerId,
     		if (registeredTaskManagers.containsKey(taskManagerId)) {
     			final RegistrationResponse response = new JMTMRegistrationSuccess(
     				resourceId, libraryCacheManager.getBlobServerPort());
    -			return FlinkCompletableFuture.completed(response);
    +			return CompletableFuture.completedFuture(response);
     		} else {
    -			return getRpcService().execute(new Callable<TaskExecutorGateway>() {
    -				@Override
    -				public TaskExecutorGateway call() throws Exception {
    -					return getRpcService().connect(taskManagerRpcAddress, TaskExecutorGateway.class)
    -							.get(rpcTimeout.getSize(), rpcTimeout.getUnit());
    -				}
    -			}).handleAsync(new BiFunction<TaskExecutorGateway, Throwable, RegistrationResponse>() {
    -				@Override
    -				public RegistrationResponse apply(final TaskExecutorGateway taskExecutorGateway, Throwable throwable) {
    -					if (throwable != null) {
    -						return new RegistrationResponse.Decline(throwable.getMessage());
    -					}
    -
    -					if (!JobMaster.this.leaderSessionID.equals(leaderId)) {
    -						log.warn("Discard registration from TaskExecutor {} at ({}) because the expected " +
    -										"leader session ID {} did not equal the received leader session ID {}.",
    -								taskManagerId, taskManagerRpcAddress,
    -								JobMaster.this.leaderSessionID, leaderId);
    -						return new RegistrationResponse.Decline("Invalid leader session id");
    -					}
    -
    -					slotPoolGateway.registerTaskManager(taskManagerId);
    -					registeredTaskManagers.put(taskManagerId, Tuple2.of(taskManagerLocation, taskExecutorGateway));
    -
    -					// monitor the task manager as heartbeat target
    -					taskManagerHeartbeatManager.monitorTarget(taskManagerId, new HeartbeatTarget<Void>() {
    -						@Override
    -						public void receiveHeartbeat(ResourceID resourceID, Void payload) {
    -							// the task manager will not request heartbeat, so this method will never be called currently
    +			return getRpcService()
    +				.connect(taskManagerRpcAddress, TaskExecutorGateway.class)
    --- End diff --
    
    why is this no longer executed by the RpcService?



[GitHub] flink pull request #4462: [FLINK-7334] [futures] Replace Flink's futures wit...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4462#discussion_r130929937
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java ---
    @@ -755,48 +751,42 @@ public void failSlot(final ResourceID taskManagerId,
     		if (registeredTaskManagers.containsKey(taskManagerId)) {
     			final RegistrationResponse response = new JMTMRegistrationSuccess(
     				resourceId, libraryCacheManager.getBlobServerPort());
    -			return FlinkCompletableFuture.completed(response);
    +			return CompletableFuture.completedFuture(response);
     		} else {
    -			return getRpcService().execute(new Callable<TaskExecutorGateway>() {
    -				@Override
    -				public TaskExecutorGateway call() throws Exception {
    -					return getRpcService().connect(taskManagerRpcAddress, TaskExecutorGateway.class)
    -							.get(rpcTimeout.getSize(), rpcTimeout.getUnit());
    -				}
    -			}).handleAsync(new BiFunction<TaskExecutorGateway, Throwable, RegistrationResponse>() {
    -				@Override
    -				public RegistrationResponse apply(final TaskExecutorGateway taskExecutorGateway, Throwable throwable) {
    -					if (throwable != null) {
    -						return new RegistrationResponse.Decline(throwable.getMessage());
    -					}
    -
    -					if (!JobMaster.this.leaderSessionID.equals(leaderId)) {
    -						log.warn("Discard registration from TaskExecutor {} at ({}) because the expected " +
    -										"leader session ID {} did not equal the received leader session ID {}.",
    -								taskManagerId, taskManagerRpcAddress,
    -								JobMaster.this.leaderSessionID, leaderId);
    -						return new RegistrationResponse.Decline("Invalid leader session id");
    -					}
    -
    -					slotPoolGateway.registerTaskManager(taskManagerId);
    -					registeredTaskManagers.put(taskManagerId, Tuple2.of(taskManagerLocation, taskExecutorGateway));
    -
    -					// monitor the task manager as heartbeat target
    -					taskManagerHeartbeatManager.monitorTarget(taskManagerId, new HeartbeatTarget<Void>() {
    -						@Override
    -						public void receiveHeartbeat(ResourceID resourceID, Void payload) {
    -							// the task manager will not request heartbeat, so this method will never be called currently
    +			return getRpcService()
    +				.connect(taskManagerRpcAddress, TaskExecutorGateway.class)
    --- End diff --
    
    Because we were needlessly blocking a thread from the `RpcService`'s `Executor` by calling `get` on the future returned by `RpcService#connect`.
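
A minimal sketch of the difference described here, assuming nothing about the real `JobMaster` code: `NonBlockingConnectSketch`, `connect`, `toResponse`, and the string-based "gateway" are hypothetical stand-ins for `RpcService#connect` and the registration logic. The first method mirrors the old shape, where an executor thread waits in `get`; the second chains directly on the returned future, so no thread waits for it.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class NonBlockingConnectSketch {

    // Hypothetical stand-in for RpcService#connect: completes asynchronously.
    static CompletableFuture<String> connect(String address) {
        return CompletableFuture.supplyAsync(() -> "gateway@" + address);
    }

    // Turns the connect result (or its failure) into a response string.
    static String toResponse(String gateway, Throwable failure) {
        return failure != null ? "declined: " + failure.getMessage() : "registered " + gateway;
    }

    // Old shape: the executor thread sits in get() until connect completes.
    static CompletableFuture<String> registerBlocking(String address, ExecutorService executor) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                return connect(address).get(10, TimeUnit.SECONDS);
            } catch (Exception e) {
                throw new CompletionException(e);
            }
        }, executor).handleAsync(NonBlockingConnectSketch::toResponse, executor);
    }

    // New shape: chain on the connect future; the handler runs on the
    // executor only once the result is available.
    static CompletableFuture<String> registerNonBlocking(String address, ExecutorService executor) {
        return connect(address).handleAsync(NonBlockingConnectSketch::toResponse, executor);
    }

    public static void main(String[] args) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            System.out.println(registerBlocking("tm-1", executor).join());
            System.out.println(registerNonBlocking("tm-1", executor).join());
        } finally {
            executor.shutdown();
        }
    }
}
```

Both variants yield the same response in this sketch; the difference is only that the second one never parks an executor thread while the connection is being established.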



[GitHub] flink pull request #4462: [FLINK-7334] [futures] Replace Flink's futures wit...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4462#discussion_r130874525
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java ---
    @@ -168,7 +165,7 @@ public void testHeartbeatTimeoutWithJobManager() throws Exception {
     				eq(taskManagerLocation),
     				eq(jmLeaderId),
     				any(Time.class)
    -		)).thenReturn(FlinkCompletableFuture.<RegistrationResponse>completed(new JMTMRegistrationSuccess(jmResourceId, blobPort)));
    +		)).thenReturn(java.util.concurrent.CompletableFuture.completedFuture(new JMTMRegistrationSuccess(jmResourceId, blobPort)));
    --- End diff --
    
    This can be shortened. (multiple times in this file)
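
A small sketch of what the shortening could look like, assuming the remark refers to the fully qualified class name: with an import in place, the simple name suffices, and Java 8 target typing also makes the explicit `<RegistrationResponse>` type witness unnecessary. The nested `RegistrationResponse` and `Success` types and the `describe` method are hypothetical stand-ins, not Flink classes.

```java
import java.util.concurrent.CompletableFuture;

class ShortenedFutureSketch {

    // Hypothetical stand-ins for RegistrationResponse and JMTMRegistrationSuccess.
    interface RegistrationResponse {}

    static final class Success implements RegistrationResponse {}

    // Plays the role of an API that expects a future of the base type.
    static String describe(CompletableFuture<RegistrationResponse> future) {
        return future.join().getClass().getSimpleName();
    }

    // Verbose form: fully qualified name plus explicit type witness.
    static String verbose() {
        return describe(
            java.util.concurrent.CompletableFuture.<RegistrationResponse>completedFuture(new Success()));
    }

    // Shortened form: imported simple name, type argument inferred from the target type.
    static String shortened() {
        return describe(CompletableFuture.completedFuture(new Success()));
    }

    public static void main(String[] args) {
        System.out.println(verbose());
        System.out.println(shortened());
    }
}
```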



[GitHub] flink pull request #4462: [FLINK-7334] [futures] Replace Flink's futures wit...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4462#discussion_r130949194
  
    --- Diff: flink-runtime/src/test/resources/log4j-test.properties ---
    @@ -16,7 +16,7 @@
     # limitations under the License.
     ################################################################################
     
    -log4j.rootLogger=OFF, console
    --- End diff --
    
    needs to be reverted



[GitHub] flink pull request #4462: [FLINK-7334] [futures] Replace Flink's futures wit...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4462#discussion_r130930816
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java ---
    @@ -168,7 +165,7 @@ public void testHeartbeatTimeoutWithJobManager() throws Exception {
     				eq(taskManagerLocation),
     				eq(jmLeaderId),
     				any(Time.class)
    -		)).thenReturn(FlinkCompletableFuture.<RegistrationResponse>completed(new JMTMRegistrationSuccess(jmResourceId, blobPort)));
    +		)).thenReturn(java.util.concurrent.CompletableFuture.completedFuture(new JMTMRegistrationSuccess(jmResourceId, blobPort)));
    --- End diff --
    
    True. Will change it.



[GitHub] flink pull request #4462: [FLINK-7334] [futures] Replace Flink's futures wit...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4462#discussion_r130930461
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingRpcService.java ---
    @@ -92,20 +92,19 @@ public void registerGateway(String address, RpcGateway gateway) {
     	}
     
     	@Override
    -	public <C extends RpcGateway> Future<C> connect(String address, Class<C> clazz) {
    +	public <C extends RpcGateway> CompletableFuture<C> connect(String address, Class<C> clazz) {
     		RpcGateway gateway = registeredConnections.get(address);
     
     		if (gateway != null) {
     			if (clazz.isAssignableFrom(gateway.getClass())) {
     				@SuppressWarnings("unchecked")
     				C typedGateway = (C) gateway;
    -				return FlinkCompletableFuture.completed(typedGateway);
    +				return CompletableFuture.completedFuture(typedGateway);
     			} else {
    -				return FlinkCompletableFuture.completedExceptionally(
    -					new Exception("Gateway registered under " + address + " is not of type " + clazz));
    +				return FutureUtils.completedExceptionally(new Exception("Gateway registered under " + address + " is not of type " + clazz));
     			}
     		} else {
    -			return FlinkCompletableFuture.completedExceptionally(new Exception("No gateway registered under that name"));
    +			return FutureUtils.completedExceptionally(new Exception("No gateway registered under that name"));
    --- End diff --
    
    True. Will add it.



[GitHub] flink pull request #4462: [FLINK-7334] [futures] Replace Flink's futures wit...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/4462



[GitHub] flink issue #4462: [FLINK-7334] [futures] Replace Flink's futures with Java ...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on the issue:

    https://github.com/apache/flink/pull/4462
  
    Thanks for reviewing @zentol. I'll address your comments and then merge this PR.



[GitHub] flink pull request #4462: [FLINK-7334] [futures] Replace Flink's futures wit...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4462#discussion_r131072988
  
    --- Diff: flink-runtime/src/test/resources/log4j-test.properties ---
    @@ -16,7 +16,7 @@
     # limitations under the License.
     ################################################################################
     
    -log4j.rootLogger=OFF, console
    --- End diff --
    
    Thanks for catching this @zentol.



[GitHub] flink pull request #4462: [FLINK-7334] [futures] Replace Flink's futures wit...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4462#discussion_r130873887
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingRpcService.java ---
    @@ -92,20 +92,19 @@ public void registerGateway(String address, RpcGateway gateway) {
     	}
     
     	@Override
    -	public <C extends RpcGateway> Future<C> connect(String address, Class<C> clazz) {
    +	public <C extends RpcGateway> CompletableFuture<C> connect(String address, Class<C> clazz) {
     		RpcGateway gateway = registeredConnections.get(address);
     
     		if (gateway != null) {
     			if (clazz.isAssignableFrom(gateway.getClass())) {
     				@SuppressWarnings("unchecked")
     				C typedGateway = (C) gateway;
    -				return FlinkCompletableFuture.completed(typedGateway);
    +				return CompletableFuture.completedFuture(typedGateway);
     			} else {
    -				return FlinkCompletableFuture.completedExceptionally(
    -					new Exception("Gateway registered under " + address + " is not of type " + clazz));
    +				return FutureUtils.completedExceptionally(new Exception("Gateway registered under " + address + " is not of type " + clazz));
     			}
     		} else {
    -			return FlinkCompletableFuture.completedExceptionally(new Exception("No gateway registered under that name"));
    +			return FutureUtils.completedExceptionally(new Exception("No gateway registered under that name"));
    --- End diff --
    
    We could include the name in the exception.
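
A rough sketch of the suggestion, under stated assumptions: `GatewayLookupSketch` is a hypothetical, simplified stand-in for the testing RPC service (gateways are plain `Object`s here), and the local `completedExceptionally` helper only approximates what `FutureUtils.completedExceptionally` presumably does. Java 8 has no `CompletableFuture.failedFuture` (it arrived in Java 9), so the failed future is built by hand. The exact message wording is illustrative.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

class GatewayLookupSketch {

    private final Map<String, Object> registeredConnections = new ConcurrentHashMap<>();

    void registerGateway(String address, Object gateway) {
        registeredConnections.put(address, gateway);
    }

    // Hypothetical helper: returns a future that is already failed with the given cause.
    static <T> CompletableFuture<T> completedExceptionally(Throwable cause) {
        CompletableFuture<T> future = new CompletableFuture<>();
        future.completeExceptionally(cause);
        return future;
    }

    <C> CompletableFuture<C> connect(String address, Class<C> clazz) {
        Object gateway = registeredConnections.get(address);

        if (gateway == null) {
            // Include the looked-up address so the failure is self-explanatory.
            return completedExceptionally(
                new Exception("No gateway registered under " + address + '.'));
        }
        if (!clazz.isInstance(gateway)) {
            return completedExceptionally(
                new Exception("Gateway registered under " + address + " is not of type " + clazz));
        }
        return CompletableFuture.completedFuture(clazz.cast(gateway));
    }
}
```

With the address in the message, a failed lookup in a test log points directly at the missing registration instead of requiring a debugger to find out which name was requested.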



[GitHub] flink issue #4462: [FLINK-7334] [futures] Replace Flink's futures with Java ...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on the issue:

    https://github.com/apache/flink/pull/4462
  
    Thanks for your review @zentol. I've addressed your comments.

