Posted to issues@flink.apache.org by beyond1920 <gi...@git.apache.org> on 2016/09/01 08:59:07 UTC

[GitHub] flink pull request #2455: [FLINK-4547] [cluster management] Return same obje...

GitHub user beyond1920 opened a pull request:

    https://github.com/apache/flink/pull/2455

    [FLINK-4547] [cluster management] Return same object when call connect method in AkkaRpcService using same address and same rpc gateway class

    This pull request adds equals and hashCode methods to the AkkaInvocationHandler class.
    Currently, every call to the connect method of AkkaRpcService with the same address and the same RPC gateway class returns a distinct gateway object, and these objects differ under both equals and hashCode.
    It seems reasonable for the results to be equal (equals returns true and the hash codes match) when the same address and the same gateway class are used.
    
    The main changes are:
    1. Add equals and hashCode methods to the AkkaInvocationHandler class
    2. Add a test for the connect method in AkkaRpcService


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/alibaba/flink jira-4547

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/2455.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #2455
    
----
commit 8504f1075b4c4c51f37aab2b45b864f885dc76a3
Author: beyond1920 <be...@126.com>
Date:   2016-09-01T07:00:03Z

    add equals and hashcode in AkkaInvocationHandler

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink issue #2455: [FLINK-4547] [cluster management] when call connect metho...

Posted by beyond1920 <gi...@git.apache.org>.
Github user beyond1920 commented on the issue:

    https://github.com/apache/flink/pull/2455
  
    @tillrohrmann, I rebased the PR. Thanks.



[GitHub] flink issue #2455: [FLINK-4547] [cluster management] Return same object when...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on the issue:

    https://github.com/apache/flink/pull/2455
  
    I think the subject line of this PR is a little bit misleading. The `RpcService` won't return the same gateway object upon calling `connect` with the same parameters. But the returned gateways are equal with respect to `equals` and `hashCode`.
    
    Maybe the JIRA and the PR subject line should be corrected to better reflect the actual changes.
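
The distinction drawn above, equal gateways rather than the same gateway object, can be illustrated with a minimal sketch. `GatewayHandle` and this `connect` method are hypothetical stand-ins and are not part of Flink; the sketch only assumes that a gateway is identified by its address and gateway class.

```java
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;

class EqualNotSameSketch {

    /** Hypothetical handle, identified by address and gateway class (not a Flink type). */
    static final class GatewayHandle {
        private final String address;
        private final Class<?> gatewayClass;

        GatewayHandle(String address, Class<?> gatewayClass) {
            this.address = address;
            this.gatewayClass = gatewayClass;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (!(o instanceof GatewayHandle)) {
                return false;
            }
            GatewayHandle other = (GatewayHandle) o;
            return address.equals(other.address) && gatewayClass.equals(other.gatewayClass);
        }

        @Override
        public int hashCode() {
            return Objects.hash(address, gatewayClass);
        }
    }

    /** Mimics a connect call that builds a fresh object on every invocation. */
    static GatewayHandle connect(String address, Class<?> gatewayClass) {
        return new GatewayHandle(address, gatewayClass);
    }

    public static void main(String[] args) {
        GatewayHandle first = connect("akka://flink/user/rm", Runnable.class);
        GatewayHandle second = connect("akka://flink/user/rm", Runnable.class);

        System.out.println(first == second);      // identity comparison
        System.out.println(first.equals(second)); // value comparison

        // Hash-based collections treat the two handles as one entry.
        Set<GatewayHandle> handles = new HashSet<>();
        handles.add(first);
        handles.add(second);
        System.out.println(handles.size());
    }
}
```

Because no object is reused, nothing has to be cached or cleaned up; only the comparison semantics change.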



[GitHub] flink issue #2455: [FLINK-4547] [cluster management] when call connect metho...

Posted by beyond1920 <gi...@git.apache.org>.
Github user beyond1920 commented on the issue:

    https://github.com/apache/flink/pull/2455
  
    @tillrohrmann @StephanEwen, thanks for your review. I changed the code based on your comments. There are two changes:
    1. Changed the JIRA and PR subject lines to better reflect the actual changes.
    2. Modified the test case in AkkaRpcServiceTest that connects to an invalid address.



[GitHub] flink pull request #2455: [FLINK-4547] [cluster management] Return same obje...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2455#discussion_r77326124
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java ---
    @@ -189,7 +191,49 @@ public void stop() {
     		rpcEndpoint.tell(Processing.STOP, ActorRef.noSender());
     	}
     
    -	// ------------------------------------------------------------------------
    +	@Override
    +	public boolean equals(Object o) {
    +		if (this == o) {
    +			return true;
    +		}
    +
    +		if (o == null) {
    +			return false;
    +		}
    +
    +		if(Proxy.isProxyClass(o.getClass())) {
    +			return o.equals(this);
    +		}
    --- End diff --
    
    Why should the `AkkaInvocationHandler` be a proxy class?



[GitHub] flink issue #2455: [FLINK-4547] [cluster management] Return same object when...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on the issue:

    https://github.com/apache/flink/pull/2455
  
    In order to reuse the same gateway, one would need to cache them in the RPC service. When would they be cleaned up? There needs to be a good solution to that, otherwise it may create a slow but constant memory leak in the JobManager.
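
The concern above can be made concrete with a small sketch of a cache that never evicts. `NaiveGatewayCacheSketch` is hypothetical and does not reflect how Flink's RPC service is implemented; a plain `Object` stands in for a gateway.

```java
import java.util.HashMap;
import java.util.Map;

class NaiveGatewayCacheSketch {

    // Keyed by gateway class name and address; entries are never removed.
    private final Map<String, Object> cache = new HashMap<>();

    /** Returns the cached stand-in gateway for this key, creating it on first use. */
    Object connect(String address, Class<?> gatewayClass) {
        String key = gatewayClass.getName() + "@" + address;
        return cache.computeIfAbsent(key, k -> new Object());
    }

    int size() {
        return cache.size();
    }

    public static void main(String[] args) {
        NaiveGatewayCacheSketch service = new NaiveGatewayCacheSketch();

        // Reuse works: the same key yields the same object.
        Object first = service.connect("akka://flink/user/tm-0", Runnable.class);
        Object second = service.connect("akka://flink/user/tm-0", Runnable.class);
        System.out.println(first == second);

        // Every new address adds an entry that stays for the lifetime of the service.
        for (int i = 1; i < 1000; i++) {
            service.connect("akka://flink/user/tm-" + i, Runnable.class);
        }
        System.out.println(service.size());
    }
}
```

Without an eviction rule tied to the lifetime of the remote endpoint, the map only grows, which is the slow leak described above.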



[GitHub] flink pull request #2455: [FLINK-4547] [cluster management] Return same obje...

Posted by beyond1920 <gi...@git.apache.org>.
Github user beyond1920 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2455#discussion_r77753174
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java ---
    @@ -189,7 +191,49 @@ public void stop() {
     		rpcEndpoint.tell(Processing.STOP, ActorRef.noSender());
     	}
     
    -	// ------------------------------------------------------------------------
    +	@Override
    +	public boolean equals(Object o) {
    +		if (this == o) {
    +			return true;
    +		}
    +
    +		if (o == null) {
    +			return false;
    +		}
    +
    +		if(Proxy.isProxyClass(o.getClass())) {
    +			return o.equals(this);
    +		}
    --- End diff --
    
    @StephanEwen, as Till said, the subject of this PR was misleading. I meant that when the connect method of AkkaRpcService is called with the same address and the same RPC gateway class, the returned gateways are equal, not that the same gateway object is returned. I have changed the subject.



[GitHub] flink pull request #2455: [FLINK-4547] [cluster management] Return same obje...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2455#discussion_r77326942
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java ---
    @@ -84,4 +89,35 @@ public void run() {
     
     		assertTrue("call was not properly delayed", ((stop - start) / 1000000) >= delay);
     	}
    +
    +	/**
    +	 * Test connect method
    +	 * 1. Get the same result when connect the same address and same gateway class
    +	 * 2. Failed when connect to invalid address
    +	 * @throws Exception
    +	 */
    +	@Test
    +	public void testConnect() throws Exception {
    +		TestingLeaderElectionService leaderElectionService = new TestingLeaderElectionService();
    +		TestingHighAvailabilityServices highAvailabilityServices = new TestingHighAvailabilityServices();
    +		highAvailabilityServices.setResourceManagerLeaderElectionService(leaderElectionService);
    +
    +		ResourceManager rm = new ResourceManager(akkaRpcService, highAvailabilityServices);
    +		rm.start();
    +		String address = rm.getAddress();
    +		// verify get the same result when connect the same address and same gateway class
    +		Future<ResourceManagerGateway> rmGatewayFuture1 = akkaRpcService.connect(address, ResourceManagerGateway.class);
    +		ResourceManagerGateway rmGateway1 = Await.result(rmGatewayFuture1, new FiniteDuration(200, TimeUnit.MILLISECONDS));
    +
    +		Future<ResourceManagerGateway> rmGatewayFuture2 = akkaRpcService.connect(address, ResourceManagerGateway.class);
    +		ResourceManagerGateway rmGateway2 = Await.result(rmGatewayFuture2, new FiniteDuration(200, TimeUnit.MILLISECONDS));
    +
    +		Assert.assertEquals(rmGateway1, rmGateway2);
    +		Assert.assertEquals(rmGateway1.hashCode(), rmGateway2.hashCode());
    +
    +		// verify failed when connect to invalid address
    +		String invalidString = "abc";
    +		Future<ResourceManagerGateway> invalidRmGatewayFuture = akkaRpcService.connect(invalidString, ResourceManagerGateway.class);
    +		assertTrue(invalidRmGatewayFuture.failed().value().get().get() instanceof RuntimeException);
    --- End diff --
    
    Maybe you should check whether the future is completed. You could obtain the result via `Await.result(future, timeout)`.
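
The suggestion above is to wait for the future to complete before inspecting its failure. The test in the diff uses Scala futures; the sketch below swaps in `java.util.concurrent.CompletableFuture` as a stand-in so that it is self-contained. This `connect` method and `awaitFailure` are hypothetical helpers, not Flink or Scala API.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class AwaitFailureSketch {

    /** Hypothetical connect that fails asynchronously for an invalid address. */
    static CompletableFuture<String> connect(String address) {
        return CompletableFuture.supplyAsync(() -> {
            if (!address.startsWith("akka://")) {
                throw new RuntimeException("Invalid address: " + address);
            }
            return address;
        });
    }

    /**
     * Blocks until the future completes or the timeout elapses.
     * Returns the failure cause, or null if the future completed normally.
     */
    static Throwable awaitFailure(CompletableFuture<?> future, long timeoutMillis) {
        try {
            future.get(timeoutMillis, TimeUnit.MILLISECONDS);
            return null;
        } catch (ExecutionException e) {
            return e.getCause();
        } catch (TimeoutException e) {
            throw new IllegalStateException("Future did not complete in time", e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting", e);
        }
    }

    public static void main(String[] args) {
        Throwable failure = awaitFailure(connect("abc"), 2000);
        System.out.println(failure instanceof RuntimeException);
    }
}
```

Reading the result without waiting races with the asynchronous completion, so the assertion could run before any failure has been recorded.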



[GitHub] flink pull request #2455: [FLINK-4547] [cluster management] when call connec...

Posted by beyond1920 <gi...@git.apache.org>.
Github user beyond1920 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2455#discussion_r77754084
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java ---
    @@ -189,7 +191,49 @@ public void stop() {
     		rpcEndpoint.tell(Processing.STOP, ActorRef.noSender());
     	}
     
    -	// ------------------------------------------------------------------------
    +	@Override
    +	public boolean equals(Object o) {
    +		if (this == o) {
    +			return true;
    +		}
    +
    +		if (o == null) {
    +			return false;
    +		}
    +
    +		if(Proxy.isProxyClass(o.getClass())) {
    +			return o.equals(this);
    +		}
    --- End diff --
    
    @tillrohrmann, it does not mean that AkkaInvocationHandler is a proxy class. It means that if the class of the input parameter is a proxy class, the method returns the result of o.equals(this). Here is my reasoning: when the connect method of AkkaRpcService is called, the returned gateway, which is wrapped in a Future, is in fact a Proxy. When I call equals to compare two gateways, the equals method of AkkaInvocationHandler is called, but the input parameter is the other gateway, whose class is a proxy class rather than AkkaInvocationHandler.
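
The dispatch described above can be reproduced with a plain `java.lang.reflect.Proxy`. This is a minimal sketch, not the PR's code: `Gateway`, `Handler`, and this `connect` method are hypothetical stand-ins, and the sketch assumes the handler forwards methods declared on `Object` to itself.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.Objects;

class ProxyEqualsSketch {

    /** Hypothetical stand-in for an RPC gateway interface. */
    interface Gateway {
        String address();
    }

    /** Hypothetical stand-in for the invocation handler behind each gateway proxy. */
    static final class Handler implements InvocationHandler {
        private final String address;

        Handler(String address) {
            this.address = address;
        }

        @Override
        public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
            // equals, hashCode and toString called on the proxy arrive here;
            // run them on the handler itself.
            if (method.getDeclaringClass() == Object.class) {
                return method.invoke(this, args);
            }
            return address; // the only Gateway method in this sketch
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (o == null) {
                return false;
            }
            // The argument is the other gateway proxy, not its handler. Calling
            // o.equals(this) routes into the other handler with this handler as argument.
            if (Proxy.isProxyClass(o.getClass())) {
                return o.equals(this);
            }
            if (getClass() != o.getClass()) {
                return false;
            }
            return address.equals(((Handler) o).address);
        }

        @Override
        public int hashCode() {
            return Objects.hash(address);
        }
    }

    /** Builds a new proxy and a new handler on every call. */
    static Gateway connect(String address) {
        return (Gateway) Proxy.newProxyInstance(
                Gateway.class.getClassLoader(),
                new Class<?>[] {Gateway.class},
                new Handler(address));
    }

    public static void main(String[] args) {
        Gateway first = connect("akka://flink/user/rm");
        Gateway second = connect("akka://flink/user/rm");
        System.out.println(first == second);
        System.out.println(first.equals(second));
        System.out.println(first.hashCode() == second.hashCode());
    }
}
```

In this sketch, `first.equals(second)` reaches `Handler.equals` with a proxy as the argument; the proxy branch then turns the comparison around so that the second handler compares itself with the first handler directly.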



[GitHub] flink pull request #2455: [FLINK-4547] [cluster management] when call connec...

Posted by beyond1920 <gi...@git.apache.org>.
Github user beyond1920 closed the pull request at:

    https://github.com/apache/flink/pull/2455



[GitHub] flink pull request #2455: [FLINK-4547] [cluster management] when call connec...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2455#discussion_r77782622
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java ---
    @@ -189,7 +191,49 @@ public void stop() {
     		rpcEndpoint.tell(Processing.STOP, ActorRef.noSender());
     	}
     
    -	// ------------------------------------------------------------------------
    +	@Override
    +	public boolean equals(Object o) {
    +		if (this == o) {
    +			return true;
    +		}
    +
    +		if (o == null) {
    +			return false;
    +		}
    +
    +		if(Proxy.isProxyClass(o.getClass())) {
    +			return o.equals(this);
    +		}
    --- End diff --
    
    Alright, I understand that now :-).

