You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@pig.apache.org by "Brian Johnson (JIRA)" <ji...@apache.org> on 2014/09/11 22:28:33 UTC
[jira] [Updated] (PIG-4166) Collected group drops last record when
combined with merge join
[ https://issues.apache.org/jira/browse/PIG-4166?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Brian Johnson updated PIG-4166:
-------------------------------
Description:
If the final two keys in each relation join, they will never make it to the final output. The reason is that POMergeJoin does a read-ahead and POCollectedGroup doesn't call processInput when this.parentPlan.endOfAllInput == true. This prevents the final join from being output because POMergeJoin never sees endOfAllInput == true.
{code}
diff --git a/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCollectedGroup.java b/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCollectedGroup.java
index c355d1d..8fd44fa 100644
--- a/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCollectedGroup.java
+++ b/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCollectedGroup.java
@@ -127,28 +127,30 @@ public class POCollectedGroup extends PhysicalOperator {
@Override
public Result getNextTuple() throws ExecException {
- // Since the output is buffered, we need to flush the last
- // set of records when the close method is called by mapper.
- if (this.parentPlan.endOfAllInput) {
- if (outputBag != null) {
- Tuple tup = mTupleFactory.newTuple(2);
- tup.set(0, prevKey);
- tup.set(1, outputBag);
- outputBag = null;
- return new Result(POStatus.STATUS_OK, tup);
- }
-
- return new Result(POStatus.STATUS_EOP, null);
- }
+
Result inp = null;
Result res = null;
while (true) {
inp = processInput();
+
if (inp.returnStatus == POStatus.STATUS_EOP ||
inp.returnStatus == POStatus.STATUS_ERR) {
- break;
+ // Since the output is buffered, we need to flush the last
+ // set of records when the close method is called by mapper.
+ if (this.parentPlan.endOfAllInput) {
+ if (outputBag != null) {
+ Tuple tup = mTupleFactory.newTuple(2);
+ tup.set(0, prevKey);
+ tup.set(1, outputBag);
+ outputBag = null;
+ return new Result(POStatus.STATUS_OK, tup);
+ }
+
+ return new Result(POStatus.STATUS_EOP, null);
+ } else
+ break;
}
if (inp.returnStatus == POStatus.STATUS_NULL) {
{code}
was:
If the final two keys in each relation join, they will never make it to the final output. The reason is that POMergeJoin does a read-ahead and POCollectedGroup doesn't call processInput when this.parentPlan.endOfAllInput == true. This prevents the final join from being output because POMergeJoin never sees endOfAllInput == true.
diff --git a/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCollectedGroup.java b/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCollectedGroup.java
index c355d1d..8fd44fa 100644
--- a/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCollectedGroup.java
+++ b/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCollectedGroup.java
@@ -127,28 +127,30 @@ public class POCollectedGroup extends PhysicalOperator {
@Override
public Result getNextTuple() throws ExecException {
- // Since the output is buffered, we need to flush the last
- // set of records when the close method is called by mapper.
- if (this.parentPlan.endOfAllInput) {
- if (outputBag != null) {
- Tuple tup = mTupleFactory.newTuple(2);
- tup.set(0, prevKey);
- tup.set(1, outputBag);
- outputBag = null;
- return new Result(POStatus.STATUS_OK, tup);
- }
-
- return new Result(POStatus.STATUS_EOP, null);
- }
+
Result inp = null;
Result res = null;
while (true) {
inp = processInput();
+
if (inp.returnStatus == POStatus.STATUS_EOP ||
inp.returnStatus == POStatus.STATUS_ERR) {
- break;
+ // Since the output is buffered, we need to flush the last
+ // set of records when the close method is called by mapper.
+ if (this.parentPlan.endOfAllInput) {
+ if (outputBag != null) {
+ Tuple tup = mTupleFactory.newTuple(2);
+ tup.set(0, prevKey);
+ tup.set(1, outputBag);
+ outputBag = null;
+ return new Result(POStatus.STATUS_OK, tup);
+ }
+
+ return new Result(POStatus.STATUS_EOP, null);
+ } else
+ break;
}
if (inp.returnStatus == POStatus.STATUS_NULL) {
> Collected group drops last record when combined with merge join
> ---------------------------------------------------------------
>
> Key: PIG-4166
> URL: https://issues.apache.org/jira/browse/PIG-4166
> Project: Pig
> Issue Type: Bug
> Affects Versions: 0.12.0
> Reporter: Brian Johnson
>
> If the final two keys in each relation join, they will never make it to the final output. The reason is that POMergeJoin does a read-ahead and POCollectedGroup doesn't call processInput when this.parentPlan.endOfAllInput == true. This prevents the final join from being output because POMergeJoin never sees endOfAllInput == true.
> {code}
> diff --git a/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCollectedGroup.java b/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCollectedGroup.java
> index c355d1d..8fd44fa 100644
> --- a/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCollectedGroup.java
> +++ b/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCollectedGroup.java
> @@ -127,28 +127,30 @@ public class POCollectedGroup extends PhysicalOperator {
> @Override
> public Result getNextTuple() throws ExecException {
>
> - // Since the output is buffered, we need to flush the last
> - // set of records when the close method is called by mapper.
> - if (this.parentPlan.endOfAllInput) {
> - if (outputBag != null) {
> - Tuple tup = mTupleFactory.newTuple(2);
> - tup.set(0, prevKey);
> - tup.set(1, outputBag);
> - outputBag = null;
> - return new Result(POStatus.STATUS_OK, tup);
> - }
> -
> - return new Result(POStatus.STATUS_EOP, null);
> - }
> +
>
> Result inp = null;
> Result res = null;
>
> while (true) {
> inp = processInput();
> +
> if (inp.returnStatus == POStatus.STATUS_EOP ||
> inp.returnStatus == POStatus.STATUS_ERR) {
> - break;
> + // Since the output is buffered, we need to flush the last
> + // set of records when the close method is called by mapper.
> + if (this.parentPlan.endOfAllInput) {
> + if (outputBag != null) {
> + Tuple tup = mTupleFactory.newTuple(2);
> + tup.set(0, prevKey);
> + tup.set(1, outputBag);
> + outputBag = null;
> + return new Result(POStatus.STATUS_OK, tup);
> + }
> +
> + return new Result(POStatus.STATUS_EOP, null);
> + } else
> + break;
> }
>
> if (inp.returnStatus == POStatus.STATUS_NULL) {
> {code}
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)