Posted to dev@pig.apache.org by "Rohini Palaniswamy (JIRA)" <ji...@apache.org> on 2019/01/22 19:04:00 UTC

[jira] [Created] (PIG-5378) Optimize DISTINCT COUNT inside foreach

Rohini Palaniswamy created PIG-5378:
---------------------------------------

             Summary: Optimize DISTINCT COUNT inside foreach
                 Key: PIG-5378
                 URL: https://issues.apache.org/jira/browse/PIG-5378
             Project: Pig
          Issue Type: Improvement
            Reporter: Rohini Palaniswamy


When there is a DISTINCT COUNT, the combiner is usually applied. In many of our scripts, we have seen the DISTINCT bag grow to tens of thousands or even millions of items, which makes hash aggregation perform very poorly. Even if hash aggregation is turned off, the combiner still aggregates, and the reducer spills heavily because of the large bag.

This can be avoided by applying secondary sort, so that the bag arrives ordered, and having the plan use POSortedDistinct. PODistinct alone is still not good enough, because it needs to hold all the elements in a HashSet. POSortedDistinct needs essentially no extra memory: on sorted input it only has to compare each element with the previous one.
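To illustrate the memory difference described above, here is a minimal Java sketch (Java 16+, since it uses a record). It is not Pig's actual PODistinct or POSortedDistinct code. The class `DistinctCountSketch`, the `Row` record and all three methods are hypothetical stand-ins.

- `hashDistinctCount` keeps every distinct value in a HashSet.
- `sortedDistinctCount` only remembers the previous value of an already-sorted bag.
- `countDistinctPerGroup` simulates `GROUP A BY f1` with a secondary sort on `f2` by sorting on the composite key (f1, f2).

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;

public class DistinctCountSketch {

    // Hypothetical (f1, f2) record standing in for a tuple of relation A.
    record Row(String f1, String f2) {}

    // Hash-based distinct count, similar in spirit to PODistinct: every
    // distinct value of the bag is held in a HashSet at the same time.
    static long hashDistinctCount(List<String> bag) {
        Set<String> seen = new HashSet<>(bag);
        return seen.size();
    }

    // Sorted distinct count, similar in spirit to POSortedDistinct: the input
    // must already be sorted, so duplicates are adjacent and only the
    // previous value has to be remembered.
    static long sortedDistinctCount(List<String> sortedBag) {
        long count = 0;
        String previous = null;
        for (String v : sortedBag) {
            // The first element always counts; later ones count only when
            // they differ from their predecessor.
            if (count == 0 || !Objects.equals(v, previous)) {
                count++;
            }
            previous = v;
        }
        return count;
    }

    // Simulates GROUP A BY f1 with a secondary sort on f2: rows are sorted by
    // (f1, f2), so each group's f2 values arrive in order and the distinct
    // count per group needs only constant extra state (assumes non-null fields).
    static Map<String, Long> countDistinctPerGroup(List<Row> rows) {
        List<Row> sorted = new ArrayList<>(rows);
        sorted.sort(Comparator.comparing(Row::f1).thenComparing(Row::f2));
        Map<String, Long> result = new LinkedHashMap<>();
        String currentGroup = null;
        String previous = null;
        for (Row r : sorted) {
            if (!r.f1().equals(currentGroup)) {
                // New group: its first f2 value is always distinct.
                currentGroup = r.f1();
                result.put(currentGroup, 1L);
            } else if (!r.f2().equals(previous)) {
                // Same group, f2 differs from the previous row's f2.
                result.merge(currentGroup, 1L, Long::sum);
            }
            previous = r.f2();
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> bag = List.of("a", "a", "b", "c", "c", "c");
        System.out.println(hashDistinctCount(bag));   // 3
        System.out.println(sortedDistinctCount(bag)); // 3

        List<Row> a = List.of(
            new Row("x", "b"), new Row("x", "a"), new Row("y", "c"),
            new Row("x", "a"), new Row("y", "c"), new Row("y", "d"));
        System.out.println(countDistinctPerGroup(a)); // {x=2, y=2}
    }
}
```

The sketch sorts in memory only to stay self-contained. In a MapReduce job with secondary sort, the shuffle performs the (f1, f2) ordering. The reducer's per-group state then stays constant no matter how many distinct f2 values a group has.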

Two things to be done:
1) If we see a DISTINCT COUNT, turn it into a POSortedDistinct using SecondaryKeyOptimizer. Currently CombinerOptimizer runs first, so we need to stop the combiner optimizer from being applied to distinct. This can be made configurable via pig.optimize.nested.distinct = true, which we can enable by default on our clusters.
2) SecondaryKeyOptimizer does not convert it into POSortedDistinct in the case below, because there is a POForEach in the plan before PODistinct (https://github.com/apache/pig/blob/branch-0.17/src/org/apache/pig/backend/hadoop/executionengine/util/SecondaryKeyOptimizerUtil.java#L529-L533).

{code}
B = GROUP A BY f1;
C = FOREACH B {
    sorted = ORDER A by f2;
    unique = DISTINCT sorted.f2;
    GENERATE group, COUNT(unique) as cnt;
}
{code}

This does not generate POSortedDistinct and has to be fixed. It was worked around by doing:

{code}
B = GROUP A BY f1;
C = FOREACH B {
    fields = A.f2;
    sorted = ORDER fields by f2;
    unique = DISTINCT sorted;
    GENERATE group, COUNT(unique) as cnt;
}
{code}

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)