Posted to dev@pig.apache.org by "Adam Szita (JIRA)" <ji...@apache.org> on 2017/03/31 14:00:47 UTC

[jira] [Commented] (PIG-5207) BugFix e2e tests fail on spark

    [ https://issues.apache.org/jira/browse/PIG-5207?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15950940#comment-15950940 ] 

Adam Szita commented on PIG-5207:
---------------------------------

This test case uses COR UDF to calculate correlation between 3 vars.
The cause of the issue is that when execution reaches the Final implementation of the COR UDF there are some junk bags in the input Tuple:
{code}
--input
  |--DefaultBag: {(943629.1899999954,19810.98000000007,476680.0,52620.35740000006,2.5723842E7)}
  |--DefaultBag: {(157499.53767599948,19810.98000000007,52620.35740000006,52620.35740000006,503441.4849212208)}
  |--InternalCachedBag: 
     |--0 = {BinSedesTuple@7900} "(157499.53767599948,19810.98000000007,52620.35740000006,52620.35740000006,503441.4849212208)"
     |--1 = {Long@7901} "10000"
     |--2 = {BinSedesTuple@7902} "(943629.1899999954,19810.98000000007,476680.0,52620.35740000006,2.5723842E7)"
     |--3 = {Long@7903} "10000"
     |--4 = {BinSedesTuple@7904} "(2509050.8495000014,52620.35740000006,476680.0,503441.4849212208,2.5723842E7)"
     |--5 = {Long@7905} "10000"
{code}

The real result to be consumed is at position 2, but since the implementation expects exactly one entry here, it queries {{.get(0)}}, and shortly afterwards we get a ClassCastException.
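To see the failure mode in isolation, here is a minimal stand-alone sketch (my own toy code, using plain {{java.util}} types in place of Pig's Tuple and DataBag; the class and method names are hypothetical): when a leftover entry of the wrong runtime type sits in the slot the Final implementation reads, the cast fails at runtime:

```java
import java.util.Arrays;
import java.util.List;

// Toy stand-in for the COR$Final input: the real sub-result sits at a later
// position, while position 0 holds a leftover value of a different type.
public class WrongSlotDemo {

    // Mimics the UDF reading position 0 and casting it to the bag-like type
    // it expects. With junk in that slot, the cast throws ClassCastException.
    static boolean castFails(List<Object> input) {
        try {
            List<?> bag = (List<?>) input.get(0); // runtime-checked cast
            return bag == null; // never true; just consumes the value
        } catch (ClassCastException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        // Junk Long at position 0, real result at position 1 -- fails.
        List<Object> bad = Arrays.<Object>asList(10000L, Arrays.asList(1.0, 2.0));
        // Single expected entry at position 0 -- works.
        List<Object> good = Arrays.<Object>asList(Arrays.asList(1.0, 2.0));
        System.out.println("bad input fails:  " + castFails(bad));
        System.out.println("good input fails: " + castFails(good));
    }
}
```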
*This is caused by a fault in Spark's CombinerOptimizer: it doesn't remove all of the original input projections from the POUserFunc.Final part*, as seen [here|https://github.com/apache/pig/blob/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/CombinerOptimizer.java#L299]: {{PhysicalOperator udfInput = pplan.getPredecessors(combineUdf).get(0);}} //only the first input is handled (this is why we didn't see this issue when using, for example, the AVG UDF)
As seen in the plan:
{code}
Spark node scope-38
D: Store(file:/tmp/temp-1343607396/tmp944122059:org.apache.pig.impl.io.InterStorage) - scope-37
|
|---D: New For Each(false,true)[tuple] - scope-47 (postReduceFE)
    |   |
    |   Project[chararray][0] - scope-39
    |   |
    |   Project[bag][1] - scope-43
    |   |
    |   |---Project[bag][1] - scope-42
    |   
    |   POUserFunc(org.apache.pig.builtin.COR$Final)[bag] - scope-46
    |   |
    |   |---Project[bag][0] - scope-41   << residual unneeded projection, remains from the original UDF's input
    |   |   |
    |   |   |---Project[bag][1] - scope-40
    |   |
    |   |---Project[bag][2] - scope-45   << residual unneeded projection, remains from the original UDF's input
    |   |   |
    |   |   |---Project[bag][1] - scope-44
    |   |
    |   |---Project[bag][1] - scope-67   << actual subresult comes in here
    |
    |---C: Reduce By(false,false)[tuple] - scope-57  (cfe)
        |   |
        |   Project[chararray][0] - scope-58
{code}
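The direction of the fix can be sketched as follows (a toy model of the plan as an adjacency map of operator names, not Pig's actual PhysicalPlan API; {{detachAllInputs}} is a name I made up for illustration): detach every leftover input projection of the combine UDF instead of only the first predecessor:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy sketch: operators are Strings, and 'preds' maps each operator to its
// ordered list of predecessors (inputs).
public class DetachAllDemo {

    // Detaches ALL predecessors of 'op', not just preds.get(op).get(0),
    // and returns the list of detached operators.
    static List<String> detachAllInputs(Map<String, List<String>> preds, String op) {
        List<String> removed =
                new ArrayList<>(preds.getOrDefault(op, Collections.emptyList()));
        preds.put(op, new ArrayList<>());
        return removed;
    }

    public static void main(String[] args) {
        Map<String, List<String>> preds = new HashMap<>();
        // POUserFunc(COR$Final) with three input projections, as in the plan dump:
        preds.put("COR$Final",
                new ArrayList<>(Arrays.asList("scope-41", "scope-45", "scope-67")));
        List<String> removed = detachAllInputs(preds, "COR$Final");
        System.out.println("detached: " + removed); // all three, not just scope-41
    }
}
```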

After fixing this I saw that, although the results are now correct, their order is off: the actual correlation values between var0-var1, etc. are shifted with respect to MR's output.
This is due to another plan generation bug, and it is actually observable in the plan above: {{POUserFunc}} has Projects {{0,2,1}} as its input instead of {{0,1,2}}.

The root of this problem is the cloning of the ForEach operator [here|https://github.com/apache/pig/blob/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/CombinerOptimizer.java#L141]. Cloning it also triggers cloning of the associated PhysicalPlan instance, and unfortunately [that method|https://github.com/apache/pig/blob/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhysicalPlan.java#L217] has a bug:

It doesn't keep the order of the {{List<PhysicalOperator>}} lists in {{mToEdges}} because it only considers {{mFromEdges}} when connecting the cloned operator instances.
The ordering cannot be preserved by looking only at {{mToEdges}} or only at {{mFromEdges}}, since both sides operate with ordered lists.

As a fix, I'm sorting these lists in the cloned plan according to their order in the original plan (by using the {{matches}} map that maps original operators to their clones).
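A minimal sketch of that sorting step (toy code with Strings standing in for PhysicalOperator instances; {{restoreOrder}} is a hypothetical helper, not the actual patch):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy sketch: given the original edge list and the (possibly reordered)
// cloned edge list, rebuild the cloned list in the original's order using
// the original->clone 'matches' map.
public class CloneOrderDemo {

    static List<String> restoreOrder(List<String> originalList,
                                     List<String> clonedList,
                                     Map<String, String> matches) {
        List<String> sorted = new ArrayList<>();
        for (String orig : originalList) {
            String clone = matches.get(orig);
            if (clonedList.contains(clone)) {
                sorted.add(clone); // emit clones in the original's order
            }
        }
        return sorted;
    }

    public static void main(String[] args) {
        // Original input order 0,1,2; the naive clone produced 0,2,1.
        List<String> original = Arrays.asList("p0", "p1", "p2");
        List<String> cloned = Arrays.asList("p0'", "p2'", "p1'");
        Map<String, String> matches = new HashMap<>();
        matches.put("p0", "p0'");
        matches.put("p1", "p1'");
        matches.put("p2", "p2'");
        System.out.println(restoreOrder(original, cloned, matches)); // [p0', p1', p2']
    }
}
```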

This doesn't come up in MR mode because the ForEach operator is not cloned there but modified in place during combiner optimization. We could do the same in Spark too, but I feel this should rather be fixed in the common code for the convenience of future {{clone()}} callers.

[~kellyzly] can you review the spark parts in CombinerOptimizer.java please?
[~rohini] can you please review the common parts in PhysicalPlan.java?

Please find the fix in my patch: [^PIG-5207.0.patch]

> BugFix e2e tests fail on spark
> ------------------------------
>
>                 Key: PIG-5207
>                 URL: https://issues.apache.org/jira/browse/PIG-5207
>             Project: Pig
>          Issue Type: Sub-task
>          Components: spark
>            Reporter: Adam Szita
>            Assignee: Adam Szita
>             Fix For: spark-branch
>
>         Attachments: PIG-5207.0.patch
>
>
> Observed ClassCastException in BugFix 1 and 2 test cases. The exception is thrown from a UDF: COR.Final



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)