Posted to issues@flink.apache.org by ramkrish86 <gi...@git.apache.org> on 2016/09/19 10:31:56 UTC

[GitHub] flink pull request #2510: FLINK-3322 Allow drivers and iterators to reuse th...

GitHub user ramkrish86 opened a pull request:

    https://github.com/apache/flink/pull/2510

    FLINK-3322 Allow drivers and iterators to reuse the memory segments

    Thanks for contributing to Apache Flink. Before you open your pull request, please take the following check list into consideration.
    If your changes take all of the items into account, feel free to open your pull request. For more information and/or questions please refer to the [How To Contribute guide](http://flink.apache.org/how-to-contribute.html).
    In addition to going through the list, please provide a meaningful description of your changes.
    
    - [ ] General
      - The pull request references the related JIRA issue ("[FLINK-XXX] Jira title text")
      - The pull request addresses only one issue
      - Each commit in the PR has a meaningful commit message (including the JIRA id)
    
    - [ ] Documentation
      - Documentation has been added for new functionality
      - Old documentation affected by the pull request has been updated
      - JavaDoc for public methods has been added
    
    - [ ] Tests & Build
      - Functionality added by the pull request is covered by tests
      - `mvn clean verify` has been executed successfully locally or a Travis build has passed
    
    Updated the PR based on @ggevay's feedback. Now all the drivers are ResettableDrivers, and we use the existing APIs to reuse the driver and the iterators. JoinTaskIterator now has a new method, reset(), that resets the iterator's state.
    If we go with this approach, it makes sense to remove ResettableDriver and move all of its API into the Driver interface itself. That is for a later PR.
    @ggevay - Please review and give your valuable feedback.
    I ran `mvn clean verify` and the tests passed in flink-runtime and flink-gelly-examples. One test, 'org.apache.flink.runtime.io.disk.ChannelViewsTest.testWriteAndReadLongRecords', failed, but it passed when I ran it again locally.
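To make the lifecycle under discussion concrete, here is a minimal sketch. `SketchDriver`, `SketchResettableDriver`, `ReusingDriver` and `IterationHarness` are hypothetical, simplified stand-ins, not Flink's actual `Driver`/`ResettableDriver` API. The assumed call order is: `initialize()` once before the first superstep, `reset()` before every later superstep, `prepare()`/`run()`/`cleanup()` around each superstep, and `teardown()` at the end. The `byte[]` buffer stands in for memory segments, which are allocated once and then reused.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical, simplified stand-ins for the driver interfaces discussed in
// this PR; these are NOT Flink's real signatures.
interface SketchDriver {
    void prepare();  // called before every superstep
    void run();      // processes one superstep
    void cleanup();  // called after every superstep
}

interface SketchResettableDriver extends SketchDriver {
    void initialize(); // called once, before the first superstep
    void reset();      // called before every superstep except the first
    void teardown();   // called once, after the last superstep
}

// A driver that allocates its "memory" once and reuses it across supersteps.
class ReusingDriver implements SketchResettableDriver {
    final List<String> calls = new ArrayList<>();
    byte[] buffer;           // stands in for memory segments
    int allocations = 0;

    public void initialize() { buffer = new byte[1024]; allocations++; calls.add("initialize"); }
    public void reset()      { Arrays.fill(buffer, (byte) 0); calls.add("reset"); }
    public void prepare()    { calls.add("prepare"); }
    public void run()        { buffer[0]++; calls.add("run"); }
    public void cleanup()    { calls.add("cleanup"); }
    public void teardown()   { buffer = null; calls.add("teardown"); }
}

class IterationHarness {
    // Drives the assumed lifecycle of a resettable driver in an iterative task.
    static void runSupersteps(SketchResettableDriver driver, int supersteps) {
        driver.initialize();
        for (int step = 0; step < supersteps; step++) {
            if (step > 0) {
                driver.reset(); // reuse memory instead of re-allocating it
            }
            driver.prepare();
            try {
                driver.run();
            } finally {
                driver.cleanup();
            }
        }
        driver.teardown();
    }

    public static void main(String[] args) {
        ReusingDriver d = new ReusingDriver();
        runSupersteps(d, 3);
        System.out.println(d.calls);
        System.out.println("allocations = " + d.allocations);
    }
}
```

For three supersteps, the sketch records one `initialize`, two `reset` calls and one `teardown`, with a single allocation.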


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/ramkrish86/flink FLINK-4615_new

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/2510.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #2510
    
----
commit e17462881cc2f0137cc8412eb578fb4c42baeb15
Author: Ramkrishna <ra...@intel.com>
Date:   2016-09-19T10:27:28Z

    FLINK-3322 Allow drivers and iterators to reuse the memory segments

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink issue #2510: FLINK-3322 Allow drivers and iterators to reuse the memor...

Posted by ramkrish86 <gi...@git.apache.org>.
Github user ramkrish86 commented on the issue:

    https://github.com/apache/flink/pull/2510
  
    @StephanEwen - Any comments or feedback here? A gentle reminder!



[GitHub] flink issue #2510: FLINK-3322 Allow drivers and iterators to reuse the memor...

Posted by ramkrish86 <gi...@git.apache.org>.
Github user ramkrish86 commented on the issue:

    https://github.com/apache/flink/pull/2510
  
    @StephanEwen 
    Do you have some time now? A gentle reminder!!



[GitHub] flink issue #2510: FLINK-3322 Allow drivers and iterators to reuse the memor...

Posted by ramkrish86 <gi...@git.apache.org>.
Github user ramkrish86 commented on the issue:

    https://github.com/apache/flink/pull/2510
  
    Any chance of a review here, @ggevay and @StephanEwen? A gentle reminder, in case you have some time.



[GitHub] flink issue #2510: FLINK-3322 Allow drivers and iterators to reuse the memor...

Posted by ggevay <gi...@git.apache.org>.
Github user ggevay commented on the issue:

    https://github.com/apache/flink/pull/2510
  
    Sorry, I'm extremely busy these days. I'm not sure when I will have time, unfortunately.



[GitHub] flink pull request #2510: FLINK-3322 Allow drivers and iterators to reuse th...

Posted by ramkrish86 <gi...@git.apache.org>.
Github user ramkrish86 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2510#discussion_r79382611
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/AbstractIterativeTask.java ---
    @@ -125,6 +125,16 @@ protected void initialize() throws Exception {
     	}
     
     	@Override
    +	protected void postRun() throws Exception {
    --- End diff --
    
    The `instanceof` check for ResettableDriver, i.e.
    `if (this.driver instanceof ResettableDriver) {`
    
    was already being done in AbstractIterativeTask. So I thought we could clearly separate what happens in the finally clause after every run(), based on the class that is executing it: let BatchTask only call driver.cleanup(), and let AbstractIterativeTask decide based on the type of the driver. I can remove it if it is not needed; it was just for code clarity.
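A minimal sketch of the `postRun()` split described above, assuming a template method in the base task. `SketchBatchTask`, `SketchIterativeTask`, `PlainDriver`, `ReusableDriver` and its `keepForReuse()` method are all hypothetical; in particular, what the resettable branch should do is an assumption made only for illustration. The point is that the `instanceof` check lives in the iterative subclass, while the base class only calls `cleanup()`.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical simplified types; not Flink's actual BatchTask/AbstractIterativeTask.
interface PlainDriver {
    void run();
    void cleanup();       // releases per-run resources
}

interface ReusableDriver extends PlainDriver {
    void keepForReuse();  // hypothetical: retain memory for the next superstep
}

class SketchBatchTask {
    protected final PlainDriver driver;

    SketchBatchTask(PlainDriver driver) { this.driver = driver; }

    // Template method: post-processing after run() is delegated to postRun().
    final void invoke() {
        try {
            driver.run();
        } finally {
            postRun();
        }
    }

    // A plain batch task only cleans up the driver.
    protected void postRun() {
        driver.cleanup();
    }
}

class SketchIterativeTask extends SketchBatchTask {
    SketchIterativeTask(PlainDriver driver) { super(driver); }

    // The driver-type check lives only in the iterative subclass.
    @Override
    protected void postRun() {
        if (driver instanceof ReusableDriver) {
            ((ReusableDriver) driver).keepForReuse();
        } else {
            super.postRun();
        }
    }
}

// Records which lifecycle methods were called, for inspection.
class RecordingDriver implements ReusableDriver {
    final List<String> log = new ArrayList<>();
    public void run() { log.add("run"); }
    public void cleanup() { log.add("cleanup"); }
    public void keepForReuse() { log.add("keepForReuse"); }
}
```

The base task stays unaware of resettable drivers; only the subclass that runs iterations needs to know about them.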



[GitHub] flink pull request #2510: FLINK-3322 Allow drivers and iterators to reuse th...

Posted by ggevay <gi...@git.apache.org>.
Github user ggevay commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2510#discussion_r79375332
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/AbstractIterativeTask.java ---
    @@ -125,6 +125,16 @@ protected void initialize() throws Exception {
     	}
     
     	@Override
    +	protected void postRun() throws Exception {
    --- End diff --
    
    Why did you introduce `postRun`?



[GitHub] flink issue #2510: FLINK-3322 Allow drivers and iterators to reuse the memor...

Posted by ramkrish86 <gi...@git.apache.org>.
Github user ramkrish86 commented on the issue:

    https://github.com/apache/flink/pull/2510
  
    @StephanEwen 
    Do you have some time to check this? :)



[GitHub] flink issue #2510: FLINK-3322 Allow drivers and iterators to reuse the memor...

Posted by ramkrish86 <gi...@git.apache.org>.
Github user ramkrish86 commented on the issue:

    https://github.com/apache/flink/pull/2510
  
    Sounds good, @StephanEwen. Thank you.



[GitHub] flink issue #2510: FLINK-3322 Allow drivers and iterators to reuse the memor...

Posted by ramkrish86 <gi...@git.apache.org>.
Github user ramkrish86 commented on the issue:

    https://github.com/apache/flink/pull/2510
  
    @ggevay 
    Do you have some time to check the last commit in this PR?



[GitHub] flink pull request #2510: FLINK-3322 Allow drivers and iterators to reuse th...

Posted by ggevay <gi...@git.apache.org>.
Github user ggevay commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2510#discussion_r79373063
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/operators/resettable/AbstractBlockResettableIterator.java ---
    @@ -183,4 +183,24 @@ protected T getNextRecord() throws IOException {
     		}
     	}
     
    +	public void resetForIterativeTasks() {
    --- End diff --
    
    This class already has a method called `reset`. Why do we need to add another resetting method?



[GitHub] flink pull request #2510: FLINK-3322 Allow drivers and iterators to reuse th...

Posted by ramkrish86 <gi...@git.apache.org>.
Github user ramkrish86 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2510#discussion_r79761429
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/operators/JoinDriver.java ---
    @@ -128,17 +130,22 @@ public void prepare() throws Exception{
     				ConfigConstants.DEFAULT_RUNTIME_HASH_JOIN_BLOOM_FILTERS);
     
     		// create and return joining iterator according to provided local strategy.
    -		if (objectReuseEnabled) {
    -			switch (ls) {
    -				case INNER_MERGE:
    -					this.joinIterator = new ReusingMergeInnerJoinIterator<>(in1, in2, 
    +		if (reset) {
    +			resetForIterativeTasks(in1, in2, serializer1, serializer2, comparator1, comparator2, pairComparatorFactory);
    +			reset = false;
    +		}
    +		if (joinIterator == null) {
    --- End diff --
    
    @ggevay 
    Can you have a look at this? If we can figure out how to handle cases like PageRank, then maybe we can have the reset() method do the actual reset.



[GitHub] flink issue #2510: FLINK-3322 Allow drivers and iterators to reuse the memor...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on the issue:

    https://github.com/apache/flink/pull/2510
  
    There are a bunch of quite pressing issues (hard requirements for some use cases) that I am working on these weeks. I'll try to come back to this after we have gotten those done...



[GitHub] flink pull request #2510: FLINK-3322 Allow drivers and iterators to reuse th...

Posted by ggevay <gi...@git.apache.org>.
Github user ggevay commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2510#discussion_r79376690
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/operators/JoinDriver.java ---
    @@ -128,17 +130,22 @@ public void prepare() throws Exception{
     				ConfigConstants.DEFAULT_RUNTIME_HASH_JOIN_BLOOM_FILTERS);
     
     		// create and return joining iterator according to provided local strategy.
    -		if (objectReuseEnabled) {
    -			switch (ls) {
    -				case INNER_MERGE:
    -					this.joinIterator = new ReusingMergeInnerJoinIterator<>(in1, in2, 
    +		if (reset) {
    +			resetForIterativeTasks(in1, in2, serializer1, serializer2, comparator1, comparator2, pairComparatorFactory);
    +			reset = false;
    +		}
    +		if (joinIterator == null) {
    --- End diff --
    
    This solution here with the `reset` flag is very hacky. I would like to point your attention to the `initialize` method of `ResettableDriver`, which is called once before the first iteration step. Instead of controlling with flags what `prepare` does, you should cleanly separate its work into `initialize` and `reset`. (If they have overlapping parts, then these can go to a new private method that both of them call.)
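A minimal sketch of the split suggested here, assuming a driver whose inputs are fetched from the task for each superstep (modelled as a `Supplier`). `SplitLifecycleDriver` and `bindInputs()` are hypothetical names, not Flink code: `initialize()` allocates memory once, `reset()` only reuses it, and the overlapping work lives in one private method, so `prepare()` needs no flag.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.function.Supplier;

// Hypothetical, simplified join-like driver; not Flink's JoinDriver.
// The Supplier stands in for the task context that hands out the current input.
class SplitLifecycleDriver {
    private final Supplier<List<Integer>> inputSource;
    private int[] memory;            // stands in for memory segments, allocated once
    private Iterator<Integer> input; // re-bound before every superstep
    int allocations = 0;

    SplitLifecycleDriver(Supplier<List<Integer>> inputSource) {
        this.inputSource = inputSource;
    }

    // Called once, before the first superstep: allocate memory and bind inputs.
    void initialize() {
        memory = new int[16];
        allocations++;
        bindInputs();
    }

    // Called before every later superstep: reuse the memory, re-bind inputs only.
    void reset() {
        Arrays.fill(memory, 0);
        bindInputs();
    }

    // Work shared by initialize() and reset(), so neither needs a flag.
    private void bindInputs() {
        input = inputSource.get().iterator();
    }

    // Consumes the current input and returns its sum.
    int run() {
        int sum = 0;
        while (input.hasNext()) {
            sum += input.next();
        }
        memory[0] = sum;
        return sum;
    }
}
```

Each method has one fixed responsibility, which is the property the flag-based `prepare()` loses.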



[GitHub] flink issue #2510: FLINK-3322 Allow drivers and iterators to reuse the memor...

Posted by ramkrish86 <gi...@git.apache.org>.
Github user ramkrish86 commented on the issue:

    https://github.com/apache/flink/pull/2510
  
    @StephanEwen 
    Do you have some time now? A gentle reminder. No problem if you don't. I can wait.



[GitHub] flink pull request #2510: FLINK-3322 Allow drivers and iterators to reuse th...

Posted by ramkrish86 <gi...@git.apache.org>.
Github user ramkrish86 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2510#discussion_r79539655
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/operators/JoinDriver.java ---
    @@ -128,17 +130,22 @@ public void prepare() throws Exception{
     				ConfigConstants.DEFAULT_RUNTIME_HASH_JOIN_BLOOM_FILTERS);
     
     		// create and return joining iterator according to provided local strategy.
    -		if (objectReuseEnabled) {
    -			switch (ls) {
    -				case INNER_MERGE:
    -					this.joinIterator = new ReusingMergeInnerJoinIterator<>(in1, in2, 
    +		if (reset) {
    +			resetForIterativeTasks(in1, in2, serializer1, serializer2, comparator1, comparator2, pairComparatorFactory);
    +			reset = false;
    +		}
    +		if (joinIterator == null) {
    --- End diff --
    
    @ggevay 
    I can now explain why I went with the `reset` boolean. In `PageRankITCase#testPageRankSmallNumberOfIterations`, because of the way the input is closed and opened again, things do not work if we reset the iterators in the `reset` method: they still refer to the old input iterators. If the reset is done in `prepare` instead, it works fine. I can see that this example uses TempBarriers, hence the change in behaviour.
    So by going with a boolean, I could at least show that reset has some implementation and that those drivers act based on `reset`.
    That is also why, if you look at the implementation of (for example) `JoinWithSolutionSetFirstDriver`, reset() is left empty and the inputs are updated in the run() method. IMHO, having `reset()`, `prepare()` and `initialize()` is a bit confusing and needs to be refactored.
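The ordering problem described above can be reproduced in a small, self-contained sketch. It assumes, as this comment implies, that the task re-creates its inputs after calling `reset()` but before calling `prepare()`. `SketchContext`, `FlagResetDriver`, `EagerResetDriver` and `StaleInputDemo` are hypothetical; the lists stand in for the re-opened temp-barrier input.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Hypothetical task context holding the current input, which the task may
// replace between reset() and prepare() (as a re-opened temp barrier might).
class SketchContext {
    List<Integer> currentInput;
    Iterator<Integer> getInput() { return currentInput.iterator(); }
}

// Flag-based variant, mirroring the `reset` boolean in the diff: reset() only
// records that a reset is pending; prepare() re-binds from the context.
class FlagResetDriver {
    private final SketchContext ctx;
    private Iterator<Integer> input;
    private boolean resetPending = false;

    FlagResetDriver(SketchContext ctx) { this.ctx = ctx; }

    void reset() { resetPending = true; }

    void prepare() {
        if (input == null || resetPending) {
            input = ctx.getInput(); // reads the input that is current now
            resetPending = false;
        }
    }

    int run() { int s = 0; while (input.hasNext()) s += input.next(); return s; }
}

// Eager variant: re-binds inside reset(), before the task swaps in new inputs.
class EagerResetDriver {
    private final SketchContext ctx;
    private Iterator<Integer> input;

    EagerResetDriver(SketchContext ctx) { this.ctx = ctx; }

    void reset() { input = ctx.getInput(); } // may capture a stale input

    void prepare() { if (input == null) input = ctx.getInput(); }

    int run() { int s = 0; while (input.hasNext()) s += input.next(); return s; }
}

class StaleInputDemo {
    // Simulates two supersteps and returns the result of the second one.
    static int secondSuperstep(boolean useFlag) {
        SketchContext ctx = new SketchContext();
        ctx.currentInput = Arrays.asList(1, 2);       // superstep 1 input
        if (useFlag) {
            FlagResetDriver d = new FlagResetDriver(ctx);
            d.prepare(); d.run();
            d.reset();
            ctx.currentInput = Arrays.asList(10, 20); // inputs re-created after reset()
            d.prepare();
            return d.run();
        } else {
            EagerResetDriver d = new EagerResetDriver(ctx);
            d.prepare(); d.run();
            d.reset();
            ctx.currentInput = Arrays.asList(10, 20);
            d.prepare();
            return d.run();
        }
    }
}
```

With the flag, the second superstep reads the new input (sum 30); the eager variant binds the stale input inside `reset()` and re-reads the old data (sum 3). Under this assumed ordering, the cleaner fix would be to have the task re-create inputs before calling `reset()`, rather than deferring the work with a flag.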



[GitHub] flink pull request #2510: FLINK-3322 Allow drivers and iterators to reuse th...

Posted by ramkrish86 <gi...@git.apache.org>.
Github user ramkrish86 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2510#discussion_r79384469
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/operators/JoinDriver.java ---
    @@ -128,17 +130,22 @@ public void prepare() throws Exception{
     				ConfigConstants.DEFAULT_RUNTIME_HASH_JOIN_BLOOM_FILTERS);
     
     		// create and return joining iterator according to provided local strategy.
    -		if (objectReuseEnabled) {
    -			switch (ls) {
    -				case INNER_MERGE:
    -					this.joinIterator = new ReusingMergeInnerJoinIterator<>(in1, in2, 
    +		if (reset) {
    +			resetForIterativeTasks(in1, in2, serializer1, serializer2, comparator1, comparator2, pairComparatorFactory);
    +			reset = false;
    +		}
    +		if (joinIterator == null) {
    --- End diff --
    
    Yes. I saw this pattern and how it is used. The existing ResettableDriver implementations, such as JoinWithSolutionSetFirstDriver and AbstractCachedBuildSideJoinDriver, do their work in `initialize()`, which is called only once, before the first iteration, and leave `prepare()` empty, since it is called in every iteration.
    We may have to change even those if we want to unify everything. I can make those changes. Let me check. Thanks @ggevay.




[GitHub] flink pull request #2510: FLINK-3322 Allow drivers and iterators to reuse th...

Posted by ramkrish86 <gi...@git.apache.org>.
Github user ramkrish86 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2510#discussion_r79755676
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/operators/JoinDriver.java ---
    @@ -128,17 +130,22 @@ public void prepare() throws Exception{
     				ConfigConstants.DEFAULT_RUNTIME_HASH_JOIN_BLOOM_FILTERS);
     
     		// create and return joining iterator according to provided local strategy.
    -		if (objectReuseEnabled) {
    -			switch (ls) {
    -				case INNER_MERGE:
    -					this.joinIterator = new ReusingMergeInnerJoinIterator<>(in1, in2, 
    +		if (reset) {
    +			resetForIterativeTasks(in1, in2, serializer1, serializer2, comparator1, comparator2, pairComparatorFactory);
    +			reset = false;
    +		}
    +		if (joinIterator == null) {
    --- End diff --
    
    I can submit a PR with my updated changes. As said above, this time, although the drivers are ResettableDrivers, `reset()` has no implementation and everything is done in `prepare()`.




[GitHub] flink pull request #2510: FLINK-3322 Allow drivers and iterators to reuse th...

Posted by ramkrish86 <gi...@git.apache.org>.
Github user ramkrish86 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2510#discussion_r79382231
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/operators/resettable/AbstractBlockResettableIterator.java ---
    @@ -183,4 +183,24 @@ protected T getNextRecord() throws IOException {
     		}
     	}
     
    +	public void resetForIterativeTasks() {
    --- End diff --
    
    The existing `reset` only resets within the logic of an iteration:
    `this.readView.setReadPosition(0);`
    `this.numRecordsReturned = 0;`
    
    But it does not move the fullSegments back into the emptySegments. We need a more thorough reset, and since I did not want to change the existing one, I added a new one. WDYT?
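A minimal sketch of the difference between the two resets, assuming segments are modelled as `byte[]` blocks moved between an empty pool and a full list. `SketchBlockBuffer`, `fillNextSegment()` and `readRecord()` are hypothetical; the name `resetForIterativeTasks()` is borrowed from the diff, but its body here only illustrates the behaviour described in this comment, not the PR's actual code.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified block buffer; not Flink's AbstractBlockResettableIterator.
// "Segments" are plain byte arrays taken from a fixed pool.
class SketchBlockBuffer {
    final List<byte[]> emptySegments = new ArrayList<>();
    final List<byte[]> fullSegments = new ArrayList<>();
    int readPosition = 0;
    int numRecordsReturned = 0;

    SketchBlockBuffer(int numSegments, int segmentSize) {
        for (int i = 0; i < numSegments; i++) {
            emptySegments.add(new byte[segmentSize]);
        }
    }

    // Moves one segment from the pool to the full list (stands in for writing records).
    boolean fillNextSegment() {
        if (emptySegments.isEmpty()) {
            return false;
        }
        fullSegments.add(emptySegments.remove(emptySegments.size() - 1));
        return true;
    }

    // Simulates reading one record from the current block.
    void readRecord() {
        readPosition++;
        numRecordsReturned++;
    }

    // Like the existing reset(): rewind to re-read the same block; segments stay full.
    void reset() {
        readPosition = 0;
        numRecordsReturned = 0;
    }

    // Fuller reset for a new superstep: also return the full segments to the pool,
    // so the same memory can be refilled with the next superstep's data.
    void resetForIterativeTasks() {
        reset();
        emptySegments.addAll(fullSegments);
        fullSegments.clear();
    }
}
```

Keeping both methods lets the existing rewind semantics stay unchanged for callers that re-read the same block within one superstep.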



[GitHub] flink pull request #2510: FLINK-3322 Allow drivers and iterators to reuse th...

Posted by ramkrish86 <gi...@git.apache.org>.
Github user ramkrish86 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2510#discussion_r79381731
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/JoinTaskIterator.java ---
    @@ -68,4 +72,9 @@
     	 * method signals an interrupt to that procedure.
     	 */
     	void abort();
    +
    +	/**
    +	 * Allows to reset the iterator
    +	 */
    +	void reset(MutableObjectIterator<V1> in1, MutableObjectIterator<V2> in2, TypeSerializer<V1> serializer1, TypeSerializer<V2> serializer2, TypeComparator<V1> comp1, TypeComparator<V2> comp2, TypePairComparatorFactory<V1, V2> pairComparatorFactory);
    --- End diff --
    
    OK. I can change that and test it. I saw that the inputs change on a reset, so I expected the comparators and serializers to be fetched every time as well.



[GitHub] flink pull request #2510: FLINK-3322 Allow drivers and iterators to reuse th...

Posted by ggevay <gi...@git.apache.org>.
Github user ggevay commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2510#discussion_r79370262
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/JoinTaskIterator.java ---
    @@ -68,4 +72,9 @@
     	 * method signals an interrupt to that procedure.
     	 */
     	void abort();
    +
    +	/**
    +	 * Allows to reset the iterator
    +	 */
    +	void reset(MutableObjectIterator<V1> in1, MutableObjectIterator<V2> in2, TypeSerializer<V1> serializer1, TypeSerializer<V2> serializer2, TypeComparator<V1> comp1, TypeComparator<V2> comp2, TypePairComparatorFactory<V1, V2> pairComparatorFactory);
    --- End diff --
    
    Could you pass in only those objects that might change during a reset? I think these should only be the two inputs.
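A minimal sketch of the narrower signature suggested here, assuming that only the two inputs change between supersteps while serializers, comparators and the pair-comparator factory stay fixed. `SketchJoinIterator` and `NestedLoopJoin` are hypothetical: `java.util.Iterator` replaces `MutableObjectIterator`, and plain key-extractor functions replace the type comparators.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.BiFunction;
import java.util.function.Function;

// Hypothetical stand-in for JoinTaskIterator; not Flink's interface.
interface SketchJoinIterator<V1, V2, O> {
    List<O> joinAll();
    // Only the inputs change between supersteps, so only they are passed in.
    void reset(Iterator<V1> in1, Iterator<V2> in2);
}

// Simple nested-loop join; the key functions are fixed once at construction.
final class NestedLoopJoin<V1, V2, K, O> implements SketchJoinIterator<V1, V2, O> {
    private final Function<V1, K> key1;
    private final Function<V2, K> key2;
    private final BiFunction<V1, V2, O> joinFn;
    private final List<V2> buffer = new ArrayList<>(); // reused across supersteps
    private Iterator<V1> in1;
    private Iterator<V2> in2;

    NestedLoopJoin(Iterator<V1> in1, Iterator<V2> in2,
                   Function<V1, K> key1, Function<V2, K> key2,
                   BiFunction<V1, V2, O> joinFn) {
        this.key1 = key1;
        this.key2 = key2;
        this.joinFn = joinFn;
        reset(in1, in2);
    }

    @Override
    public void reset(Iterator<V1> in1, Iterator<V2> in2) {
        this.in1 = in1;
        this.in2 = in2;
        buffer.clear(); // keep the allocated list, drop the old contents
    }

    @Override
    public List<O> joinAll() {
        while (in2.hasNext()) {
            buffer.add(in2.next()); // buffer the second input
        }
        List<O> out = new ArrayList<>();
        while (in1.hasNext()) {
            V1 left = in1.next();
            K k = key1.apply(left);
            for (V2 right : buffer) {
                if (k.equals(key2.apply(right))) {
                    out.add(joinFn.apply(left, right));
                }
            }
        }
        return out;
    }
}
```

Because the key functions are fixed at construction, `reset()` only has to swap the inputs and clear the reused buffer.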

