Posted to dev@pig.apache.org by Rohini Palaniswamy <ro...@gmail.com> on 2015/05/20 23:59:08 UTC

Re: A problem about implicit POSPLIT found in mr mode

Liyun,
    I was out and could not reply sooner. Please send questions like this to
dev@ so that other committers can also look into them and answer.

  There is no bug here and behavior is as expected.

C = group B by id;
D = foreach C {
    E = order B by f desc;
    F = E.f;
    generate group, myudfs.AccumulativeSumBag(F);
};


There is no secondary key optimization applied in this MapReduce plan at
all. Usually the order by (POSort) above would be removed and replaced
with a secondary key sort. But in this case, the output of the group by is
just stored into HDFS because there is a split. Then, in the map phase of the
join, it is loaded twice (once for D and once for G), the inner foreach of
each is processed there, and the results are joined in the reduce. Since the
UDF is executing in the map, it is not run as an Accumulator but as a normal
EvalFunc, and exec() is called.
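
To illustrate the point above: a UDF that implements the accumulator
interface is safer if its exec() also works, because the engine may fall back
to calling exec() when the foreach ends up in a map plan. The following is a
minimal stand-alone sketch in plain Java. MiniAccumulator and ConcatBag are
hypothetical stand-ins for illustration only; they are not Pig's real
EvalFunc/Accumulator classes, which operate on Tuple and DataBag.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for an accumulator contract (NOT the real Pig API).
interface MiniAccumulator<T> {
    void accumulate(List<String> batch); // called repeatedly with chunks of the bag
    T getValue();                        // called after all chunks have been seen
    void cleanup();                      // resets state before the next key
}

// A UDF-like class that produces the same result in both modes.
class ConcatBag implements MiniAccumulator<String> {
    private StringBuilder sb;

    @Override
    public void accumulate(List<String> batch) {
        if (batch == null) {
            return;
        }
        if (sb == null) {
            sb = new StringBuilder();
        }
        for (String s : batch) {
            if (s != null) {
                sb.append(s);
            }
        }
    }

    @Override
    public String getValue() {
        return (sb != null && sb.length() > 0) ? sb.toString() : null;
    }

    @Override
    public void cleanup() {
        sb = null;
    }

    // Non-accumulative entry point: the whole bag arrives in one call.
    // Delegating to accumulate() keeps both code paths consistent.
    public String exec(List<String> wholeBag) {
        cleanup();
        accumulate(wholeBag);
        String result = getValue();
        cleanup();
        return result;
    }
}

class ConcatBagDemo {
    public static void main(String[] args) {
        ConcatBag udf = new ConcatBag();
        // Accumulative mode: two chunks for the same key.
        udf.accumulate(Arrays.asList("a", "b"));
        udf.accumulate(Arrays.asList("c"));
        System.out.println(udf.getValue()); // prints "abc"
        udf.cleanup();
        // Plain mode: exec() sees the entire bag at once.
        System.out.println(udf.exec(Arrays.asList("a", "b", "c"))); // prints "abc"
    }
}
```

With this shape, it does not matter whether the optimizer manages to enable
accumulative mode; the result is the same either way.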

In this case it would have been better if the foreach statements were
executed in the reduce of the group by and two different outputs stored.
But the MultiQueryOptimizer is not applied when secondary key optimization
is possible.

MultiQueryOptimizer.java

if (successor.getUseSecondaryKey()) {
    log.debug("Splittee " + successor.getOperatorKey().getId()
            + " uses secondary key, do not merge it");
    continue;
}

In this case, both splits have the same secondary key, since they both do
order B by f desc (which is not the case most of the time), so it would be
possible to apply the MultiQueryOptimizer. But currently the
MultiQueryOptimizer does not have the intelligence to check for that and
merge the splits into one plan when all of them have the same secondary key.
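
The missing check could look roughly like the sketch below. This is only an
illustration in plain Java: the Splittee class and the string form of the
secondary key ("f DESC") are hypothetical, and the real MultiQueryOptimizer
works on MapReduce operators and would need further conditions before merging.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical model of one branch below the split (not a Pig class).
class Splittee {
    final String id;
    final String secondaryKey; // e.g. "f DESC"; null if no secondary key is used

    Splittee(String id, String secondaryKey) {
        this.id = id;
        this.secondaryKey = secondaryKey;
    }
}

class SecondaryKeyCheck {
    // True only if every splittee uses a secondary key and all keys are equal.
    static boolean allShareSameSecondaryKey(List<Splittee> splittees) {
        if (splittees.isEmpty()) {
            return false;
        }
        String first = splittees.get(0).secondaryKey;
        if (first == null) {
            return false;
        }
        for (Splittee s : splittees) {
            if (!first.equals(s.secondaryKey)) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        List<Splittee> same = Arrays.asList(
                new Splittee("D", "f DESC"), new Splittee("G", "f DESC"));
        List<Splittee> mixed = Arrays.asList(
                new Splittee("D", "f DESC"), new Splittee("G", "f ASC"));
        System.out.println(allShareSameSecondaryKey(same));  // prints "true"
        System.out.println(allShareSameSecondaryKey(mixed)); // prints "false"
    }
}
```

In the script from this thread, D and G would both carry the same key, so a
check of this kind would report that they are candidates for merging.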

Regards,
Rohini


On Tue, May 19, 2015 at 1:53 AM, Zhang, Liyun <li...@intel.com> wrote:

> Hi Rohini:
>
>    I found a problem when executing the following script in mr mode:
>
>
>
> *testAccumulator.join.pig*
>
> REGISTER /home/zly/prj/oss/kellyzly/pig/bin/myudfs.jar;
> A = load './testAccumulator.txt' as (id:int,f);
> B = foreach A generate id, f, id as t;
> C = group B by id;
> D = foreach C {
>     E = order B by f desc;
>     F = E.f;
>     generate group, myudfs.AccumulativeSumBag(F);
> };
> G = foreach C {
>     E = order B by f desc;
>     F = E.f;
>     generate group, myudfs.AccumulativeSumBag(F);
> };
> H = join D by group, G by group;
> store H into 'testAccumulator.join.out';
> explain H
>
> cat myudfs/AccumulativeSumBag.java:
>
> package myudfs;
>
> import java.io.IOException;
> import java.util.Iterator;
> import org.apache.pig.EvalFunc;
> import org.apache.pig.Accumulator;
> import org.apache.pig.data.DataBag;
> import org.apache.pig.data.Tuple;
>
> /**
>  * This class is for testing of accumulator udfs
>  */
> public class AccumulativeSumBag extends EvalFunc<String> implements Accumulator<String>
> {
>
>     StringBuffer sb;
>
>     public AccumulativeSumBag() {
>     }
>
>     public void accumulate(Tuple tuple) throws IOException {
>         DataBag databag = (DataBag)tuple.get(0);
>         if(databag == null)
>             return;
>
>         if (sb == null) {
>             sb = new StringBuffer();
>         }
>
>         Iterator<Tuple> iterator = databag.iterator();
>         while(iterator.hasNext()) {
>             Tuple t = iterator.next();
>             if (t.size()>1 && t.get(1) == null) {
>                 continue;
>             }
>
>             sb.append(t.toString());
>         }
>     }
>
>     public String getValue() {
>         if (sb != null && sb.length()>0) {
>             return sb.toString();
>         }
>         return null;
>     }
>
>     public void cleanup() {
>         sb = null;
>     }
>
>     public String exec(Tuple tuple) throws IOException {
>         throw new IOException("exec() should not be called");
>     }
> }
>
> the error message is:
>
> java.lang.Exception: org.apache.pig.backend.executionengine.ExecException: ERROR 0: Exception while executing (Name: H: Local Rearrange[tuple]{int}(false) - scope-117 Operator Key: scope-117): org.apache.pig.backend.executionengine.ExecException: ERROR 2078: Caught error from UDF: myudfs.AccumulativeSumBag [exec() should not be called]
>         at org.apache.hadoop.mapred.LocalJobRunner$Job.runTasks(LocalJobRunner.java:462)
>         at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:522)
> Caused by: org.apache.pig.backend.executionengine.ExecException: ERROR 0: Exception while executing (Name: H: Local Rearrange[tuple]{int}(false) - scope-117 Operator Key: scope-117): org.apache.pig.backend.executionengine.ExecException: ERROR 2078: Caught error from UDF: myudfs.AccumulativeSumBag [exec() should not be called]
>         at org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator.processInput(PhysicalOperator.java:316)
>         at org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange.getNextTuple(POLocalRearrange.java:291)
>         at org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POUnion.getNextTuple(POUnion.java:167)
>         at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigGenericMapBase.runPipeline(PigGenericMapBase.java:279)
>         at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigGenericMapBase.map(PigGenericMapBase.java:274)
>         at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigGenericMapBase.map(PigGenericMapBase.java:64)
>         at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:145)
>         at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:764)
>         at org.apache.hadoop.mapred.MapTask.run(MapTask.java:340)
>         at org.apache.hadoop.mapred.LocalJobRunner$Job$MapTaskRunnable.run(LocalJobRunner.java:243)
>         at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
>         at java.util.concurrent.FutureTask.run(FutureTask.java:262)
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>         at java.lang.Thread.run(Thread.java:744)
>
> Following is the physical plan and mr plan.
>
> #-----------------------------------------------
> # Physical Plan:
> #-----------------------------------------------
> H: Store(fakefile:org.apache.pig.builtin.PigStorage) - scope-57
> |
> |---H: New For Each(true,true)[tuple] - scope-56
>     |   |
>     |   Project[bag][1] - scope-54
>     |   |
>     |   Project[bag][2] - scope-55
>     |
>     |---H: Package(Packager)[tuple]{int} - scope-49
>         |
>         |---H: Global Rearrange[tuple] - scope-48
>             |
>             |---H: Local Rearrange[tuple]{int}(false) - scope-50
>             |   |   |
>             |   |   Project[int][0] - scope-51
>             |   |
>             |   |---D: New For Each(false,false)[bag] - scope-32
>             |       |   |
>             |       |   Project[int][0] - scope-22
>             |       |   |
>             |       |   POUserFunc(myudfs.AccumulativeSumBag)[chararray] - scope-25
>             |       |   |
>             |       |   |---RelationToExpressionProject[bag][*] - scope-24
>             |       |       |
>             |       |       |---F: New For Each(false)[bag] - scope-31
>             |       |           |   |
>             |       |           |   Project[bytearray][1] - scope-29
>             |       |           |
>             |       |           |---E: POSort[bag]() - scope-28
>             |       |               |   |
>             |       |               |   Project[bytearray][1] - scope-27
>             |       |               |
>             |       |               |---Project[bag][1] - scope-26
>             |       |
>             |       |---C: Filter[bag] - scope-20
>             |           |   |
>             |           |   Constant(true) - scope-21
>             |           |
>             |           |---C: Split - scope-19    // here an implicit Split is generated
>             |               |
>             |               |---C: Package(Packager)[tuple]{int} - scope-16
>             |                   |
>             |                   |---C: Global Rearrange[tuple] - scope-15
>             |                       |
>             |                       |---C: Local Rearrange[tuple]{int}(false) - scope-17
>             |                           |   |
>             |                           |   Project[int][0] - scope-18
>             |                           |
>             |                           |---B: New For Each(false,false,false)[bag] - scope-14
>             |                               |   |
>             |                               |   Project[int][0] - scope-7
>             |                               |   |
>             |                               |   Project[bytearray][1] - scope-9
>             |                               |   |
>             |                               |   POUserFunc(org.apache.pig.impl.builtin.IdentityColumn)[int] - scope-12
>             |                               |   |
>             |                               |   |---Project[int][0] - scope-11
>             |                               |
>             |                               |---A: New For Each(false,false)[bag] - scope-6
>             |                                   |   |
>             |                                   |   Cast[int] - scope-2
>             |                                   |   |
>             |                                   |   |---Project[bytearray][0] - scope-1
>             |                                   |   |
>             |                                   |   Project[bytearray][1] - scope-4
>             |                                   |
>             |                                   |---A: Load(hdfs://zly2.sh.intel.com:8020/user/root/testAccumulator.txt:org.apache.pig.builtin.PigStorage) - scope-0
>             |
>             |---H: Local Rearrange[tuple]{int}(false) - scope-52
>                 |   |
>                 |   Project[int][0] - scope-53
>                 |
>                 |---G: New For Each(false,false)[bag] - scope-45
>                     |   |
>                     |   Project[int][0] - scope-35
>                     |   |
>                     |   POUserFunc(myudfs.AccumulativeSumBag)[chararray] - scope-38
>                     |   |
>                     |   |---RelationToExpressionProject[bag][*] - scope-37
>                     |       |
>                     |       |---F: New For Each(false)[bag] - scope-44
>                     |           |   |
>                     |           |   Project[bytearray][1] - scope-42
>                     |           |
>                     |           |---E: POSort[bag]() - scope-41
>                     |               |   |
>                     |               |   Project[bytearray][1] - scope-40
>                     |               |
>                     |               |---Project[bag][1] - scope-39
>                     |
>                     |---C: Filter[bag] - scope-33
>                         |   |
>                         |   Constant(true) - scope-34
>                         |
>                         |---C: Split - scope-19
>                             |
>                             |---C: Package(Packager)[tuple]{int} - scope-16
>                                 |
>                                 |---C: Global Rearrange[tuple] - scope-15
>                                     |
>                                     |---C: Local Rearrange[tuple]{int}(false) - scope-17
>                                         |   |
>                                         |   Project[int][0] - scope-18
>                                         |
>                                         |---B: New For Each(false,false,false)[bag] - scope-14
>                                             |   |
>                                             |   Project[int][0] - scope-7
>                                             |   |
>                                             |   Project[bytearray][1] - scope-9
>                                             |   |
>                                             |   POUserFunc(org.apache.pig.impl.builtin.IdentityColumn)[int] - scope-12
>                                             |   |
>                                             |   |---Project[int][0] - scope-11
>                                             |
>                                             |---A: New For Each(false,false)[bag] - scope-6
>                                                 |   |
>                                                 |   Cast[int] - scope-2
>                                                 |   |
>                                                 |   |---Project[bytearray][0] - scope-1
>                                                 |   |
>                                                 |   Project[bytearray][1] - scope-4
>                                                 |
>                                                 |---A: Load(hdfs://zly2.sh.intel.com:8020/user/root/testAccumulator.txt:org.apache.pig.builtin.PigStorage) - scope-0
>
> #--------------------------------------------------
> # Map Reduce Plan
> #--------------------------------------------------
> MapReduce node scope-58
> Map Plan
> C: Local Rearrange[tuple]{int}(false) - scope-17
> |   |
> |   Project[int][0] - scope-18
> |
> |---B: New For Each(false,false,false)[bag] - scope-14
>     |   |
>     |   Project[int][0] - scope-7
>     |   |
>     |   Project[bytearray][1] - scope-9
>     |   |
>     |   POUserFunc(org.apache.pig.impl.builtin.IdentityColumn)[int] - scope-12
>     |   |
>     |   |---Project[int][0] - scope-11
>     |
>     |---A: New For Each(false,false)[bag] - scope-6
>         |   |
>         |   Cast[int] - scope-2
>         |   |
>         |   |---Project[bytearray][0] - scope-1
>         |   |
>         |   Project[bytearray][1] - scope-4
>         |
>         |---A: Load(hdfs://zly2.sh.intel.com:8020/user/root/testAccumulator.txt:org.apache.pig.builtin.PigStorage) - scope-0--------
> Reduce Plan
> Store(hdfs://zly2.sh.intel.com:8020/tmp/temp-281610513/tmp1960264662:org.apache.pig.impl.io.InterStorage) - scope-59
> |
> |---C: Package(Packager)[tuple]{int} - scope-16--------
> Global sort: false
> ----------------
>
> MapReduce node scope-64
> Map Plan
> Union[tuple] - scope-65
> |
> |---H: Local Rearrange[tuple]{int}(false) - scope-50
> |   |   |
> |   |   Project[int][0] - scope-51
> |   |
> |   |---D: New For Each(false,false)[bag] - scope-32
> |       |   |
> |       |   Project[int][0] - scope-22
> |       |   |
> |       |   POUserFunc(myudfs.AccumulativeSumBag)[chararray] - scope-25
> |       |   |
> |       |   |---RelationToExpressionProject[bag][*] - scope-24
> |       |       |
> |       |       |---F: New For Each(false)[bag] - scope-31
> |       |           |   |
> |       |           |   Project[bytearray][1] - scope-29
> |       |           |
> |       |           |---E: POSort[bag]() - scope-28   // POSort should be deleted in SecondaryKeyOptimizerUtil.java#applySecondaryKeySort, but it is not deleted because the POSplit (scope-19 in the physical plan) puts the group and the foreach in different operators
> |       |               |   |
> |       |               |   Project[bytearray][1] - scope-27
> |       |               |
> |       |               |---Project[bag][1] - scope-26
> |       |
> |       |---Load(hdfs://zly2.sh.intel.com:8020/tmp/temp-281610513/tmp1960264662:org.apache.pig.impl.io.InterStorage) - scope-60
> |
> |---H: Local Rearrange[tuple]{int}(false) - scope-52
>     |   |
>     |   Project[int][0] - scope-53
>     |
>     |---G: New For Each(false,false)[bag] - scope-45
>         |   |
>         |   Project[int][0] - scope-35
>         |   |
>         |   POUserFunc(myudfs.AccumulativeSumBag)[chararray] - scope-38
>         |   |
>         |   |---RelationToExpressionProject[bag][*] - scope-37
>         |       |
>         |       |---F: New For Each(false)[bag] - scope-44
>         |           |   |
>         |           |   Project[bytearray][1] - scope-42
>         |           |
>         |           |---E: POSort[bag]() - scope-41
>         |               |   |
>         |               |   Project[bytearray][1] - scope-40
>         |               |
>         |               |---Project[bag][1] - scope-39
>         |
>         |---Load(hdfs://zly2.sh.intel.com:8020/tmp/temp-281610513/tmp1960264662:org.apache.pig.impl.io.InterStorage) - scope-62--------
> Reduce Plan
> H: Store(fakefile:org.apache.pig.builtin.PigStorage) - scope-57
> |
> |---H: Package(JoinPackager(true,true))[tuple]{int} - scope-49--------
> Global sort: false
> ----------------
>
> The reason it fails is that POSort is not deleted when secondary key sort
> is enabled (POSort should be deleted in
> SecondaryKeyOptimizerUtil.java#applySecondaryKeySort). If POSplit is not
> deleted, the value “foundUDF” in AccumulatorOptimizerUtil#addAccumulator
> is false and po_foreach.setAccumulative will not be called. This causes
> “Caught error from UDF: myudfs.AccumulativeSumBag [exec() should not be
> called]”. Because an implicit POSplit is generated, when the POSplit is
> encountered in the MR plan, a new MR operator is generated (POSplit
> (scope-19) splits the physical plan into MapReduce node scope-58 and
> MapReduce node scope-64). So
> SecondaryKeyOptimizerUtil.java#applySecondaryKeySort does not work.
>
> AccumulatorOptimizerUtil#addAccumulator
>
>     public static void addAccumulator(PhysicalPlan plan) {
>         // See if this is a map-reduce job
>         List<PhysicalOperator> pos = plan.getRoots();
>         if (pos == null || pos.size() == 0) {
>             return;
>         }
>
>         // See if this is a POPackage
>         PhysicalOperator po_package = pos.get(0);
>         if (!po_package.getClass().equals(POPackage.class)) {
>             return;
>         }
>
>         Packager pkgr = ((POPackage) po_package).getPkgr();
>         // Check that this is a standard package, not a subclass
>         if (!pkgr.getClass().equals(Packager.class)) {
>             return;
>         }
>
>         // if POPackage is for distinct, just return
>         if (pkgr.isDistinct()) {
>             return;
>         }
>
>         // if any input to POPackage is inner, just return
>         boolean[] isInner = pkgr.getInner();
>         for (boolean b: isInner) {
>             if (b) {
>                 return;
>             }
>         }
>
>         List<PhysicalOperator> l = plan.getSuccessors(po_package);
>         // there should be only one POForEach
>         if (l == null || l.size() == 0 || l.size() > 1) {
>             return;
>         }
>
>         PhysicalOperator po_foreach = l.get(0);
>         if (!(po_foreach instanceof POForEach)) {
>             return;
>         }
>
>         boolean foundUDF = false;
>         List<PhysicalPlan> list = ((POForEach)po_foreach).getInputPlans();
>         for (PhysicalPlan p: list) {
>             PhysicalOperator po = p.getLeaves().get(0);
>
>             // only expression operators are allowed
>             if (!(po instanceof ExpressionOperator)) {
>                 return;
>             }
>
>             if (((ExpressionOperator)po).containUDF()) {
>                 foundUDF = true;
>             }
>
>             if (!check(po)) {
>                 return;
>             }
>         }
>
>         if (foundUDF) {
>             // if all tests are passed, reducer can run in accumulative mode
>             LOG.info("Reducer is to run in accumulative mode.");
>             po_package.setAccumulative();
>             po_foreach.setAccumulative();
>         }
>     }
>
> My question: is this a bug, or does Pig simply not handle this kind of
> script, where an implicit POSplit is generated while secondary key
> optimization is enabled?
>
> Kelly Zhang/Zhang,Liyun
>
> Best Regards
>
>
>

RE: A problem about implicit POSPLIT found in mr mode

Posted by "Zhang, Liyun" <li...@intel.com>.
Hi all:
Can anyone help look at this issue? If it is a bug, I will file a JIRA for it. If the Pig code does not handle this kind of script, please tell me.




Kelly Zhang/Zhang,Liyun
Best Regards


From: Rohini Palaniswamy [mailto:rohini.aditya@gmail.com]
Sent: Thursday, May 21, 2015 5:59 AM
To: Zhang, Liyun; dev@pig.apache.org
Cc: Mohit Sabharwal; praveen@sigmoidanalytics.com; Xuefu Zhang
Subject: Re: A problem about implicit POSPLIT found in mr mode

Liyun,
    I was out and could not reply sooner. Please send questions like this to dev@ so that other committers can also look into them and answer.

  There is no bug here and behavior is as expected.

C = group B by id;
D = foreach C {
    E = order B by f desc;
    F = E.f;
    generate group, myudfs.AccumulativeSumBag(F);
};

There is no secondary key optimization applied in this MapReduce plan at all. Usually the order by (POSort) above would be removed and replaced with a secondary key sort. But in this case, the output of the group by is just stored into HDFS because there is a split. Then, in the map phase of the join, it is loaded twice (once for D and once for G), the inner foreach of each is processed there, and the results are joined in the reduce. Since the UDF is executing in the map, it is not run as an Accumulator but as a normal EvalFunc, and exec() is called.

In this case it would have been better if the foreach statements were executed in the reduce of the group by and two different outputs stored. But the MultiQueryOptimizer is not applied when secondary key optimization is possible.

MultiQueryOptimizer.java
if (successor.getUseSecondaryKey()) {
    log.debug("Splittee " + successor.getOperatorKey().getId()
            + " uses secondary key, do not merge it");
    continue;
}

In this case, both splits have the same secondary key, since they both do order B by f desc (which is not the case most of the time), so it would be possible to apply the MultiQueryOptimizer. But currently the MultiQueryOptimizer does not have the intelligence to check for that and merge the splits into one plan when all of them have the same secondary key.

Regards,
Rohini


On Tue, May 19, 2015 at 1:53 AM, Zhang, Liyun <li...@intel.com> wrote:
Hi Rohini:
   I found a problem when executing the following script in mr mode:


testAccumulator.join.pig

REGISTER /home/zly/prj/oss/kellyzly/pig/bin/myudfs.jar;
A = load './testAccumulator.txt' as (id:int,f);
B = foreach A generate id, f, id as t;
C = group B by id;
D = foreach C {
    E = order B by f desc;
    F = E.f;
    generate group, myudfs.AccumulativeSumBag(F);
};
G = foreach C {
    E = order B by f desc;
    F = E.f;
    generate group, myudfs.AccumulativeSumBag(F);
};
H = join D by group, G by group;
store H into 'testAccumulator.join.out';
explain H

cat myudfs/AccumulativeSumBag.java:
package myudfs;

import java.io.IOException;
import java.util.Iterator;
import org.apache.pig.EvalFunc;
import org.apache.pig.Accumulator;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.Tuple;

/**
* This class is for testing of accumulator udfs
*
*/
public class AccumulativeSumBag extends EvalFunc<String> implements Accumulator<String>
{

    StringBuffer sb;

    public AccumulativeSumBag() {
    }

    public void accumulate(Tuple tuple) throws IOException {
        DataBag databag = (DataBag)tuple.get(0);
        if(databag == null)
            return;

        if (sb == null) {
            sb = new StringBuffer();
        }

        Iterator<Tuple> iterator = databag.iterator();
        while(iterator.hasNext()) {
            Tuple t = iterator.next();
            if (t.size()>1 && t.get(1) == null) {
                continue;
            }

            sb.append(t.toString());
        }
    }

    public String getValue() {
        if (sb != null && sb.length()>0) {
            return sb.toString();
        }
        return null;
    }

    public void cleanup() {
        sb = null;
    }

    public String exec(Tuple tuple) throws IOException {
        throw new IOException("exec() should not be called");
    }
}

the error message is:
java.lang.Exception: org.apache.pig.backend.executionengine.ExecException: ERROR 0: Exception while executing (Name: H: Local Rearrange[tuple]{int}(false) - scope-117 Operator Key: scope-117): org.apache.pig.backend.executionengine.ExecException: ERROR 2078: Caught error from UDF: myudfs.AccumulativeSumBag [exec() should not be called]
        at org.apache.hadoop.mapred.LocalJobRunner$Job.runTasks(LocalJobRunner.java:462)
        at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:522)
Caused by: org.apache.pig.backend.executionengine.ExecException: ERROR 0: Exception while executing (Name: H: Local Rearrange[tuple]{int}(false) - scope-117 Operator Key: scope-117): org.apache.pig.backend.executionengine.ExecException: ERROR 2078: Caught error from UDF: myudfs.AccumulativeSumBag [exec() should not be called]
        at org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator.processInput(PhysicalOperator.java:316)
        at org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange.getNextTuple(POLocalRearrange.java:291)
        at org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POUnion.getNextTuple(POUnion.java:167)
        at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigGenericMapBase.runPipeline(PigGenericMapBase.java:279)
        at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigGenericMapBase.map(PigGenericMapBase.java:274)
        at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigGenericMapBase.map(PigGenericMapBase.java:64)
        at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:145)
        at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:764)
        at org.apache.hadoop.mapred.MapTask.run(MapTask.java:340)
        at org.apache.hadoop.mapred.LocalJobRunner$Job$MapTaskRunnable.run(LocalJobRunner.java:243)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
        at java.util.concurrent.FutureTask.run(FutureTask.java:262)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:744)

Following is the physical plan and mr plan.
#-----------------------------------------------
# Physical Plan:
#-----------------------------------------------
H: Store(fakefile:org.apache.pig.builtin.PigStorage) - scope-57
|
|---H: New For Each(true,true)[tuple] - scope-56
    |   |
    |   Project[bag][1] - scope-54
    |   |
    |   Project[bag][2] - scope-55
    |
    |---H: Package(Packager)[tuple]{int} - scope-49
        |
        |---H: Global Rearrange[tuple] - scope-48
            |
            |---H: Local Rearrange[tuple]{int}(false) - scope-50
            |   |   |
            |   |   Project[int][0] - scope-51
            |   |
            |   |---D: New For Each(false,false)[bag] - scope-32
            |       |   |
            |       |   Project[int][0] - scope-22
            |       |   |
            |       |   POUserFunc(myudfs.AccumulativeSumBag)[chararray] - scope-25
            |       |   |
            |       |   |---RelationToExpressionProject[bag][*] - scope-24
            |       |       |
            |       |       |---F: New For Each(false)[bag] - scope-31
            |       |           |   |
            |       |           |   Project[bytearray][1] - scope-29
            |       |           |
            |       |           |---E: POSort[bag]() - scope-28
            |       |               |   |
            |       |               |   Project[bytearray][1] - scope-27
            |       |               |
            |       |               |---Project[bag][1] - scope-26
            |       |
            |       |---C: Filter[bag] - scope-20
            |           |   |
            |           |   Constant(true) - scope-21
            |           |
            |           |---C: Split - scope-19    // here an implicit Split is generated
            |               |
            |               |---C: Package(Packager)[tuple]{int} - scope-16
            |                   |
            |                   |---C: Global Rearrange[tuple] - scope-15
            |                       |
            |                       |---C: Local Rearrange[tuple]{int}(false) - scope-17
            |                           |   |
            |                           |   Project[int][0] - scope-18
            |                           |
            |                           |---B: New For Each(false,false,false)[bag] - scope-14
            |                               |   |
            |                               |   Project[int][0] - scope-7
            |                               |   |
            |                               |   Project[bytearray][1] - scope-9
            |                               |   |
            |                               |   POUserFunc(org.apache.pig.impl.builtin.IdentityColumn)[int] - scope-12
            |                               |   |
            |                               |   |---Project[int][0] - scope-11
            |                               |
            |                               |---A: New For Each(false,false)[bag] - scope-6
            |                                   |   |
            |                                   |   Cast[int] - scope-2
            |                                   |   |
            |                                   |   |---Project[bytearray][0] - scope-1
            |                                   |   |
            |                                   |   Project[bytearray][1] - scope-4
            |                                   |
            |                                   |---A: Load(hdfs://zly2.sh.intel.com:8020/user/root/testAccumulator.txt:org.apache.pig.builtin.PigStorage) - scope-0
            |
            |---H: Local Rearrange[tuple]{int}(false) - scope-52
                |   |
                |   Project[int][0] - scope-53
                |
                |---G: New For Each(false,false)[bag] - scope-45
                    |   |
                    |   Project[int][0] - scope-35
                    |   |
                    |   POUserFunc(myudfs.AccumulativeSumBag)[chararray] - scope-38
                    |   |
                    |   |---RelationToExpressionProject[bag][*] - scope-37
                    |       |
                    |       |---F: New For Each(false)[bag] - scope-44
                    |           |   |
                    |           |   Project[bytearray][1] - scope-42
                    |           |
                    |           |---E: POSort[bag]() - scope-41
                    |               |   |
                    |               |   Project[bytearray][1] - scope-40
                    |               |
                    |               |---Project[bag][1] - scope-39
                    |
                    |---C: Filter[bag] - scope-33
                        |   |
                        |   Constant(true) - scope-34
                        |
                        |---C: Split - scope-19
                            |
                            |---C: Package(Packager)[tuple]{int} - scope-16
                                |
                                |---C: Global Rearrange[tuple] - scope-15
                                    |
                                    |---C: Local Rearrange[tuple]{int}(false) - scope-17
                                        |   |
                                        |   Project[int][0] - scope-18
                                        |
                                        |---B: New For Each(false,false,false)[bag] - scope-14
                                            |   |
                                            |   Project[int][0] - scope-7
                                            |   |
                                            |   Project[bytearray][1] - scope-9
                                            |   |
                                            |   POUserFunc(org.apache.pig.impl.builtin.IdentityColumn)[int] - scope-12
                                            |   |
                                            |   |---Project[int][0] - scope-11
                                            |
                                            |---A: New For Each(false,false)[bag] - scope-6
                                                |   |
                                                |   Cast[int] - scope-2
                                                |   |
                                                |   |---Project[bytearray][0] - scope-1
                                                |   |
                                                |   Project[bytearray][1] - scope-4
                                                |
            |                                                 |---A: Load(hdfs://zly2.sh.intel.com:8020/user/root/testAccumulator.txt:org.apache.pig.builtin.PigStorage) - scope-0

#--------------------------------------------------
# Map Reduce Plan
#--------------------------------------------------
MapReduce node scope-58
Map Plan
C: Local Rearrange[tuple]{int}(false) - scope-17
|   |
|   Project[int][0] - scope-18
|
|---B: New For Each(false,false,false)[bag] - scope-14
    |   |
    |   Project[int][0] - scope-7
    |   |
    |   Project[bytearray][1] - scope-9
    |   |
    |   POUserFunc(org.apache.pig.impl.builtin.IdentityColumn)[int] - scope-12
    |   |
    |   |---Project[int][0] - scope-11
    |
    |---A: New For Each(false,false)[bag] - scope-6
        |   |
        |   Cast[int] - scope-2
        |   |
        |   |---Project[bytearray][0] - scope-1
        |   |
        |   Project[bytearray][1] - scope-4
        |
        |---A: Load(hdfs://zly2.sh.intel.com:8020/user/root/testAccumulator.txt:org.apache.pig.builtin.PigStorage) - scope-0--------
Reduce Plan
Store(hdfs://zly2.sh.intel.com:8020/tmp/temp-281610513/tmp1960264662:org.apache.pig.impl.io.InterStorage) - scope-59
|
|---C: Package(Packager)[tuple]{int} - scope-16--------
Global sort: false
----------------

MapReduce node scope-64
Map Plan
Union[tuple] - scope-65
|
|---H: Local Rearrange[tuple]{int}(false) - scope-50
|   |   |
|   |   Project[int][0] - scope-51
|   |
|   |---D: New For Each(false,false)[bag] - scope-32
|       |   |
|       |   Project[int][0] - scope-22
|       |   |
|       |   POUserFunc(myudfs.AccumulativeSumBag)[chararray] - scope-25
|       |   |
|       |   |---RelationToExpressionProject[bag][*] - scope-24
|       |       |
|       |       |---F: New For Each(false)[bag] - scope-31
|       |           |   |
|       |           |   Project[bytearray][1] - scope-29
|       |           |
|       |           |---E: POSort[bag]() - scope-28   // POSort should be removed by SecondaryKeyOptimizerUtil.java#applySecondaryKeySort, but it is kept because the POSplit (scope-19 in the physical plan) puts the group and the foreach in different operators
|       |               |   |
|       |               |   Project[bytearray][1] - scope-27
|       |               |
|       |               |---Project[bag][1] - scope-26
|       |
|       |---Load(hdfs://zly2.sh.intel.com:8020/tmp/temp-281610513/tmp1960264662:org.apache.pig.impl.io.InterStorage) - scope-60
|
|---H: Local Rearrange[tuple]{int}(false) - scope-52
    |   |
    |   Project[int][0] - scope-53
    |
    |---G: New For Each(false,false)[bag] - scope-45
        |   |
        |   Project[int][0] - scope-35
        |   |
        |   POUserFunc(myudfs.AccumulativeSumBag)[chararray] - scope-38
        |   |
        |   |---RelationToExpressionProject[bag][*] - scope-37
        |       |
        |       |---F: New For Each(false)[bag] - scope-44
        |           |   |
        |           |   Project[bytearray][1] - scope-42
        |           |
        |           |---E: POSort[bag]() - scope-41
        |               |   |
        |               |   Project[bytearray][1] - scope-40
        |               |
        |               |---Project[bag][1] - scope-39
        |
        |---Load(hdfs://zly2.sh.intel.com:8020/tmp/temp-281610513/tmp1960264662:org.apache.pig.impl.io.InterStorage) - scope-62--------
Reduce Plan
H: Store(fakefile:org.apache.pig.builtin.PigStorage) - scope-57
|
|---H: Package(JoinPackager(true,true))[tuple]{int} - scope-49--------
Global sort: false
----------------

The reason it fails is that POSort is not removed when secondary key sort is enabled (POSort should be removed in SecondaryKeyOptimizerUtil.java#applySecondaryKeySort). If POSplit is not removed, the value "foundUDF" in AccumulatorOptimizerUtil#addAccumulator is false and po_foreach.setAccumulative() is not called. This causes "Caught error from UDF: myudfs.AccumulativeSumBag [exec() should not be called]". Because an implicit POSplit is generated, a new MapReduce operator is created when the POSplit is encountered in the MR plan (POSplit (scope-19) splits the physical plan into MapReduce node scope-58 and MapReduce node scope-64), so SecondaryKeyOptimizerUtil.java#applySecondaryKeySort does not take effect.
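
To make the failure mode concrete, here is a minimal sketch of a UDF that only works in accumulative mode. The interface and class names (SketchAccumulator, AccumulateOnlySum, AccumulateModeSketch) are hypothetical stand-ins, not Pig's real EvalFunc or Accumulator types, and the assumption that myudfs.AccumulativeSumBag throws from exec() is inferred only from the error message quoted above.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for an accumulator-style UDF contract (not Pig's API).
interface SketchAccumulator {
    void accumulate(List<Integer> batch);
    String getValue();
    void cleanup();
}

// Assumed shape of a UDF that supports only accumulative evaluation.
class AccumulateOnlySum implements SketchAccumulator {
    private long sum = 0;

    // Whole-bag entry point; this UDF refuses it, as the reported error suggests.
    public String exec(List<Integer> wholeBag) {
        throw new IllegalStateException("exec() should not be called");
    }

    public void accumulate(List<Integer> batch) {
        for (int v : batch) {
            sum += v;
        }
    }

    public String getValue() {
        return Long.toString(sum);
    }

    public void cleanup() {
        sum = 0;
    }
}

class AccumulateModeSketch {
    // Models a foreach evaluating the UDF with or without accumulative mode set.
    static String evaluate(AccumulateOnlySum udf, List<Integer> bag, boolean accumulative) {
        if (!accumulative) {
            // Without setAccumulative(), the whole bag goes through exec().
            return udf.exec(bag);
        }
        // In accumulative mode the bag is fed in batches (batch size 2 here).
        for (int i = 0; i < bag.size(); i += 2) {
            udf.accumulate(bag.subList(i, Math.min(i + 2, bag.size())));
        }
        String result = udf.getValue();
        udf.cleanup();
        return result;
    }

    public static void main(String[] args) {
        List<Integer> bag = Arrays.asList(3, 2, 1);
        System.out.println(evaluate(new AccumulateOnlySum(), bag, true));
        try {
            evaluate(new AccumulateOnlySum(), bag, false);
        } catch (IllegalStateException e) {
            System.out.println("Caught error from UDF: " + e.getMessage());
        }
    }
}
```

In this sketch the same UDF succeeds when the caller drives accumulate()/getValue() and fails as soon as the caller falls back to exec(), which mirrors what happens when setAccumulative() is never called on the foreach.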

AccumulatorOptimizerUtil#addAccumulator
    public static void addAccumulator(PhysicalPlan plan) {
        // See if this is a map-reduce job
        List<PhysicalOperator> pos = plan.getRoots();
        if (pos == null || pos.size() == 0) {
            return;
        }

        // See if this is a POPackage
        PhysicalOperator po_package = pos.get(0);
        if (!po_package.getClass().equals(POPackage.class)) {
            return;
        }

        Packager pkgr = ((POPackage) po_package).getPkgr();
        // Check that this is a standard package, not a subclass
        if (!pkgr.getClass().equals(Packager.class)) {
            return;
        }

        // if POPackage is for distinct, just return
        if (pkgr.isDistinct()) {
            return;
        }

        // if any input to POPackage is inner, just return
        boolean[] isInner = pkgr.getInner();
        for (boolean b: isInner) {
            if (b) {
                return;
            }
        }

        List<PhysicalOperator> l = plan.getSuccessors(po_package);
        // there should be only one POForEach
        if (l == null || l.size() == 0 || l.size() > 1) {
            return;
        }

        PhysicalOperator po_foreach = l.get(0);
        if (!(po_foreach instanceof POForEach)) {
            return;
        }

        boolean foundUDF = false;
        List<PhysicalPlan> list = ((POForEach)po_foreach).getInputPlans();
        for (PhysicalPlan p: list) {
            PhysicalOperator po = p.getLeaves().get(0);

            // only expression operators are allowed
            if (!(po instanceof ExpressionOperator)) {
                return;
            }

            if (((ExpressionOperator)po).containUDF()) {
                foundUDF = true;
            }

            if (!check(po)) {
                return;
            }
        }

        if (foundUDF) {
            // if all tests are passed, reducer can run in accumulative mode
            LOG.info("Reducer is to run in accumulative mode.");
            po_package.setAccumulative();
            po_foreach.setAccumulative();
        }
    }
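
As a rough aid for reading the method above, the following sketch replays its early-return guards against the two reduce plans in the MapReduce plan dump. It is a simplified model, not Pig code: ReducePlan and firstFailingGuard are hypothetical names, operators are represented by plain strings, the inner flags for scope-58 are an assumption, and the per-plan checks inside the foreach loop (ExpressionOperator, check(po)) are left out.

```java
import java.util.Arrays;
import java.util.List;

class AccumulatorGuardSketch {
    // Hypothetical, flattened summary of a reduce plan; not a Pig class.
    static final class ReducePlan {
        final String rootOperator;      // class name of the plan root
        final String packagerClass;     // class name of the root's packager
        final boolean distinct;         // stands in for pkgr.isDistinct()
        final boolean[] inner;          // stands in for pkgr.getInner()
        final List<String> successors;  // operators directly after the package

        ReducePlan(String rootOperator, String packagerClass, boolean distinct,
                   boolean[] inner, List<String> successors) {
            this.rootOperator = rootOperator;
            this.packagerClass = packagerClass;
            this.distinct = distinct;
            this.inner = inner;
            this.successors = successors;
        }
    }

    // Returns the first guard that would make addAccumulator return early,
    // or "none" if all guards modeled here pass.
    static String firstFailingGuard(ReducePlan p) {
        if (!"POPackage".equals(p.rootOperator)) {
            return "root is not a POPackage";
        }
        if (!"Packager".equals(p.packagerClass)) {
            return "packager is a subclass";
        }
        if (p.distinct) {
            return "package is for distinct";
        }
        for (boolean b : p.inner) {
            if (b) {
                return "an input is inner";
            }
        }
        if (p.successors.size() != 1) {
            return "not exactly one successor";
        }
        if (!"POForEach".equals(p.successors.get(0))) {
            return "successor is not a POForEach";
        }
        return "none";
    }

    public static void main(String[] args) {
        // scope-58 reduce: Package(Packager) followed by a Store.
        ReducePlan scope58 = new ReducePlan("POPackage", "Packager", false,
                new boolean[] {false}, Arrays.asList("POStore"));
        // scope-64 reduce: Package(JoinPackager(true,true)) followed by a Store.
        ReducePlan scope64 = new ReducePlan("POPackage", "JoinPackager", false,
                new boolean[] {true, true}, Arrays.asList("POStore"));
        System.out.println("scope-58: " + firstFailingGuard(scope58));
        System.out.println("scope-64: " + firstFailingGuard(scope64));
    }
}
```

Under these assumptions neither reduce plan reaches the foundUDF loop: in scope-58 the package is followed by a Store rather than a POForEach, and in scope-64 the packager is a JoinPackager. Both foreach operators that call the UDF sit in the map plan of scope-64.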


My question: is this a bug, or does Pig simply not handle this kind of script, where an implicit POSplit is generated while secondary key optimization is enabled?



Kelly Zhang/Zhang,Liyun
Best Regards