You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by dabaitu <gi...@git.apache.org> on 2015/06/21 00:40:07 UTC

[GitHub] flink pull request: FLINK-1085: Unnecessary failing of GroupReduce...

GitHub user dabaitu opened a pull request:

    https://github.com/apache/flink/pull/854

    FLINK-1085: Unnecessary failing of GroupReduceCombineDriver

    I have a unit test failure and it seems it may have been there before my changes:
    
    Tests in error: 
      UtilsTest.testUberjarLocator:39 NullPointer
    
    It's looking for some uber jar and not finding it. Did I need to compile something else first?

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/dabaitu/flink master

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/854.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #854
    
----
commit e693ba2398c170b6d65a2365a4cc0fe091f3319b
Author: dabaitu <to...@gmail.com>
Date:   2015-06-20T22:35:48Z

    FLINK-1085: change exception to warning log and track oversized Record count

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-1085] Unnecessary failing of GroupReduc...

Posted by dabaitu <gi...@git.apache.org>.
Github user dabaitu commented on the pull request:

    https://github.com/apache/flink/pull/854#issuecomment-118219396
  
    @StephanEwen please review again. Thanks!


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-1085] Unnecessary failing of GroupReduc...

Posted by dabaitu <gi...@git.apache.org>.
Github user dabaitu commented on a diff in the pull request:

    https://github.com/apache/flink/pull/854#discussion_r33749807
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceCombineDriver.java ---
    @@ -170,7 +171,14 @@ public void run() throws Exception {
     
     			// write the value again
     			if (!this.sorter.write(value)) {
    -				throw new IOException("Cannot write record to fresh sort buffer. Record too large.");
    +				if (oversizedRecordCount == Long.MAX_VALUE) {
    +					LOG.debug("Number of oversized record has exceeded MAX Long");
    +				} else {
    +					++oversizedRecordCount;
    +					LOG.debug("Cannot write record to fresh sort buffer. Record too large. Oversized record count: {}", oversizedRecordCount);
    +				}
    +				// simply forward the record
    +				this.output.collect((OUT)value);
    --- End diff --
    
    I tried to call
    
    this.output.collect(value)
    I couldn't figure out how to convert this single value from type IN to type OUT besides direct casting. Could you guys please help me? @StephanEwen @rmetzger


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-1085] Unnecessary failing of GroupReduc...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/854#discussion_r34425008
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceCombineDriver.java ---
    @@ -170,7 +171,14 @@ public void run() throws Exception {
     
     			// write the value again
     			if (!this.sorter.write(value)) {
    -				throw new IOException("Cannot write record to fresh sort buffer. Record too large.");
    +				if (oversizedRecordCount == Long.MAX_VALUE) {
    --- End diff --
    
    I think we can skip this test. It's going to be very hard to process 2^63 oversized records in a single Flink thread. Might take a millennium or so ;-)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-1085] Unnecessary failing of GroupReduc...

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on the pull request:

    https://github.com/apache/flink/pull/854#issuecomment-113835793
  
    How did you start the tests?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-1085] Unnecessary failing of GroupReduc...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on the pull request:

    https://github.com/apache/flink/pull/854#issuecomment-114043020
  
    I think this code actually simply drops oversized records, or am I overlooking something?
    
    Also, outputting oversized records on WARN debug level seems heavy, it is not really something that should alert the person that runs the program.
    
    Before merging this, we need to
      - Make sure the records are not lost
      - Add a test that validates exactly that (not as an ITCase, but a unit test)
      - Reduce the log level for large records to DEBUG


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-1085] Unnecessary failing of GroupReduc...

Posted by dabaitu <gi...@git.apache.org>.
Github user dabaitu commented on the pull request:

    https://github.com/apache/flink/pull/854#issuecomment-113842215
  
    @rmetzger : 'mvn test'


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-1085] Unnecessary failing of GroupReduc...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/854


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-1085] Unnecessary failing of GroupReduc...

Posted by dabaitu <gi...@git.apache.org>.
Github user dabaitu commented on the pull request:

    https://github.com/apache/flink/pull/854#issuecomment-120769914
  
    ProcessFailureStreamingRecoveryITCase passes locally. Is the master branch broken?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-1085] Unnecessary failing of GroupReduc...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on the pull request:

    https://github.com/apache/flink/pull/854#issuecomment-114396094
  
    The combiner partially sorts the records, to group subsequences in order to combine (reduce) them.
    
    If a record does not fit into a fresh sort buffer, then we can simply emit it as it is. There is no assumption on records being emitted in sorted order.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-1085] Unnecessary failing of GroupReduc...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/854#discussion_r33809780
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/operators/CombineTaskTest.java ---
    @@ -92,7 +92,35 @@ public void testCombineTask() {
     		
     		this.outList.clear();
     	}
    -	
    +
    +	@Test
    +	public void testOversizedRecordCombineTask() {
    +		int keyCnt = 1;
    +		int valCnt = 20;
    +
    +		addInput(new UniformRecordGenerator(keyCnt, valCnt, true));
    +		addDriverComparator(this.comparator);
    +		addDriverComparator(this.comparator);
    +		setOutput(this.outList);
    +
    +		getTaskConfig().setDriverStrategy(DriverStrategy.SORTED_GROUP_COMBINE);
    +		getTaskConfig().setRelativeMemoryDriver(combine_frac);
    --- End diff --
    
    I think you cannot reduce the memory further below to enforce records to be oversized. I think you need to change the data generator to provide you with an oversized record. I think the tests have a `UnionIterator` where you can mix an oversized record into the generating iterator.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-1085] Unnecessary failing of GroupReduc...

Posted by dabaitu <gi...@git.apache.org>.
Github user dabaitu commented on a diff in the pull request:

    https://github.com/apache/flink/pull/854#discussion_r33749843
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/operators/CombineTaskTest.java ---
    @@ -92,7 +92,35 @@ public void testCombineTask() {
     		
     		this.outList.clear();
     	}
    -	
    +
    +	@Test
    +	public void testOversizedRecordCombineTask() {
    +		int keyCnt = 1;
    +		int valCnt = 20;
    +
    +		addInput(new UniformRecordGenerator(keyCnt, valCnt, true));
    +		addDriverComparator(this.comparator);
    +		addDriverComparator(this.comparator);
    +		setOutput(this.outList);
    +
    +		getTaskConfig().setDriverStrategy(DriverStrategy.SORTED_GROUP_COMBINE);
    +		getTaskConfig().setRelativeMemoryDriver(combine_frac);
    --- End diff --
    
    I tried tweaking this to configure the memory size of the sorter, but failed. Please help too


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-1085] Unnecessary failing of GroupReduc...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on the pull request:

    https://github.com/apache/flink/pull/854#issuecomment-120842381
  
    I think there was a test instability, but it was fixed a bit back...


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-1085] Unnecessary failing of GroupReduc...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/854#discussion_r33809590
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceCombineDriver.java ---
    @@ -170,7 +171,14 @@ public void run() throws Exception {
     
     			// write the value again
     			if (!this.sorter.write(value)) {
    -				throw new IOException("Cannot write record to fresh sort buffer. Record too large.");
    +				if (oversizedRecordCount == Long.MAX_VALUE) {
    +					LOG.debug("Number of oversized record has exceeded MAX Long");
    +				} else {
    +					++oversizedRecordCount;
    +					LOG.debug("Cannot write record to fresh sort buffer. Record too large. Oversized record count: {}", oversizedRecordCount);
    +				}
    +				// simply forward the record
    +				this.output.collect((OUT)value);
    --- End diff --
    
    I think this is fine, calling `out.collect((OUT) value)`. The generics are a bit confusing here, I think. OUT is for combine functions really the same thing as IN - they are not allowed to change the type. This should really be simplified, otherwise it is confusing, agreed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-1085] Unnecessary failing of GroupReduc...

Posted by dabaitu <gi...@git.apache.org>.
Github user dabaitu commented on the pull request:

    https://github.com/apache/flink/pull/854#issuecomment-114331577
  
    @StephanEwen @fhueske - apologies, I forgot to call collect. 
    A dumb question - why do we need to sort the records before combining them? Should I just call collect on the oversized record or still pass it to the combiner? What do combiners do(or what are they suppose to do in general)?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-1085] Unnecessary failing of GroupReduc...

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on the pull request:

    https://github.com/apache/flink/pull/854#issuecomment-113842515
  
    Can you try to run "mvn clean verify" in the project root?
    It will use a lot of resources and run for more than 30 minutes but the tests should pass ;)
    
    Also, the travis (CI system) build for your pull request passed.
    So the tests are all good.
    
    Lets wait for @fhueske to review your proposed changes.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-1085] Unnecessary failing of GroupReduc...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on the pull request:

    https://github.com/apache/flink/pull/854#issuecomment-120842417
  
    Looks good, let me merge this...


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-1085] Unnecessary failing of GroupReduc...

Posted by dabaitu <gi...@git.apache.org>.
Github user dabaitu commented on a diff in the pull request:

    https://github.com/apache/flink/pull/854#discussion_r33746632
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceCombineDriver.java ---
    @@ -170,7 +171,12 @@ public void run() throws Exception {
     
     			// write the value again
     			if (!this.sorter.write(value)) {
    -				throw new IOException("Cannot write record to fresh sort buffer. Record too large.");
    +				if (oversizedRecordCount == Long.MAX_VALUE) {
    +					LOG.error("Number of oversized record has exceeded MAX Long");
    +				} else {
    +					++oversizedRecordCount;
    +					LOG.warn("Cannot write record to fresh sort buffer. Record too large. Oversized record count: {}", oversizedRecordCount);
    +				}
    --- End diff --
    
    I tried to call
    ```
    this.output.collect(value)
    ```
    I couldn't figure out how to convert this single value from type IN to type OUT. Could you guys please help me? @StephanEwen @rmetzger 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---