Posted to issues@flink.apache.org by ramkrish86 <gi...@git.apache.org> on 2016/09/13 13:42:44 UTC

[GitHub] flink pull request #2496: FLINK-4615 Reusing the memory allocated for the dr...

GitHub user ramkrish86 opened a pull request:

    https://github.com/apache/flink/pull/2496

    FLINK-4615 Reusing the memory allocated for the drivers and iterators

    Thanks for contributing to Apache Flink. Before you open your pull request, please take the following checklist into consideration.
    If your changes take all of the items into account, feel free to open your pull request. For more information and/or questions please refer to the [How To Contribute guide](http://flink.apache.org/how-to-contribute.html).
    In addition to going through the list, please provide a meaningful description of your changes.
    
    - [ ] General
      - The pull request references the related JIRA issue ("[FLINK-XXX] Jira title text")
      - The pull request addresses only one issue
      - Each commit in the PR has a meaningful commit message (including the JIRA id)
    
    - [ ] Documentation
      - Documentation has been added for new functionality
      - Old documentation affected by the pull request has been updated
      - JavaDoc for public methods has been added
    
    - [ ] Tests & Build
      - Functionality added by the pull request is covered by tests
      - `mvn clean verify` has been executed successfully locally or a Travis build has passed
    
    This is a first PR for initial review. It reuses the memory for the iterators that are created by the drivers. If someone could point me to test cases other than the ConnectedComponents example, I can see how things work. Suggestions and feedback are welcome.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/ramkrish86/flink FLINK-4615

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/2496.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #2496
    
----
commit b4440ed2a4cd7484ae90562a49a5af0336b41902
Author: Ramkrishna <ra...@intel.com>
Date:   2016-09-13T13:39:58Z

    FLINK-4615 Reusing the memory allocated for the drivers and iterators

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #2496: FLINK-4615 Reusing the memory allocated for the dr...

Posted by ramkrish86 <gi...@git.apache.org>.
Github user ramkrish86 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2496#discussion_r79021152
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/operators/Driver.java ---
    @@ -87,4 +87,6 @@
     	 * @throws Exception Exceptions may be forwarded.
     	 */
     	void cancel() throws Exception;
    +
    +	void resetForIterativeTasks() throws Exception;
    --- End diff --
    
    So can we have only ResettableDriver, with no need for the Driver interface then?



[GitHub] flink issue #2496: FLINK-4615 Reusing the memory allocated for the drivers a...

Posted by ramkrish86 <gi...@git.apache.org>.
Github user ramkrish86 commented on the issue:

    https://github.com/apache/flink/pull/2496
  
    The PageRank test failure is a genuine one. I think the problem is that some of the state is not reset correctly. We have to be more careful here and ensure we reset everything.



[GitHub] flink pull request #2496: FLINK-4615 Reusing the memory allocated for the dr...

Posted by ggevay <gi...@git.apache.org>.
Github user ggevay commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2496#discussion_r79010283
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/operators/Driver.java ---
    @@ -87,4 +87,6 @@
     	 * @throws Exception Exceptions may be forwarded.
     	 */
     	void cancel() throws Exception;
    +
    +	void resetForIterativeTasks() throws Exception;
    --- End diff --
    
    > You mean all drivers will now become a type of ResettableDriver?
    
    Yes, this was the idea in my comment [1] on the Jira. I'm sorry if I wasn't clear enough there.
    The advantage of this is that you wouldn't need to add more logic in `BatchTask`, `AbstractIterativeTask` and its descendants, because these already handle the situation when the driver is an instance of `ResettableDriver`. (See e.g. the beginning of `reinstantiateDriver`.)
    
    [1] https://issues.apache.org/jira/browse/FLINK-3322?focusedCommentId=15474531&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-15474531
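    To make the dispatch described above concrete, here is a minimal sketch under simplified assumptions. `Driver`, `ResettableDriver`, `SimpleDriver`, `ReusableDriver` and `runIterations` are hypothetical stand-ins, not Flink's actual interfaces or the real `reinstantiateDriver` logic. It only shows the idea: the driver instantiated from the config is checked at runtime with `instanceof ResettableDriver`; if it is resettable, the same instance (and its memory) is reset and reused across supersteps, otherwise a new driver is created for every superstep.

```java
import java.util.function.Supplier;

public class DriverDispatchSketch {

    // Simplified, hypothetical stand-ins; the real Flink interfaces are generic,
    // declare checked exceptions, and have more methods.
    interface Driver {
        void prepare();
        void run();
        void cleanup();
    }

    interface ResettableDriver extends Driver {
        // Clears per-superstep state while keeping allocated memory.
        void reset();
    }

    // A driver that knows nothing about resetting.
    static class SimpleDriver implements Driver {
        public void prepare() {}
        public void run() {}
        public void cleanup() {}
    }

    // A driver that can be reused across supersteps.
    static class ReusableDriver implements ResettableDriver {
        public void prepare() {}
        public void run() {}
        public void cleanup() {}
        public void reset() {}
    }

    // Runs the given number of supersteps and returns how many driver instances were created.
    static int runIterations(Supplier<Driver> factory, int supersteps) {
        int instantiations = 0;
        Driver driver = null;
        for (int step = 0; step < supersteps; step++) {
            if (driver instanceof ResettableDriver) {
                // Reuse the existing instance and its memory.
                ((ResettableDriver) driver).reset();
            } else {
                // First superstep, or driver not resettable: discard and re-create.
                if (driver != null) {
                    driver.cleanup();
                }
                driver = factory.get();
                instantiations++;
                driver.prepare();
            }
            driver.run();
        }
        if (driver != null) {
            driver.cleanup();
        }
        return instantiations;
    }

    public static void main(String[] args) {
        System.out.println(runIterations(SimpleDriver::new, 5));   // prints 5
        System.out.println(runIterations(ReusableDriver::new, 5)); // prints 1
    }
}
```

    Because the check happens at runtime, drivers can be migrated to the resettable interface one at a time; non-resettable drivers keep the old re-instantiation behaviour.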



[GitHub] flink issue #2496: FLINK-4615 Reusing the memory allocated for the drivers a...

Posted by ramkrish86 <gi...@git.apache.org>.
Github user ramkrish86 commented on the issue:

    https://github.com/apache/flink/pull/2496
  
    Updated the PR. It handles reset() in a much better way; PageRankITCase exposed some cases. It also cleans up the driver now.
    I am still wondering whether there is a reason why the driver was instantiated every time for an iterative task. Maybe I am missing the reason for that. If that is definitely needed, then this whole fix may not work. But the patch as such is better in terms of handling the reset() part.



[GitHub] flink pull request #2496: FLINK-4615 Reusing the memory allocated for the dr...

Posted by ramkrish86 <gi...@git.apache.org>.
Github user ramkrish86 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2496#discussion_r79006658
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationIntermediateTask.java ---
    @@ -82,43 +82,46 @@ protected void initialize() throws Exception {
     	public void run() throws Exception {
     		
     		SuperstepKickoffLatch nextSuperstepLatch = SuperstepKickoffLatchBroker.instance().get(brokerKey());
    +		try {
    +			while (this.running && !terminationRequested()) {
     
    -		while (this.running && !terminationRequested()) {
    +				if (log.isInfoEnabled()) {
    +					log.info(formatLogString("starting iteration [" + currentIteration() + "]"));
    +				}
     
    -			if (log.isInfoEnabled()) {
    -				log.info(formatLogString("starting iteration [" + currentIteration() + "]"));
    -			}
    +				super.run();
     
    -			super.run();
    +				// check if termination was requested
    +				verifyEndOfSuperstepState();
     
    -			// check if termination was requested
    -			verifyEndOfSuperstepState();
    +				if (isWorksetUpdate && isWorksetIteration) {
    +					long numCollected = worksetUpdateOutputCollector.getElementsCollectedAndReset();
    +					worksetAggregator.aggregate(numCollected);
    +				}
     
    -			if (isWorksetUpdate && isWorksetIteration) {
    -				long numCollected = worksetUpdateOutputCollector.getElementsCollectedAndReset();
    -				worksetAggregator.aggregate(numCollected);
    -			}
    -			
    -			if (log.isInfoEnabled()) {
    -				log.info(formatLogString("finishing iteration [" + currentIteration() + "]"));
    -			}
    -			
    -			// let the successors know that the end of this superstep data is reached
    -			sendEndOfSuperstep();
    -			
    -			if (isWorksetUpdate) {
    -				// notify iteration head if responsible for workset update
    -				worksetBackChannel.notifyOfEndOfSuperstep();
    -			}
    -			
    -			boolean terminated = nextSuperstepLatch.awaitStartOfSuperstepOrTermination(currentIteration() + 1);
    +				if (log.isInfoEnabled()) {
    +					log.info(formatLogString("finishing iteration [" + currentIteration() + "]"));
    +				}
     
    -			if (terminated) {
    -				requestTermination();
    -			}
    -			else {
    -				incrementIterationCounter();
    +				// let the successors know that the end of this superstep data is reached
    +				sendEndOfSuperstep();
    +
    +				if (isWorksetUpdate) {
    +					// notify iteration head if responsible for workset update
    +					worksetBackChannel.notifyOfEndOfSuperstep();
    +				}
    +
    +				boolean terminated = nextSuperstepLatch.awaitStartOfSuperstepOrTermination(currentIteration() + 1);
    +
    +				if (terminated) {
    +					requestTermination();
    +					this.driver.cleanup();
    --- End diff --
    
    I am very sorry, that was an oversight. I forgot to remove the cleanup() call after termination.



[GitHub] flink pull request #2496: FLINK-4615 Reusing the memory allocated for the dr...

Posted by ggevay <gi...@git.apache.org>.
Github user ggevay commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2496#discussion_r79347709
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/operators/Driver.java ---
    @@ -87,4 +87,6 @@
     	 * @throws Exception Exceptions may be forwarded.
     	 */
     	void cancel() throws Exception;
    +
    +	void resetForIterativeTasks() throws Exception;
    --- End diff --
    
    OK



[GitHub] flink pull request #2496: FLINK-4615 Reusing the memory allocated for the dr...

Posted by ggevay <gi...@git.apache.org>.
Github user ggevay commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2496#discussion_r79003843
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationIntermediateTask.java ---
    @@ -82,43 +82,46 @@ protected void initialize() throws Exception {
     	public void run() throws Exception {
     		
     		SuperstepKickoffLatch nextSuperstepLatch = SuperstepKickoffLatchBroker.instance().get(brokerKey());
    +		try {
    +			while (this.running && !terminationRequested()) {
     
    -		while (this.running && !terminationRequested()) {
    +				if (log.isInfoEnabled()) {
    +					log.info(formatLogString("starting iteration [" + currentIteration() + "]"));
    +				}
     
    -			if (log.isInfoEnabled()) {
    -				log.info(formatLogString("starting iteration [" + currentIteration() + "]"));
    -			}
    +				super.run();
     
    -			super.run();
    +				// check if termination was requested
    +				verifyEndOfSuperstepState();
     
    -			// check if termination was requested
    -			verifyEndOfSuperstepState();
    +				if (isWorksetUpdate && isWorksetIteration) {
    +					long numCollected = worksetUpdateOutputCollector.getElementsCollectedAndReset();
    +					worksetAggregator.aggregate(numCollected);
    +				}
     
    -			if (isWorksetUpdate && isWorksetIteration) {
    -				long numCollected = worksetUpdateOutputCollector.getElementsCollectedAndReset();
    -				worksetAggregator.aggregate(numCollected);
    -			}
    -			
    -			if (log.isInfoEnabled()) {
    -				log.info(formatLogString("finishing iteration [" + currentIteration() + "]"));
    -			}
    -			
    -			// let the successors know that the end of this superstep data is reached
    -			sendEndOfSuperstep();
    -			
    -			if (isWorksetUpdate) {
    -				// notify iteration head if responsible for workset update
    -				worksetBackChannel.notifyOfEndOfSuperstep();
    -			}
    -			
    -			boolean terminated = nextSuperstepLatch.awaitStartOfSuperstepOrTermination(currentIteration() + 1);
    +				if (log.isInfoEnabled()) {
    +					log.info(formatLogString("finishing iteration [" + currentIteration() + "]"));
    +				}
     
    -			if (terminated) {
    -				requestTermination();
    -			}
    -			else {
    -				incrementIterationCounter();
    +				// let the successors know that the end of this superstep data is reached
    +				sendEndOfSuperstep();
    +
    +				if (isWorksetUpdate) {
    +					// notify iteration head if responsible for workset update
    +					worksetBackChannel.notifyOfEndOfSuperstep();
    +				}
    +
    +				boolean terminated = nextSuperstepLatch.awaitStartOfSuperstepOrTermination(currentIteration() + 1);
    +
    +				if (terminated) {
    +					requestTermination();
    +					this.driver.cleanup();
    --- End diff --
    
    `cleanup` will be called twice at the last step of the iteration, because it is also called in the `finally` block. This might potentially cause problems if one of the drivers assumes that `cleanup` will only be called once.
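    A minimal sketch of the loop shape this comment implies, under simplified assumptions: `CountingDriver`, `runLoop`, and the rule "terminate after `maxSupersteps`" are hypothetical stand-ins (the real loop waits on `awaitStartOfSuperstepOrTermination`). On termination the loop only requests termination; the driver is cleaned up solely in the `finally` block, so `cleanup` runs exactly once, including on the last superstep and when an exception is thrown.

```java
public class CleanupOnceSketch {

    // Hypothetical driver that records how often it was run and cleaned up.
    static class CountingDriver {
        int runs = 0;
        int cleanups = 0;

        void run() { runs++; }

        void cleanup() { cleanups++; }
    }

    // Simplified superstep loop; terminating after maxSupersteps stands in for
    // awaitStartOfSuperstepOrTermination() returning true.
    static void runLoop(CountingDriver driver, int maxSupersteps) {
        int iteration = 1;
        boolean terminationRequested = false;
        try {
            while (!terminationRequested) {
                driver.run();
                boolean terminated = iteration >= maxSupersteps;
                if (terminated) {
                    // Only request termination here; do not call cleanup().
                    terminationRequested = true;
                } else {
                    iteration++;
                }
            }
        } finally {
            // The single place where the driver is cleaned up.
            driver.cleanup();
        }
    }

    public static void main(String[] args) {
        CountingDriver driver = new CountingDriver();
        runLoop(driver, 3);
        System.out.println("runs=" + driver.runs + " cleanups=" + driver.cleanups); // runs=3 cleanups=1
    }
}
```

    A driver could additionally guard its own `cleanup` with a "released" flag so that a second call is a no-op, but keeping one cleanup point in the task avoids relying on that.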



[GitHub] flink pull request #2496: FLINK-4615 Reusing the memory allocated for the dr...

Posted by ramkrish86 <gi...@git.apache.org>.
Github user ramkrish86 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2496#discussion_r79017956
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/operators/Driver.java ---
    @@ -87,4 +87,6 @@
     	 * @throws Exception Exceptions may be forwarded.
     	 */
     	void cancel() throws Exception;
    +
    +	void resetForIterativeTasks() throws Exception;
    --- End diff --
    
    OK. Let me update the code accordingly; of course it is simpler then. My only concern was that I was not sure whether every driver could be made a ResettableDriver, since that was a special case for certain drivers.



[GitHub] flink pull request #2496: FLINK-4615 Reusing the memory allocated for the dr...

Posted by ramkrish86 <gi...@git.apache.org>.
Github user ramkrish86 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2496#discussion_r79347430
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/operators/Driver.java ---
    @@ -87,4 +87,6 @@
     	 * @throws Exception Exceptions may be forwarded.
     	 */
     	void cancel() throws Exception;
    +
    +	void resetForIterativeTasks() throws Exception;
    --- End diff --
    
    @ggevay
    Can I make a new PR that makes the drivers resettable, rather than overwriting the current one? I can close this PR once the new one with ResettableDriver is reviewed and we finalize the changes in that PR.



[GitHub] flink issue #2496: FLINK-4615 Reusing the memory allocated for the drivers a...

Posted by ramkrish86 <gi...@git.apache.org>.
Github user ramkrish86 commented on the issue:

    https://github.com/apache/flink/pull/2496
  
    The build seems to pass with this latest commit.
    Any feedback here, @ggevay and @StephanEwen?



[GitHub] flink pull request #2496: FLINK-4615 Reusing the memory allocated for the dr...

Posted by ggevay <gi...@git.apache.org>.
Github user ggevay commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2496#discussion_r79005879
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/operators/Driver.java ---
    @@ -87,4 +87,6 @@
     	 * @throws Exception Exceptions may be forwarded.
     	 */
     	void cancel() throws Exception;
    +
    +	void resetForIterativeTasks() throws Exception;
    --- End diff --
    
    I think you should use the already existing `ResettableDriver` interface instead of introducing a new resetting method.



[GitHub] flink pull request #2496: FLINK-4615 Reusing the memory allocated for the dr...

Posted by ggevay <gi...@git.apache.org>.
Github user ggevay commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2496#discussion_r79038567
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/operators/Driver.java ---
    @@ -87,4 +87,6 @@
     	 * @throws Exception Exceptions may be forwarded.
     	 */
     	void cancel() throws Exception;
    +
    +	void resetForIterativeTasks() throws Exception;
    --- End diff --
    
    Currently, the `ResettableDriver` interface extends the `Driver` interface. I would say that for the moment, let's just start by making some of the drivers resettable in this PR, and keep both interfaces around. Then, when this PR is merged, a second PR could make all the remaining drivers resettable as well. And then a third one could merge the functionality of `ResettableDriver` into `Driver`, and thus get rid of `ResettableDriver`.



[GitHub] flink pull request #2496: FLINK-4615 Reusing the memory allocated for the dr...

Posted by ramkrish86 <gi...@git.apache.org>.
Github user ramkrish86 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2496#discussion_r79007600
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/operators/Driver.java ---
    @@ -87,4 +87,6 @@
     	 * @throws Exception Exceptions may be forwarded.
     	 */
     	void cancel() throws Exception;
    +
    +	void resetForIterativeTasks() throws Exception;
    --- End diff --
    
    I saw that code, but I am not sure how to do that. The driver is actually instantiated from the config, so how do we ensure that it is of type ResettableDriver? You mean all drivers will now become a type of ResettableDriver?



[GitHub] flink pull request #2496: FLINK-4615 Reusing the memory allocated for the dr...

Posted by ggevay <gi...@git.apache.org>.
Github user ggevay commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2496#discussion_r79040795
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/operators/Driver.java ---
    @@ -87,4 +87,6 @@
     	 * @throws Exception Exceptions may be forwarded.
     	 */
     	void cancel() throws Exception;
    +
    +	void resetForIterativeTasks() throws Exception;
    --- End diff --
    
    For example, the various combine drivers seem to be easy to make resettable. Maybe this PR should just do that, so we maximize the chances that we can get the PR to be merged.
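    As an illustration of what making a combine driver resettable could look like, here is a minimal sketch. `SumCombineDriver` and its `initialize`/`run`/`reset`/`teardown` methods are hypothetical and simplified (a `HashMap` stands in for the managed memory a real combiner uses); they are not the actual Flink combine drivers. The working memory is allocated once, `reset()` clears all per-superstep state, and only `teardown()` releases it. If `reset()` forgot to clear the buffer, the second superstep below would produce `a=8` instead of `a=5`, the kind of incomplete reset suspected earlier in the thread for the PageRank test failure.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ResettableCombinerSketch {

    // Hypothetical, simplified combine driver (not Flink's API).
    static class SumCombineDriver {
        private Map<String, Integer> buffer; // stands in for sort/hash memory
        int allocations = 0;

        // Called once: allocate the working memory.
        void initialize() {
            buffer = new HashMap<>();
            allocations++;
        }

        // Combine one superstep's input by summing values per key.
        Map<String, Integer> run(List<Map.Entry<String, Integer>> input) {
            for (Map.Entry<String, Integer> e : input) {
                buffer.merge(e.getKey(), e.getValue(), Integer::sum);
            }
            return new HashMap<>(buffer); // copy, so the caller's result survives reset()
        }

        // Between supersteps: clear all per-superstep state, keep the allocation.
        void reset() {
            buffer.clear();
        }

        // Called once at the very end: release the memory.
        void teardown() {
            buffer = null;
        }
    }

    public static void main(String[] args) {
        SumCombineDriver driver = new SumCombineDriver();
        driver.initialize();
        Map<String, Integer> first = driver.run(List.of(Map.entry("a", 1), Map.entry("a", 2)));
        driver.reset();
        Map<String, Integer> second = driver.run(List.of(Map.entry("a", 5)));
        driver.teardown();
        System.out.println(first + " " + second + " allocations=" + driver.allocations);
        // prints {a=3} {a=5} allocations=1
    }
}
```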

