Posted to dev@pig.apache.org by "Zhang, Liyun" <li...@intel.com> on 2015/02/05 09:46:45 UTC

How to deal with visitLocalRearrange in spark mode

Hi all:
I'm currently working on PIG-4374<https://issues.apache.org/jira/browse/PIG-4374> (Add SparkPlan in spark package). I ran into a problem with the following script in Spark mode:
Join.pig:
A = load '/SkewedJoinInput1.txt' as (id,name,n);
B = load '/SkewedJoinInput2.txt' as (id,name);
C = group A by id;
D = group B by id;
E = join C by group, D by group;
store E into '/skewedjoin.out';
explain E;

The physical plan is compiled into an MR plan that contains 3 MapReduce nodes (see attached mr_join.txt).
The "group" logical operator ("logroup") is converted to POLocalRearrange, POGlobalRearrange and POPackage.
The "join" logical operator ("lojoin") is converted to POLocalRearrange, POGlobalRearrange, POPackage and POPackage.

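To make the conversion above concrete, here is a minimal in-memory sketch of what the three physical operators do for a single GROUP. It assumes tuples are String arrays and uses hypothetical helpers (localRearrange, globalRearrange, packageGroups) that are not Pig APIs; the package step keeps only the bag size instead of the bag itself, to keep the output short.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical in-memory model of the operators a GROUP compiles into.
// Tuples are String arrays; none of these names are Pig APIs.
class GroupOperatorsSketch {

    // A (key, tuple) pair, as produced by the local rearrange step.
    static final class Keyed {
        final String key;
        final String[] tuple;

        Keyed(String key, String[] tuple) {
            this.key = key;
            this.tuple = tuple;
        }
    }

    // Local rearrange: project the group key out of every tuple (map side).
    static List<Keyed> localRearrange(List<String[]> input, int keyField) {
        List<Keyed> out = new ArrayList<>();
        for (String[] t : input) {
            out.add(new Keyed(t[keyField], t));
        }
        return out;
    }

    // Global rearrange: bring all tuples with the same key together (the shuffle).
    static Map<String, List<String[]>> globalRearrange(List<Keyed> keyed) {
        Map<String, List<String[]>> byKey = new TreeMap<>();
        for (Keyed k : keyed) {
            byKey.computeIfAbsent(k.key, x -> new ArrayList<>()).add(k.tuple);
        }
        return byKey;
    }

    // Package: emit one record per key; only the bag size is kept here.
    static List<String> packageGroups(Map<String, List<String[]>> shuffled) {
        List<String> out = new ArrayList<>();
        for (Map.Entry<String, List<String[]>> e : shuffled.entrySet()) {
            out.add(e.getKey() + ":" + e.getValue().size());
        }
        return out;
    }

    public static void main(String[] args) {
        // C = group A by id, with A = (id, name, n)
        List<String[]> a = List.of(
            new String[] {"1", "x", "10"},
            new String[] {"2", "y", "20"},
            new String[] {"1", "z", "30"});
        System.out.println(packageGroups(globalRearrange(localRearrange(a, 0)))); // [1:2, 2:1]
    }
}
```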
   In MapReduce mode, each MapReduceOper has three plans: mapPlan, reducePlan and combinePlan. The relevant code path is:
   org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRCompiler.visitLocalRearrange
   org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRCompiler.addToMap
   private void addToMap(PhysicalOperator op) throws PlanException, IOException {
       if (compiledInputs.length == 1) {
           // For speed
           MapReduceOper mro = compiledInputs[0];
           if (!mro.isMapDone()) {
               mro.mapPlan.addAsLeaf(op);
           } else if (mro.isMapDone() && !mro.isReduceDone()) {
               FileSpec fSpec = getTempFileSpec();

               // MyComment: first add a POStore to mro.reducePlan that stores the
               // result of mro in a temp file, then create a new MapReduceOper
               // whose POLoad reads that temp file.
               POStore st = getStore();
               st.setSFile(fSpec);
               mro.reducePlan.addAsLeaf(st);
               mro.setReduceDone(true);
               mro = startNew(fSpec, mro);
               mro.mapPlan.addAsLeaf(op);
               compiledInputs[0] = mro;
           } else {
               int errCode = 2022;
               String msg = "Both map and reduce phases have been done. This is unexpected while compiling.";
               throw new PlanException(msg, errCode, PigException.BUG);
           }
           curMROp = mro;

           ....
       }
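The branch structure of addToMap is what turns Join.pig into 3 MapReduce jobs. The sketch below is a simplified, hypothetical model (MRSplitSketch, MROper and the string operator names are stand-ins, not Pig classes) that replays those branches for the script. Folding the two join inputs into one job at the join's global rearrange is an assumption about what the elided part of the compiler does.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of how the addToMap branches split Join.pig into MapReduce jobs.
// MROper is a stand-in for MapReduceOper; operators are plain strings.
class MRSplitSketch {

    static final class MROper {
        final List<String> mapPlan = new ArrayList<>();
        final List<String> reducePlan = new ArrayList<>();
        boolean mapDone;
        boolean reduceDone;
    }

    final List<MROper> jobs = new ArrayList<>();

    MROper startNew(String firstMapOp) {
        MROper mro = new MROper();
        mro.mapPlan.add(firstMapOp);
        jobs.add(mro);
        return mro;
    }

    // Same three branches as MRCompiler.addToMap for a single compiled input.
    MROper addToMap(MROper mro, String op) {
        if (!mro.mapDone) {
            mro.mapPlan.add(op);
            return mro;
        } else if (!mro.reduceDone) {
            // Close this job with a temp-file store; a new job loads that file.
            String tmp = "tmp" + jobs.size();
            mro.reducePlan.add("POStore(" + tmp + ")");
            mro.reduceDone = true;
            MROper next = startNew("POLoad(" + tmp + ")");
            next.mapPlan.add(op);
            return next;
        }
        throw new IllegalStateException("Both map and reduce phases have been done.");
    }

    // group X by id: the global rearrange ends the map phase, the package runs in reduce.
    MROper group(String input) {
        MROper mro = addToMap(startNew("POLoad(" + input + ")"), "POLocalRearrange");
        mro.mapDone = true;
        mro.reducePlan.add("POPackage");
        return mro;
    }

    static List<MROper> compileJoinPig() {
        MRSplitSketch c = new MRSplitSketch();
        MROper left = c.addToMap(c.group("A"), "POLocalRearrange");
        MROper right = c.addToMap(c.group("B"), "POLocalRearrange");
        // Assumed merge at the join's global rearrange: both inputs are still in
        // their map phase, so they are folded into a single job.
        left.mapPlan.addAll(right.mapPlan);
        c.jobs.remove(right);
        left.mapDone = true;
        left.reducePlan.add("POPackage");
        return c.jobs;
    }

    public static void main(String[] args) {
        System.out.println(compileJoinPig().size()); // 3
    }
}
```

In this model, each group job ends with a POStore of its result, and the join job starts from two POLoads of those temp files.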

      The SparkOper I created has only a single plan.
      How should I handle the situation described above? Currently I handle it as follows:
      org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkCompiler.visitLocalRearrange
      org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkCompiler.addToMap
      private void addToMap(POLocalRearrange op) throws PlanException, IOException {
          if (compiledInputs.length == 1) {
              SparkOper sparkOp = compiledInputs[0];
              // MyComment: first look up the predecessor of the POLocalRearrange.
              List<PhysicalOperator> preds = plan.getPredecessors(op);
              if (preds != null && preds.size() == 1) {
                  // If the predecessor is not a POLoad (with "group" and "join" the
                  // predecessor of a POLocalRearrange is usually a POLoad), add a
                  // POStore to sparkOp.plan that stores the result of sparkOp in a
                  // temp file, then create a new SparkOper whose POLoad reads that
                  // temp file.
                  if (!(preds.get(0) instanceof POLoad)) {
                      FileSpec fSpec = getTempFileSpec();
                      POStore st = getStore();
                      st.setSFile(fSpec);
                      sparkOp.plan.addAsLeaf(st);
                      sparkOp = startNew(fSpec, sparkOp);
                      compiledInputs[0] = sparkOp;
                  }
              }
              sparkOp.plan.addAsLeaf(op);
              curSparkOp = sparkOp;
          } else {
          }
              .....
      }
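For comparison, here is a hypothetical model of the single-plan approach above (SparkSplitSketch and SparkOper are stand-ins, not the real classes). It assumes a linear plan, so the predecessor of a local rearrange is simply the last operator in the current plan. The handling of the join's global rearrange is not shown in the excerpt, so it is left out here too.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the single-plan approach in SparkCompiler.addToMap above.
// SparkOper is a stand-in; the plan is assumed to be a linear chain of operator names.
class SparkSplitSketch {

    static final class SparkOper {
        final List<String> plan = new ArrayList<>();
    }

    final List<SparkOper> opers = new ArrayList<>();

    SparkOper startNew(String firstOp) {
        SparkOper op = new SparkOper();
        op.plan.add(firstOp);
        opers.add(op);
        return op;
    }

    // Cut the plan only if the local rearrange does not directly follow a load.
    SparkOper addLocalRearrange(SparkOper sparkOp, String lr) {
        // Linear-plan assumption: the predecessor is the current last operator.
        String pred = sparkOp.plan.get(sparkOp.plan.size() - 1);
        if (!pred.startsWith("POLoad")) {
            String tmp = "tmp" + opers.size();
            sparkOp.plan.add("POStore(" + tmp + ")");
            sparkOp = startNew("POLoad(" + tmp + ")");
        }
        sparkOp.plan.add(lr);
        return sparkOp;
    }

    // group X by id stays inside one SparkOper: there is no map/reduce boundary.
    SparkOper group(String input) {
        SparkOper op = addLocalRearrange(startNew("POLoad(" + input + ")"), "POLocalRearrange");
        op.plan.add("POGlobalRearrange");
        op.plan.add("POPackage");
        return op;
    }

    public static void main(String[] args) {
        SparkSplitSketch c = new SparkSplitSketch();
        c.addLocalRearrange(c.group("A"), "POLocalRearrange");
        c.addLocalRearrange(c.group("B"), "POLocalRearrange");
        // The join's global rearrange is not modelled.
        System.out.println(c.opers.size()); // 4
        System.out.println(c.opers.get(0).plan);
        // [POLoad(A), POLocalRearrange, POGlobalRearrange, POPackage, POStore(tmp1)]
    }
}
```

Unlike the MR model, the cut here is driven by the type of the predecessor rather than by map/reduce "done" flags, so each group keeps its load, local rearrange, global rearrange and package in a single SparkOper.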


Can anyone tell me how Tez handles this situation? I'd like to use the other execution modes, such as MapReduce and Tez, as a reference.


Best regards
Zhang,Liyun


Re: How to deal with visitLocalRearrange in spark mode

Posted by Praveen R <pr...@sigmoidanalytics.com>.
Hi Kelly,

In the case of Pig on Spark, we may not need to split the operators into map,
reduce and combine plans. Instead, we can keep all related operators in the
same plan, since Spark takes care of handling intermediate data and of
optimising the flow of data transformations.
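A minimal sketch of this idea, assuming an in-memory stand-in for Spark: the two groups and the join of Join.pig are chained in one pipeline, and the grouping (the shuffle) happens inside the helpers rather than through temp files written between jobs. groupByKey and join here are hypothetical plain-Java helpers loosely modelled on Spark's pair-RDD operations of the same names, not the Spark API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

// In-memory stand-in for a single Spark-style plan for Join.pig.
// groupByKey/join are hypothetical helpers, not Spark's JavaPairRDD methods.
class SinglePlanSketch {

    // Like groupByKey: key -> all tuples having that key (an in-memory "shuffle").
    static Map<String, List<String[]>> groupByKey(List<String[]> tuples, int keyField) {
        return tuples.stream().collect(
            Collectors.groupingBy(t -> t[keyField], TreeMap::new, Collectors.toList()));
    }

    // Like an inner join on key: keep only keys present on both sides.
    // Each output record is "key:leftBagSize x rightBagSize" to keep it short.
    static List<String> join(Map<String, List<String[]>> left, Map<String, List<String[]>> right) {
        List<String> out = new ArrayList<>();
        for (Map.Entry<String, List<String[]>> e : left.entrySet()) {
            List<String[]> r = right.get(e.getKey());
            if (r != null) {
                out.add(e.getKey() + ":" + e.getValue().size() + "x" + r.size());
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<String[]> a = List.of(
            new String[] {"1", "x", "10"},
            new String[] {"1", "y", "20"},
            new String[] {"2", "z", "30"});
        List<String[]> b = List.of(new String[] {"1", "p"}, new String[] {"3", "q"});
        // C = group A by id; D = group B by id; E = join C by group, D by group;
        List<String> e = join(groupByKey(a, 0), groupByKey(b, 0));
        System.out.println(e); // [1:2x1]
    }
}
```

The sketch only illustrates that no temp-file POStore/POLoad pair is needed between the groups and the join when everything lives in one plan.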

Regards,
Praveen
