Posted to dev@pig.apache.org by "Adam Szita (JIRA)" <ji...@apache.org> on 2017/03/31 14:00:47 UTC
[jira] [Commented] (PIG-5207) BugFix e2e tests fail on spark
[ https://issues.apache.org/jira/browse/PIG-5207?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15950940#comment-15950940 ]
Adam Szita commented on PIG-5207:
---------------------------------
This test case uses the COR UDF to calculate the correlation between three variables.
The cause of the issue is that when execution reaches the Final implementation of the COR UDF, there are junk bags in the input Tuple:
{code}
--input
|--DefaultBag: {(943629.1899999954,19810.98000000007,476680.0,52620.35740000006,2.5723842E7)}
|--DefaultBag: {(157499.53767599948,19810.98000000007,52620.35740000006,52620.35740000006,503441.4849212208)}
|--InternalCachedBag:
|--0 = {BinSedesTuple@7900} "(157499.53767599948,19810.98000000007,52620.35740000006,52620.35740000006,503441.4849212208)"
|--1 = {Long@7901} "10000"
|--2 = {BinSedesTuple@7902} "(943629.1899999954,19810.98000000007,476680.0,52620.35740000006,2.5723842E7)"
|--3 = {Long@7903} "10000"
|--4 = {BinSedesTuple@7904} "(2509050.8495000014,52620.35740000006,476680.0,503441.4849212208,2.5723842E7)"
|--5 = {Long@7905} "10000"
{code}
The real result to be consumed is at position 2, but since only one entry is expected here, the implementation queries {{.get(0)}} and shortly after throws a ClassCastException.
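To illustrate the failure mode in plain Java (the class names below are hypothetical stand-ins, not Pig's actual Tuple/DataBag API): the Final stage assumes its single combined sub-result sits at index 0, but a leftover bag is there instead, and the cast blows up.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins for Pig's types, used only to show the failure mode.
class Bag { }            // plays the role of DefaultBag (leftover junk input)
class CombinedTuple { }  // plays the role of the BinSedesTuple sub-result

public class JunkInputSketch {
    public static void main(String[] args) {
        // Input tuple as observed in the debugger: junk bags first,
        // the real combined result only at position 2.
        List<Object> input = new ArrayList<>();
        input.add(new Bag());
        input.add(new Bag());
        input.add(new CombinedTuple());

        try {
            // COR$Final expects exactly one entry, so it does the analogue of .get(0):
            CombinedTuple result = (CombinedTuple) input.get(0);
        } catch (ClassCastException e) {
            System.out.println("ClassCastException, as seen in the e2e test");
        }
    }
}
```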
*This is caused by a fault in Spark's CombinerOptimizer: it does not remove all original input projections in the POUserFunc.Final part*, as seen [here|https://github.com/apache/pig/blob/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/CombinerOptimizer.java#L299]: {{PhysicalOperator udfInput = pplan.getPredecessors(combineUdf).get(0);}} handles only the first input (which is why we didn't see this issue when using, for example, the AVG UDF).
As seen in the plan:
{code}
Spark node scope-38
D: Store(file:/tmp/temp-1343607396/tmp944122059:org.apache.pig.impl.io.InterStorage) - scope-37
|
|---D: New For Each(false,true)[tuple] - scope-47 (postReduceFE)
| |
| Project[chararray][0] - scope-39
| |
| Project[bag][1] - scope-43
| |
| |---Project[bag][1] - scope-42
|
| POUserFunc(org.apache.pig.builtin.COR$Final)[bag] - scope-46
| |
| |---Project[bag][0] - scope-41 << residual unneeded projection, remains from the original UDF's input
| | |
| | |---Project[bag][1] - scope-40
| |
| |---Project[bag][2] - scope-45 << residual unneeded projection, remains from the original UDF's input
| | |
| | |---Project[bag][1] - scope-44
| |
| |---Project[bag][1] - scope-67 << actual subresult comes in here
|
|---C: Reduce By(false,false)[tuple] - scope-57 (cfe)
| |
| Project[chararray][0] - scope-58
{code}
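The direction of the fix can be sketched over a toy plan model (plain Java; the class and method names are my own, not Pig's API): instead of detaching only {{getPredecessors(combineUdf).get(0)}}, iterate over a copy of the full predecessor list so every stale projection is removed before the new combined input is wired in.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy plan model: operator name -> list of its input (predecessor) operators.
public class DetachSketch {
    static void detachAllInputs(Map<String, List<String>> predecessors, String udf) {
        // Iterate over a copy, since disconnecting mutates the underlying list.
        for (String input : new ArrayList<>(predecessors.get(udf))) {
            predecessors.get(udf).remove(input);  // analogue of plan.disconnect(input, udf)
        }
    }

    public static void main(String[] args) {
        Map<String, List<String>> preds = new HashMap<>();
        // The Final UDF had three original input projections wired in
        // (cf. scope-41, scope-45, scope-67 in the plan above).
        preds.put("COR$Final", new ArrayList<>(List.of("proj-41", "proj-45", "proj-67")));
        detachAllInputs(preds, "COR$Final");
        System.out.println(preds.get("COR$Final"));  // prints []
    }
}
```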
After fixing this I saw that although the results are now correct, their order is off: the correlation values between var0-var1, etc., are shifted with respect to MR's output.
This is due to another plan-generation bug, observable in the plan above: {{POUserFunc}} has Projects {{0,2,1}} as its input instead of {{0,1,2}}.
The root of this problem is the cloning of the ForEach operator [here|https://github.com/apache/pig/blob/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/CombinerOptimizer.java#L141]. Cloning this will trigger cloning the associated PhysicalPlan instance and unfortunately [that method|https://github.com/apache/pig/blob/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhysicalPlan.java#L217] has a bug:
It doesn't keep the order of the {{List<PhysicalOperator>}} lists in {{mToEdges}}, because it only considers {{mFromEdges}} when connecting the cloned operator instances.
The ordering cannot be preserved by looking only at {{mToEdges}} or only at {{mFromEdges}}, since both store lists.
As a fix, I sort these lists in the cloned plan according to their order in the original plan (using the {{matches}} map that maps original ops to their clones).
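The ordering fix can be sketched like this (a toy model, not Pig's PhysicalPlan API): each cloned operator's input list is rebuilt to mirror the order of its original counterpart, via the original-to-clone {{matches}} map.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class CloneOrderSketch {
    // Rebuild the cloned input list (mToEdges analogue) in the order of the
    // original list, using the original -> clone matches map.
    static <T> List<T> reorderClonedInputs(List<T> originalInputs, Map<T, T> matches) {
        List<T> ordered = new ArrayList<>();
        for (T orig : originalInputs) {
            ordered.add(matches.get(orig));
        }
        return ordered;
    }

    public static void main(String[] args) {
        // Original inputs in plan order 0,1,2; cloning via mFromEdges alone
        // can yield them as 0,2,1 (as seen with POUserFunc above).
        List<String> original = List.of("Project[0]", "Project[1]", "Project[2]");
        Map<String, String> matches = Map.of(
            "Project[0]", "Project[0]'",
            "Project[1]", "Project[1]'",
            "Project[2]", "Project[2]'");
        System.out.println(reorderClonedInputs(original, matches));
        // prints [Project[0]', Project[1]', Project[2]']
    }
}
```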
This doesn't come up in MR mode because the ForEach op is not cloned there but modified in place during combiner optimization. We could do the same in Spark too, but I feel this should rather be fixed in the common code for the convenience of future clone() callers.
[~kellyzly] can you review the spark parts in CombinerOptimizer.java please?
[~rohini] can you please review the common parts in PhysicalPlan.java?
Please find the fix in my patch: [^PIG-5207.0.patch]
> BugFix e2e tests fail on spark
> ------------------------------
>
> Key: PIG-5207
> URL: https://issues.apache.org/jira/browse/PIG-5207
> Project: Pig
> Issue Type: Sub-task
> Components: spark
> Reporter: Adam Szita
> Assignee: Adam Szita
> Fix For: spark-branch
>
> Attachments: PIG-5207.0.patch
>
>
> Observed ClassCastException in BugFix 1 and 2 test cases. The exception is thrown from a UDF: COR.Final
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)