You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@pig.apache.org by "Kai Londenberg (JIRA)" <ji...@apache.org> on 2013/02/22 14:58:13 UTC
[jira] [Updated] (PIG-3212) Race Conditions in POSort and
(Internal)SortedBag during Proactive Spill.
[ https://issues.apache.org/jira/browse/PIG-3212?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Kai Londenberg updated PIG-3212:
--------------------------------
Description:
The following bug exists in the latest release of Pig 0.11.0
While running some large jobs involving groups and sorts like these:
{code}
events_by_user = GROUP events BY user_id;
sorted_events_by_user = FOREACH events_by_user {
A = ORDER events BY ts, split_idx, line_num;
GENERATE group, A;
}
{code}
I got a pretty strange behaviour: While this worked on small datasets, if I ran it on large datasets, the results were sometimes not sorted perfectly.
So after a long debugging session, I tracked it down to at least one race condition:
The following partial stack trace shows how a proactive spill gets triggered on an InternalSortedBag. A spill in turn triggers a sort of that InternalSortedBag.
{code}
at org.apache.pig.data.SortedSpillBag.proactive_spill(SortedSpillBag.java:83)
at org.apache.pig.data.InternalSortedBag.spill(InternalSortedBag.java:455)
at org.apache.pig.impl.util.SpillableMemoryManager.handleNotification(SpillableMemoryManager.java:243)
at sun.management.NotificationEmitterSupport.sendNotification(NotificationEmitterSupport.java:138)
at sun.management.MemoryImpl.createNotification(MemoryImpl.java:171)
at sun.management.MemoryPoolImpl$PoolSensor.triggerAction(MemoryPoolImpl.java:272)
at sun.management.Sensor.trigger(Sensor.java:120)
{code}
At the same time, the same InternalSortedBag might be sorted or accessed within a POSort Operation. For example using the following Code path (line numbers might be off, I had to add debug statements to diagnose this)
{code}
at org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSort.getNext(POSort.java:346)
at org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator.getNext(PhysicalOperator.java:492)
at org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POProject.processInputBag(POProject.java:582)
at org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.PORelationToExprProject.getNext(PORelationToExprProject.java:107)
at org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator.getNext(PhysicalOperator.java:394)
at org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach.processPlan(POForEach.java:372)
at org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach.getNext(POForEach.java:297)
at org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator.processInput(PhysicalOperator.java:368)
at org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSplit.getNext(POSplit.java:214)
at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigGenericMapReduce$Reduce.runPipeline(PigGenericMapReduce.java:465)
at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigGenericMapReduce$Reduce.processOnePackageOutput(PigGenericMapReduce.java:433)
at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigGenericMapReduce$Reduce.reduce(PigGenericMapReduce.java:413)
at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigGenericMapReduce$Reduce.reduce(PigGenericMapReduce.java:257)
at org.apache.hadoop.mapreduce.Reducer.run(Reducer.java:176)
at org.apache.hadoop.mapred.ReduceTask.runNewReducer(ReduceTask.java:566)
at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:408)
at org.apache.hadoop.mapred.Child.main(Child.java:170)
{code}
The key here is: Both operations try to compare and modify elements of the SortedBag simultaneously. This leads to all kinds of problems, most notably incorrectly sorted data.
POSort.SortComparator that's passed as a Comparison function to (Internal)SortedBag is not thread safe, since it works by attaching single input tuples to PhysicalOperator's - these Operators in turn are part of the POSort.sortPlans and are re-used among each thread accessing the (Internal)SortedBag.
was:
The following bug exists in the latest release of Pig 0.11.0
While running some large jobs involving groups and sorts like these:
{code}
events_by_user = GROUP events BY user_id;
sorted_events_by_user = FOREACH events_by_user {
A = ORDER events BY ts, split_idx, line_num;
GENERATE group, A;
}
{code}
I got a pretty strange behaviour: While this worked on small datasets, if I ran it on large datasets, the results were sometimes not sorted perfectly.
So after a long debugging session, I tracked it down to at least one race condition:
The following partial stack trace shows how a proactive spill gets triggered on an InternalSortedBag. A spill in turn triggers a sort of that InternalSortedBag.
{code}
at org.apache.pig.data.SortedSpillBag.proactive_spill(SortedSpillBag.java:83)
at org.apache.pig.data.InternalSortedBag.spill(InternalSortedBag.java:455)
at org.apache.pig.impl.util.SpillableMemoryManager.handleNotification(SpillableMemoryManager.java:243)
at sun.management.NotificationEmitterSupport.sendNotification(NotificationEmitterSupport.java:138)
at sun.management.MemoryImpl.createNotification(MemoryImpl.java:171)
at sun.management.MemoryPoolImpl$PoolSensor.triggerAction(MemoryPoolImpl.java:272)
at sun.management.Sensor.trigger(Sensor.java:120)
{code}
At the same time, the same InternalSortedBag might be sorted or accessed within a POSort Operation. For example using the following Code path (line numbers might be off, I had to add debug statements to diagnose this)
{code}
at org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSort.getNext(POSort.java:346)
at org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator.getNext(PhysicalOperator.java:492)
at org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POProject.processInputBag(POProject.java:582)
at org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.PORelationToExprProject.getNext(PORelationToExprProject.java:107)
at org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator.getNext(PhysicalOperator.java:394)
at org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach.processPlan(POForEach.java:372)
at org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach.getNext(POForEach.java:297)
at org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator.processInput(PhysicalOperator.java:368)
at org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSplit.getNext(POSplit.java:214)
at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigGenericMapReduce$Reduce.runPipeline(PigGenericMapReduce.java:465)
at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigGenericMapReduce$Reduce.processOnePackageOutput(PigGenericMapReduce.java:433)
at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigGenericMapReduce$Reduce.reduce(PigGenericMapReduce.java:413)
at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigGenericMapReduce$Reduce.reduce(PigGenericMapReduce.java:257)
at org.apache.hadoop.mapreduce.Reducer.run(Reducer.java:176)
at org.apache.hadoop.mapred.ReduceTask.runNewReducer(ReduceTask.java:566)
at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:408)
at org.apache.hadoop.mapred.Child.main(Child.java:170)
{code}
The key here is: Both operations try to compare and modify elements of the SortedBag simultaneously. This leads to all kinds of problems, most notably incorrectly sorted data.
POSort.SortComparator that's passed as a Comparison function to (Internal)SortedBag is not thread safe, since it works by attaching single input tuples to PhysicalOperator's - these Operators in turn are part of the POSort.sortPlans and are re-used among each thread accessing the (Internal)SortedBag.
Affects Version/s: 0.11
> Race Conditions in POSort and (Internal)SortedBag during Proactive Spill.
> -------------------------------------------------------------------------
>
> Key: PIG-3212
> URL: https://issues.apache.org/jira/browse/PIG-3212
> Project: Pig
> Issue Type: Bug
> Affects Versions: 0.11
> Reporter: Kai Londenberg
> Priority: Critical
>
> The following bug exists in the latest release of Pig 0.11.0
> While running some large jobs involving groups and sorts like these:
> {code}
> events_by_user = GROUP events BY user_id;
> sorted_events_by_user = FOREACH events_by_user {
> A = ORDER events BY ts, split_idx, line_num;
> GENERATE group, A;
> }
> {code}
> I got a pretty strange behaviour: While this worked on small datasets, if I ran it on large datasets, the results were sometimes not sorted perfectly.
> So after a long debugging session, I tracked it down to at least one race condition:
> The following partial stack trace shows how a proactive spill gets triggered on an InternalSortedBag. A spill in turn triggers a sort of that InternalSortedBag.
> {code}
> at org.apache.pig.data.SortedSpillBag.proactive_spill(SortedSpillBag.java:83)
> at org.apache.pig.data.InternalSortedBag.spill(InternalSortedBag.java:455)
> at org.apache.pig.impl.util.SpillableMemoryManager.handleNotification(SpillableMemoryManager.java:243)
> at sun.management.NotificationEmitterSupport.sendNotification(NotificationEmitterSupport.java:138)
> at sun.management.MemoryImpl.createNotification(MemoryImpl.java:171)
> at sun.management.MemoryPoolImpl$PoolSensor.triggerAction(MemoryPoolImpl.java:272)
> at sun.management.Sensor.trigger(Sensor.java:120)
> {code}
> At the same time, the same InternalSortedBag might be sorted or accessed within a POSort Operation. For example using the following Code path (line numbers might be off, I had to add debug statements to diagnose this)
> {code}
> at org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSort.getNext(POSort.java:346)
> at org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator.getNext(PhysicalOperator.java:492)
> at org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POProject.processInputBag(POProject.java:582)
> at org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.PORelationToExprProject.getNext(PORelationToExprProject.java:107)
> at org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator.getNext(PhysicalOperator.java:394)
> at org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach.processPlan(POForEach.java:372)
> at org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach.getNext(POForEach.java:297)
> at org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator.processInput(PhysicalOperator.java:368)
> at org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSplit.getNext(POSplit.java:214)
> at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigGenericMapReduce$Reduce.runPipeline(PigGenericMapReduce.java:465)
> at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigGenericMapReduce$Reduce.processOnePackageOutput(PigGenericMapReduce.java:433)
> at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigGenericMapReduce$Reduce.reduce(PigGenericMapReduce.java:413)
> at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigGenericMapReduce$Reduce.reduce(PigGenericMapReduce.java:257)
> at org.apache.hadoop.mapreduce.Reducer.run(Reducer.java:176)
> at org.apache.hadoop.mapred.ReduceTask.runNewReducer(ReduceTask.java:566)
> at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:408)
> at org.apache.hadoop.mapred.Child.main(Child.java:170)
> {code}
> The key here is: Both operations try to compare and modify elements of the SortedBag simultaneously. This leads to all kinds of problems, most notably incorrectly sorted data.
> POSort.SortComparator that's passed as a Comparison function to (Internal)SortedBag is not thread safe, since it works by attaching single input tuples to PhysicalOperator's - these Operators in turn are part of the POSort.sortPlans and are re-used among each thread accessing the (Internal)SortedBag.
>
--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira