Posted to dev@pig.apache.org by Mohit Sabharwal <mo...@cloudera.com> on 2015/05/09 00:26:47 UTC

Review Request 34003: PIG-4542: OutputConsumerIterator should flush buffered records

-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/34003/
-----------------------------------------------------------

Review request for pig, liyun zhang, Praveen Rachabattuni, and Xuefu Zhang.


Bugs: PIG-4542
    https://issues.apache.org/jira/browse/PIG-4542


Repository: pig-git


Description
-------

PIG-4542: OutputConsumerIterator should flush buffered records

Unnecessary use of RDD.count() was bugging me. This patch addresses that.

Certain operators may buffer their output. We need to flush the last set of records from such operators
when we encounter the last input record, before calling getNextTuple() for the last time.

Currently, to flush the last set of records, we compute RDD.count() and compare the count with a
running counter to determine if we have reached the last record. This is unnecessary and inefficient.

This patch:
- Gets rid of the use of RDD.count() in CollectedGroupConverter and LimitConverter.
- Adds an abstract method endInput() which is executed before we call getNextTuple() for the last time.
- Deletes POStreamSpark since it was just handling the last record.
- Removes special code in POCollectedGroup that handled the last record.
- Renames POOutputConsumerIterator to OutputConsumerIterator; the "PO" prefix
should only be used for physical operators.
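The buffer-and-flush pattern can be sketched generically. This is an illustrative sketch only: the real OutputConsumerIterator wraps Pig's Result/POStatus types, and the toy AdjacentGroupIterator below is a hypothetical stand-in for a buffering operator like POCollectedGroup. The point it demonstrates is the endInput() hook, invoked once when the last input record has been consumed, which removes any need to pre-compute the input size with RDD.count().

```java
import java.util.*;

// Illustrative sketch (not the actual Pig class). endInput() lets a
// buffering operator flush its last records without the caller knowing
// the input size in advance.
abstract class OutputConsumerIterator<I, O> implements Iterator<O> {
    private final Iterator<I> input;
    private O pending;

    OutputConsumerIterator(Iterator<I> input) { this.input = input; }

    protected abstract void consume(I record); // feed one record to the operator
    protected abstract O poll();               // next buffered output, or null
    protected abstract void endInput();        // last input record seen: flush

    @Override
    public boolean hasNext() {
        if (pending != null) return true;
        pending = poll();
        while (pending == null && input.hasNext()) {
            consume(input.next());
            if (!input.hasNext()) {
                endInput(); // flush buffered records before the final poll()s
            }
            pending = poll();
        }
        return pending != null;
    }

    @Override
    public O next() {
        if (!hasNext()) throw new NoSuchElementException();
        O out = pending;
        pending = null;
        return out;
    }
}

// Toy buffering operator in the spirit of POCollectedGroup: groups
// consecutive records with the same key and emits "key:count" strings.
class AdjacentGroupIterator extends OutputConsumerIterator<String, String> {
    private final Deque<String> ready = new ArrayDeque<>();
    private String currentKey;
    private int count;

    AdjacentGroupIterator(Iterator<String> input) { super(input); }

    @Override
    protected void consume(String key) {
        if (key.equals(currentKey)) {
            count++;
        } else {
            if (currentKey != null) ready.add(currentKey + ":" + count);
            currentKey = key;
            count = 1;
        }
    }

    @Override
    protected String poll() { return ready.poll(); }

    @Override
    protected void endInput() {
        // Without this flush the final group would be lost: no further key
        // change arrives to push it into the ready queue.
        if (currentKey != null) {
            ready.add(currentKey + ":" + count);
            currentKey = null;
        }
    }
}
```

For input ("a", "a", "b") this yields "a:2" then "b:1"; dropping the endInput() call would silently lose the "b:1" group, which is exactly the bug class the patch targets.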


Diffs
-----

  src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCollectedGroup.java 7f81c1dce3304011fe89896dd61d46977bcc8821 
  src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java 8654b1b6c6436ed6dd0f24358e5b633e6c963d54 
  src/org/apache/pig/backend/hadoop/executionengine/spark/converter/CollectedGroupConverter.java 99005b7a4eeec5f024512c4aefb396117f574619 
  src/org/apache/pig/backend/hadoop/executionengine/spark/converter/ForEachConverter.java e976cea44f83e83a3ce28fd03374ede7b52fdeb9 
  src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LimitConverter.java 3074f5e6ea4ac15f38affcdaa3e23c3d66bdc104 
  src/org/apache/pig/backend/hadoop/executionengine/spark/converter/POOutputConsumerIterator.java 8b31197f879ed14086a7377635f2877fa3db0e9f 
  src/org/apache/pig/backend/hadoop/executionengine/spark/converter/StreamConverter.java 2c115fc3044c08cc8c96d27a08c9761456aea158 
  src/org/apache/pig/backend/hadoop/executionengine/spark/operator/POStreamSpark.java f42d8ff4653245c9c14ddbea949e66a7609a02a5 
  src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkCompiler.java c66fe7b154f18ccc3aa0f5dab31a5952947fdca2 
  test/org/apache/pig/test/TestCollectedGroup.java a958d335a880432145dc80fec0ef2054ce90a3bf 

Diff: https://reviews.apache.org/r/34003/diff/


Testing
-------

TestStreaming passes.
No new failures in TestCollectedGroup.


Thanks,

Mohit Sabharwal


Re: Review Request 34003: PIG-4542: OutputConsumerIterator should flush buffered records

Posted by kelly zhang <li...@intel.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/34003/#review83380
-----------------------------------------------------------



src/org/apache/pig/backend/hadoop/executionengine/spark/converter/StreamConverter.java
<https://reviews.apache.org/r/34003/#comment134364>

    Move the java.* imports before import org.apache.hadoop.mapred.JobConf;



test/org/apache/pig/test/TestCollectedGroup.java
<https://reviews.apache.org/r/34003/#comment134366>

    Remove import org.apache.pig.ExecType; this import is not needed.


- kelly zhang


On May 12, 2015, 4:21 a.m., Mohit Sabharwal wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/34003/
> -----------------------------------------------------------
> 
> (Updated May 12, 2015, 4:21 a.m.)
> 
> 
> Review request for pig, liyun zhang, Praveen Rachabattuni, and Xuefu Zhang.
> 
> 
> Bugs: PIG-4542
>     https://issues.apache.org/jira/browse/PIG-4542
> 
> 
> Repository: pig-git
> 
> 
> Description
> -------
> 
> PIG-4542: OutputConsumerIterator should flush buffered records
> 
> Unnecessary use of RDD.count() was bugging me. This patch addresses that.
> 
> Certain operators may buffer their output. We need to flush the last set of records from such operators
> when we encounter the last input record, before calling getNextTuple() for the last time.
> 
> Currently, to flush the last set of records, we compute RDD.count() and compare the count with a
> running counter to determine if we have reached the last record. This is unnecessary and inefficient.
> 
> This patch:
> - Gets rid of the use of RDD.count() in CollectedGroupConverter and LimitConverter.
> - Adds an abstract method endInput() which is executed before we call getNextTuple() for the last time.
> - Deletes POStreamSpark since it was just handling the last record.
> - Removes special code in POCollectedGroup that handled the last record.
> - Renames POOutputConsumerIterator to OutputConsumerIterator; the "PO" prefix
> should only be used for physical operators.
> 
> 
> Diffs
> -----
> 
>   src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java 3e75cadbca4d98bfd7aeb49da5f298acdab2bc4d 
>   src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCollectedGroup.java 7f81c1dce3304011fe89896dd61d46977bcc8821 
>   src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java 8654b1b6c6436ed6dd0f24358e5b633e6c963d54 
>   src/org/apache/pig/backend/hadoop/executionengine/spark/converter/CollectedGroupConverter.java 99005b7a4eeec5f024512c4aefb396117f574619 
>   src/org/apache/pig/backend/hadoop/executionengine/spark/converter/ForEachConverter.java e976cea44f83e83a3ce28fd03374ede7b52fdeb9 
>   src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LimitConverter.java 3074f5e6ea4ac15f38affcdaa3e23c3d66bdc104 
>   src/org/apache/pig/backend/hadoop/executionengine/spark/converter/POOutputConsumerIterator.java 8b31197f879ed14086a7377635f2877fa3db0e9f 
>   src/org/apache/pig/backend/hadoop/executionengine/spark/converter/StreamConverter.java 2c115fc3044c08cc8c96d27a08c9761456aea158 
>   src/org/apache/pig/backend/hadoop/executionengine/spark/operator/POStreamSpark.java f42d8ff4653245c9c14ddbea949e66a7609a02a5 
>   src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkCompiler.java c66fe7b154f18ccc3aa0f5dab31a5952947fdca2 
>   test/org/apache/pig/test/TestCollectedGroup.java a958d335a880432145dc80fec0ef2054ce90a3bf 
> 
> Diff: https://reviews.apache.org/r/34003/diff/
> 
> 
> Testing
> -------
> 
> TestStreaming passes.
> No new failures in TestCollectedGroup.
> 
> 
> Thanks,
> 
> Mohit Sabharwal
> 
>


Re: Review Request 34003: PIG-4542: OutputConsumerIterator should flush buffered records

Posted by Mohit Sabharwal <mo...@cloudera.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/34003/
-----------------------------------------------------------

(Updated May 12, 2015, 9:37 p.m.)


Review request for pig, liyun zhang, Praveen Rachabattuni, and Xuefu Zhang.


Changes
-------

Incorporated feedback.


Bugs: PIG-4542
    https://issues.apache.org/jira/browse/PIG-4542


Repository: pig-git


Description
-------

PIG-4542: OutputConsumerIterator should flush buffered records

Unnecessary use of RDD.count() was bugging me. This patch addresses that.

Certain operators may buffer their output. We need to flush the last set of records from such operators
when we encounter the last input record, before calling getNextTuple() for the last time.

Currently, to flush the last set of records, we compute RDD.count() and compare the count with a
running counter to determine if we have reached the last record. This is unnecessary and inefficient.

This patch:
- Gets rid of the use of RDD.count() in CollectedGroupConverter and LimitConverter.
- Adds an abstract method endInput() which is executed before we call getNextTuple() for the last time.
- Deletes POStreamSpark since it was just handling the last record.
- Removes special code in POCollectedGroup that handled the last record.
- Renames POOutputConsumerIterator to OutputConsumerIterator; the "PO" prefix
should only be used for physical operators.


Diffs (updated)
-----

  src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java 3e75cadbca4d98bfd7aeb49da5f298acdab2bc4d 
  src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCollectedGroup.java 7f81c1dce3304011fe89896dd61d46977bcc8821 
  src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java 8654b1b6c6436ed6dd0f24358e5b633e6c963d54 
  src/org/apache/pig/backend/hadoop/executionengine/spark/converter/CollectedGroupConverter.java 99005b7a4eeec5f024512c4aefb396117f574619 
  src/org/apache/pig/backend/hadoop/executionengine/spark/converter/ForEachConverter.java e976cea44f83e83a3ce28fd03374ede7b52fdeb9 
  src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LimitConverter.java 3074f5e6ea4ac15f38affcdaa3e23c3d66bdc104 
  src/org/apache/pig/backend/hadoop/executionengine/spark/converter/POOutputConsumerIterator.java 8b31197f879ed14086a7377635f2877fa3db0e9f 
  src/org/apache/pig/backend/hadoop/executionengine/spark/converter/StreamConverter.java 2c115fc3044c08cc8c96d27a08c9761456aea158 
  src/org/apache/pig/backend/hadoop/executionengine/spark/operator/POStreamSpark.java f42d8ff4653245c9c14ddbea949e66a7609a02a5 
  src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkCompiler.java c66fe7b154f18ccc3aa0f5dab31a5952947fdca2 
  test/org/apache/pig/test/TestCollectedGroup.java a958d335a880432145dc80fec0ef2054ce90a3bf 

Diff: https://reviews.apache.org/r/34003/diff/


Testing
-------

TestStreaming passes.
No new failures in TestCollectedGroup.


Thanks,

Mohit Sabharwal


Re: Review Request 34003: PIG-4542: OutputConsumerIterator should flush buffered records

Posted by Mohit Sabharwal <mo...@cloudera.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/34003/
-----------------------------------------------------------

(Updated May 12, 2015, 4:21 a.m.)


Review request for pig, liyun zhang, Praveen Rachabattuni, and Xuefu Zhang.


Changes
-------

Incorporated review feedback:
 - Added to SparkLauncher: new PhyPlanSetter(leaf.physicalPlan).visit();
 - Fixed parent plan in CollectedGroupConverter


Bugs: PIG-4542
    https://issues.apache.org/jira/browse/PIG-4542


Repository: pig-git


Description
-------

PIG-4542: OutputConsumerIterator should flush buffered records

Unnecessary use of RDD.count() was bugging me. This patch addresses that.

Certain operators may buffer their output. We need to flush the last set of records from such operators
when we encounter the last input record, before calling getNextTuple() for the last time.

Currently, to flush the last set of records, we compute RDD.count() and compare the count with a
running counter to determine if we have reached the last record. This is unnecessary and inefficient.

This patch:
- Gets rid of the use of RDD.count() in CollectedGroupConverter and LimitConverter.
- Adds an abstract method endInput() which is executed before we call getNextTuple() for the last time.
- Deletes POStreamSpark since it was just handling the last record.
- Removes special code in POCollectedGroup that handled the last record.
- Renames POOutputConsumerIterator to OutputConsumerIterator; the "PO" prefix
should only be used for physical operators.


Diffs (updated)
-----

  src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java 3e75cadbca4d98bfd7aeb49da5f298acdab2bc4d 
  src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCollectedGroup.java 7f81c1dce3304011fe89896dd61d46977bcc8821 
  src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java 8654b1b6c6436ed6dd0f24358e5b633e6c963d54 
  src/org/apache/pig/backend/hadoop/executionengine/spark/converter/CollectedGroupConverter.java 99005b7a4eeec5f024512c4aefb396117f574619 
  src/org/apache/pig/backend/hadoop/executionengine/spark/converter/ForEachConverter.java e976cea44f83e83a3ce28fd03374ede7b52fdeb9 
  src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LimitConverter.java 3074f5e6ea4ac15f38affcdaa3e23c3d66bdc104 
  src/org/apache/pig/backend/hadoop/executionengine/spark/converter/POOutputConsumerIterator.java 8b31197f879ed14086a7377635f2877fa3db0e9f 
  src/org/apache/pig/backend/hadoop/executionengine/spark/converter/StreamConverter.java 2c115fc3044c08cc8c96d27a08c9761456aea158 
  src/org/apache/pig/backend/hadoop/executionengine/spark/operator/POStreamSpark.java f42d8ff4653245c9c14ddbea949e66a7609a02a5 
  src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkCompiler.java c66fe7b154f18ccc3aa0f5dab31a5952947fdca2 
  test/org/apache/pig/test/TestCollectedGroup.java a958d335a880432145dc80fec0ef2054ce90a3bf 

Diff: https://reviews.apache.org/r/34003/diff/


Testing
-------

TestStreaming passes.
No new failures in TestCollectedGroup.


Thanks,

Mohit Sabharwal


Re: Review Request 34003: PIG-4542: OutputConsumerIterator should flush buffered records

Posted by Mohit Sabharwal <mo...@cloudera.com>.

> On May 12, 2015, 12:14 a.m., kelly zhang wrote:
> > src/org/apache/pig/backend/hadoop/executionengine/spark/converter/CollectedGroupConverter.java, line 108
> > <https://reviews.apache.org/r/34003/diff/1/?file=953940#file953940line108>
> >
> >     poCollectedGroup.getPlans().get(0) equals poCollectedGroup.parentPlan?

The only reason we are setting parentPlan in CollectedGroupConverter is so that this code in POCollectedGroup does not throw a NullPointerException:

    if (this.parentPlan.endOfAllInput) {
        return getStreamCloseResult();
    } else {
        break;
    }

I think poCollectedGroup.getPlans().get(0) was chosen arbitrarily; any physical plan with endOfAllInput set would have worked.

poCollectedGroup.getPlans().get(0) is in fact an expression plan, not the parent plan of POCollectedGroup.

I discovered that we aren't setting the parent plans for the physical operators inside SparkPlan to the relevant SparkOperator (parent). I've fixed that using:

    new PhyPlanSetter(sparkOperator).visit();

This way, we can simply set

poCollectedGroup.getParentPlan().endOfAllInput = true

in CollectedGroupConverter.
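For reference, the parent-plan stamping that PhyPlanSetter performs can be pictured with a generic sketch. Plan, Op, and PlanSetter below are hypothetical stand-ins, not Pig's actual classes: once every operator is stamped with its enclosing plan, flipping endOfAllInput on the plan becomes visible to the operator, and the NullPointerException path above goes away.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins for PhysicalPlan/PhysicalOperator, only to
// illustrate the stamping pattern described above.
class Plan {
    boolean endOfAllInput;                 // mirrors PhysicalPlan.endOfAllInput
    final List<Op> ops = new ArrayList<>();
}

class Op {
    Plan parentPlan;                       // mirrors PhysicalOperator.parentPlan
}

class PlanSetter {
    // Walk the plan and stamp each operator with its enclosing plan, so an
    // operator can later read parentPlan.endOfAllInput without an NPE.
    static void visit(Plan plan) {
        for (Op op : plan.ops) {
            op.parentPlan = plan;
        }
    }
}
```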


- Mohit


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/34003/#review83325
-----------------------------------------------------------


On May 8, 2015, 10:26 p.m., Mohit Sabharwal wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/34003/
> -----------------------------------------------------------
> 
> (Updated May 8, 2015, 10:26 p.m.)
> 
> 
> Review request for pig, liyun zhang, Praveen Rachabattuni, and Xuefu Zhang.
> 
> 
> Bugs: PIG-4542
>     https://issues.apache.org/jira/browse/PIG-4542
> 
> 
> Repository: pig-git
> 
> 
> Description
> -------
> 
> PIG-4542: OutputConsumerIterator should flush buffered records
> 
> Unnecessary use of RDD.count() was bugging me. This patch addresses that.
> 
> Certain operators may buffer their output. We need to flush the last set of records from such operators
> when we encounter the last input record, before calling getNextTuple() for the last time.
> 
> Currently, to flush the last set of records, we compute RDD.count() and compare the count with a
> running counter to determine if we have reached the last record. This is unnecessary and inefficient.
> 
> This patch:
> - Gets rid of the use of RDD.count() in CollectedGroupConverter and LimitConverter.
> - Adds an abstract method endInput() which is executed before we call getNextTuple() for the last time.
> - Deletes POStreamSpark since it was just handling the last record.
> - Removes special code in POCollectedGroup that handled the last record.
> - Renames POOutputConsumerIterator to OutputConsumerIterator; the "PO" prefix
> should only be used for physical operators.
> 
> 
> Diffs
> -----
> 
>   src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCollectedGroup.java 7f81c1dce3304011fe89896dd61d46977bcc8821 
>   src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java 8654b1b6c6436ed6dd0f24358e5b633e6c963d54 
>   src/org/apache/pig/backend/hadoop/executionengine/spark/converter/CollectedGroupConverter.java 99005b7a4eeec5f024512c4aefb396117f574619 
>   src/org/apache/pig/backend/hadoop/executionengine/spark/converter/ForEachConverter.java e976cea44f83e83a3ce28fd03374ede7b52fdeb9 
>   src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LimitConverter.java 3074f5e6ea4ac15f38affcdaa3e23c3d66bdc104 
>   src/org/apache/pig/backend/hadoop/executionengine/spark/converter/POOutputConsumerIterator.java 8b31197f879ed14086a7377635f2877fa3db0e9f 
>   src/org/apache/pig/backend/hadoop/executionengine/spark/converter/StreamConverter.java 2c115fc3044c08cc8c96d27a08c9761456aea158 
>   src/org/apache/pig/backend/hadoop/executionengine/spark/operator/POStreamSpark.java f42d8ff4653245c9c14ddbea949e66a7609a02a5 
>   src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkCompiler.java c66fe7b154f18ccc3aa0f5dab31a5952947fdca2 
>   test/org/apache/pig/test/TestCollectedGroup.java a958d335a880432145dc80fec0ef2054ce90a3bf 
> 
> Diff: https://reviews.apache.org/r/34003/diff/
> 
> 
> Testing
> -------
> 
> TestStreaming passes.
> No new failures in TestCollectedGroup.
> 
> 
> Thanks,
> 
> Mohit Sabharwal
> 
>


Re: Review Request 34003: PIG-4542: OutputConsumerIterator should flush buffered records

Posted by kelly zhang <li...@intel.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/34003/#review83325
-----------------------------------------------------------



src/org/apache/pig/backend/hadoop/executionengine/spark/converter/CollectedGroupConverter.java
<https://reviews.apache.org/r/34003/#comment134271>

    poCollectedGroup.getPlans().get(0) equals poCollectedGroup.parentPlan?


- kelly zhang


On May 8, 2015, 10:26 p.m., Mohit Sabharwal wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/34003/
> -----------------------------------------------------------
> 
> (Updated May 8, 2015, 10:26 p.m.)
> 
> 
> Review request for pig, liyun zhang, Praveen Rachabattuni, and Xuefu Zhang.
> 
> 
> Bugs: PIG-4542
>     https://issues.apache.org/jira/browse/PIG-4542
> 
> 
> Repository: pig-git
> 
> 
> Description
> -------
> 
> PIG-4542: OutputConsumerIterator should flush buffered records
> 
> Unnecessary use of RDD.count() was bugging me. This patch addresses that.
> 
> Certain operators may buffer their output. We need to flush the last set of records from such operators
> when we encounter the last input record, before calling getNextTuple() for the last time.
> 
> Currently, to flush the last set of records, we compute RDD.count() and compare the count with a
> running counter to determine if we have reached the last record. This is unnecessary and inefficient.
> 
> This patch:
> - Gets rid of the use of RDD.count() in CollectedGroupConverter and LimitConverter.
> - Adds an abstract method endInput() which is executed before we call getNextTuple() for the last time.
> - Deletes POStreamSpark since it was just handling the last record.
> - Removes special code in POCollectedGroup that handled the last record.
> - Renames POOutputConsumerIterator to OutputConsumerIterator; the "PO" prefix
> should only be used for physical operators.
> 
> 
> Diffs
> -----
> 
>   src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCollectedGroup.java 7f81c1dce3304011fe89896dd61d46977bcc8821 
>   src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java 8654b1b6c6436ed6dd0f24358e5b633e6c963d54 
>   src/org/apache/pig/backend/hadoop/executionengine/spark/converter/CollectedGroupConverter.java 99005b7a4eeec5f024512c4aefb396117f574619 
>   src/org/apache/pig/backend/hadoop/executionengine/spark/converter/ForEachConverter.java e976cea44f83e83a3ce28fd03374ede7b52fdeb9 
>   src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LimitConverter.java 3074f5e6ea4ac15f38affcdaa3e23c3d66bdc104 
>   src/org/apache/pig/backend/hadoop/executionengine/spark/converter/POOutputConsumerIterator.java 8b31197f879ed14086a7377635f2877fa3db0e9f 
>   src/org/apache/pig/backend/hadoop/executionengine/spark/converter/StreamConverter.java 2c115fc3044c08cc8c96d27a08c9761456aea158 
>   src/org/apache/pig/backend/hadoop/executionengine/spark/operator/POStreamSpark.java f42d8ff4653245c9c14ddbea949e66a7609a02a5 
>   src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkCompiler.java c66fe7b154f18ccc3aa0f5dab31a5952947fdca2 
>   test/org/apache/pig/test/TestCollectedGroup.java a958d335a880432145dc80fec0ef2054ce90a3bf 
> 
> Diff: https://reviews.apache.org/r/34003/diff/
> 
> 
> Testing
> -------
> 
> TestStreaming passes.
> No new failures in TestCollectedGroup.
> 
> 
> Thanks,
> 
> Mohit Sabharwal
> 
>