Posted to dev@pig.apache.org by "Zhang, Liyun" <li...@intel.com> on 2015/02/05 09:46:45 UTC
How to deal with visitLocalRearrange in spark mode
Hi all:
I'm currently working on PIG-4374 <https://issues.apache.org/jira/browse/PIG-4374> (Add SparkPlan in spark package). I ran into a problem with the following script in Spark mode.
Join.pig
A = load '/SkewedJoinInput1.txt' as (id,name,n);
B = load '/SkewedJoinInput2.txt' as (id,name);
C = group A by id;
D = group B by id;
E = join C by group, D by group;
store E into '/skewedjoin.out';
explain E;
The physical plan is compiled into an MR plan containing 3 MapReduce nodes (see the attached mr_join.txt).
"logroup" is converted to "poLocalRearrange", "poGlobalRearrange", "poPackage".
"lojoin" is converted to "poLocalRearrange", "poGlobalRearrange", "poPackage", "poPackage".
In MapReduce mode, each MapReduceOper has a mapPlan, a reducePlan, and a combinePlan.
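To make the rearrange/package decomposition concrete, here is a minimal Java sketch of what the three operators do to the data. It is a hypothetical model, not Pig's implementation: RearrangeSketch, Tagged, localRearrange, globalRearrange and pack are invented names, and tuples are plain List<String>. A local rearrange tags each tuple with its key and input index, a global rearrange brings all tuples with the same key together (the shuffle), and a package builds one bag per input for each key. A group is the one-input case; a join is the multi-input case.

```java
import java.util.*;

// Hypothetical, simplified model of the data flow behind
// POLocalRearrange -> POGlobalRearrange -> POPackage. Not Pig's implementation.
class RearrangeSketch {

    // A tuple tagged with its key and the index of the input it came from.
    static final class Tagged {
        final String key;
        final int input;
        final List<String> tuple;
        Tagged(String key, int input, List<String> tuple) {
            this.key = key;
            this.input = input;
            this.tuple = tuple;
        }
    }

    // Local rearrange: tag each tuple of one input with (key, input index).
    static List<Tagged> localRearrange(List<List<String>> rows, int keyCol, int inputIdx) {
        List<Tagged> out = new ArrayList<>();
        for (List<String> row : rows) {
            out.add(new Tagged(row.get(keyCol), inputIdx, row));
        }
        return out;
    }

    // Global rearrange: collect every tagged tuple sharing a key (the shuffle).
    // A TreeMap keeps the keys sorted, as a sort-based shuffle would.
    static SortedMap<String, List<Tagged>> globalRearrange(List<List<Tagged>> inputs) {
        SortedMap<String, List<Tagged>> byKey = new TreeMap<>();
        for (List<Tagged> in : inputs) {
            for (Tagged t : in) {
                byKey.computeIfAbsent(t.key, k -> new ArrayList<>()).add(t);
            }
        }
        return byKey;
    }

    // Package: for each key, build one bag per input (empty if that input had no match).
    static SortedMap<String, List<List<List<String>>>> pack(
            SortedMap<String, List<Tagged>> grouped, int numInputs) {
        SortedMap<String, List<List<List<String>>>> result = new TreeMap<>();
        for (Map.Entry<String, List<Tagged>> e : grouped.entrySet()) {
            List<List<List<String>>> bags = new ArrayList<>();
            for (int i = 0; i < numInputs; i++) {
                bags.add(new ArrayList<>());
            }
            for (Tagged t : e.getValue()) {
                bags.get(t.input).add(t.tuple);
            }
            result.put(e.getKey(), bags);
        }
        return result;
    }

    public static void main(String[] args) {
        List<List<String>> a = List.of(List.of("1", "x"), List.of("2", "y"), List.of("1", "z"));
        List<List<String>> b = List.of(List.of("1", "p"), List.of("3", "q"));
        SortedMap<String, List<Tagged>> shuffled =
                globalRearrange(List.of(localRearrange(a, 0, 0), localRearrange(b, 0, 1)));
        // Prints {1=[[[1, x], [1, z]], [[1, p]]], 2=[[[2, y]], []], 3=[[], [[3, q]]]}
        System.out.println(pack(shuffled, 2));
    }
}
```

For an inner join, only keys whose bags are all non-empty (key 1 above) would go on to produce output tuples.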
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRCompiler.visitLocalRearrange
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRCompiler.addToMap
private void addToMap(PhysicalOperator op) throws PlanException, IOException {
    if (compiledInputs.length == 1) {
        // For speed
        MapReduceOper mro = compiledInputs[0];
        if (!mro.isMapDone()) {
            mro.mapPlan.addAsLeaf(op);
        } else if (mro.isMapDone() && !mro.isReduceDone()) {
            FileSpec fSpec = getTempFileSpec();
            // MyComment: first add a POStore to mro.reducePlan, which stores the
            // result of mro in a temp file. Then create a new MapReduceOper whose
            // map plan starts with a POLoad that reads that temp file back.
            POStore st = getStore();
            st.setSFile(fSpec);
            mro.reducePlan.addAsLeaf(st);
            mro.setReduceDone(true);
            mro = startNew(fSpec, mro);
            mro.mapPlan.addAsLeaf(op);
            compiledInputs[0] = mro;
        } else {
            int errCode = 2022;
            String msg = "Both map and reduce phases have been done. This is unexpected while compiling.";
            throw new PlanException(msg, errCode, PigException.BUG);
        }
        curMROp = mro;
        ....
}
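The branching in addToMap is easier to follow in a toy model, so here is a hypothetical, heavily simplified Java sketch of the same decision logic. MRAddToMapSketch, MROper, markMapDone and addToReduce are invented for illustration; operators are plain strings instead of a PhysicalPlan, and markMapDone only stands in for whatever the real compiler does when it reaches the blocking POGlobalRearrange.

```java
import java.util.*;

// Hypothetical, heavily simplified model of the decision logic in
// MRCompiler.addToMap. Operators are plain strings, not a PhysicalPlan.
class MRAddToMapSketch {

    // One MapReduce job: a map plan, a reduce plan, and their "done" flags.
    static final class MROper {
        final List<String> mapPlan = new ArrayList<>();
        final List<String> reducePlan = new ArrayList<>();
        boolean mapDone;
        boolean reduceDone;
    }

    final List<MROper> jobs = new ArrayList<>();
    private MROper cur;
    private int tmpCounter = 0;

    MRAddToMapSketch(String load) {
        cur = new MROper();
        cur.mapPlan.add(load);
        jobs.add(cur);
    }

    // Stand-in for reaching a blocking operator such as POGlobalRearrange:
    // operators added after this belong to the reduce side.
    void markMapDone() {
        cur.mapDone = true;
    }

    void addToReduce(String op) {
        cur.reducePlan.add(op);
    }

    void addToMap(String op) {
        if (!cur.mapDone) {
            // Map side still open: append to the current job's map plan.
            cur.mapPlan.add(op);
        } else if (!cur.reduceDone) {
            // Map side closed: end this job with a store to a temp file and
            // start a new job whose map plan loads that file, then add op.
            String tmp = "tmp" + (tmpCounter++);
            cur.reducePlan.add("POStore(" + tmp + ")");
            cur.reduceDone = true;
            MROper next = new MROper();
            next.mapPlan.add("POLoad(" + tmp + ")");
            next.mapPlan.add(op);
            jobs.add(next);
            cur = next;
        } else {
            throw new IllegalStateException("Both map and reduce phases have been done.");
        }
    }

    public static void main(String[] args) {
        MRAddToMapSketch c = new MRAddToMapSketch("POLoad(A)");
        c.addToMap("POLocalRearrange"); // first job's map plan
        c.markMapDone();                // shuffle boundary
        c.addToReduce("POPackage");     // first job's reduce plan
        c.addToMap("POLocalRearrange"); // forces a second job
        for (MROper job : c.jobs) {
            System.out.println("map=" + job.mapPlan + " reduce=" + job.reducePlan);
        }
    }
}
```

Running main yields two jobs: the first ends its reduce plan with POStore(tmp0), and the second starts its map plan with POLoad(tmp0). This temp-file hop between jobs is the part that has no direct counterpart in a SparkOper with a single plan.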
The SparkOper I created has only a single plan.
How should I handle the situation described above? Currently I handle it as follows:
org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkCompiler.visitLocalRearrange
org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkCompiler.addToMap
private void addToMap(POLocalRearrange op) throws PlanException, IOException {
    if (compiledInputs.length == 1) {
        SparkOper sparkOp = compiledInputs[0];
        // MyComment: first look up the predecessor of the POLocalRearrange.
        List<PhysicalOperator> preds = plan.getPredecessors(op);
        if (preds != null && preds.size() == 1) {
            // If the predecessor is not a POLoad (for "group" and "join" the
            // predecessor of a POLocalRearrange is usually a POLoad), add a
            // POStore to sparkOp.plan that stores the result of sparkOp in a
            // temp file, then create a new SparkOper whose plan starts with a
            // POLoad that reads that temp file back.
            if (!(preds.get(0) instanceof POLoad)) {
                FileSpec fSpec = getTempFileSpec();
                POStore st = getStore();
                st.setSFile(fSpec);
                sparkOp.plan.addAsLeaf(st);
                sparkOp = startNew(fSpec, sparkOp);
                compiledInputs[0] = sparkOp;
            }
        }
        sparkOp.plan.addAsLeaf(op);
        curSparkOp = sparkOp;
    } else {
    }
    .....
}
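The splitting rule in this SparkCompiler.addToMap can be modelled the same way. The following is a hypothetical Java sketch (SparkSplitSketch and compile are invented names) that assumes a linear chain of operators, so "the predecessor" is simply the previous operator in the current plan; it therefore models a group followed by a second rearrange rather than the two-input join in Join.pig.

```java
import java.util.*;

// Hypothetical model of the splitting rule in the SparkCompiler.addToMap excerpt.
// Each SparkOper holds one plan (here: a list of operator names). A
// POLocalRearrange whose predecessor is not a POLoad forces a boundary: the
// current plan ends with a POStore to a temp file, and a new plan starts with
// a POLoad of that file. Assumes a linear chain of operators.
class SparkSplitSketch {

    static List<List<String>> compile(List<String> chain) {
        List<List<String>> opers = new ArrayList<>();
        List<String> cur = new ArrayList<>();
        opers.add(cur);
        int tmpCounter = 0;
        for (String op : chain) {
            String pred = cur.isEmpty() ? null : cur.get(cur.size() - 1);
            boolean needsCut = op.equals("POLocalRearrange")
                    && pred != null && !pred.startsWith("POLoad");
            if (needsCut) {
                String tmp = "tmp" + (tmpCounter++);
                cur.add("POStore(" + tmp + ")");
                cur = new ArrayList<>();
                cur.add("POLoad(" + tmp + ")");
                opers.add(cur);
            }
            cur.add(op);
        }
        return opers;
    }

    public static void main(String[] args) {
        List<String> chain = List.of(
                "POLoad(A)", "POLocalRearrange", "POGlobalRearrange", "POPackage",
                "POLocalRearrange", "POGlobalRearrange", "POPackage", "POStore(out)");
        for (List<String> plan : compile(chain)) {
            System.out.println(plan);
        }
    }
}
```

With the chain in main, compile returns two plans: the first ends with POStore(tmp0) and the second begins with POLoad(tmp0). Each such cut writes intermediate data to storage and reads it back, which is the cost this approach carries over from the MapReduce design.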
Can anyone tell me how Tez deals with this situation? I'd like to use the other execution modes (MapReduce, Tez) as a reference.
Best regards
Zhang,Liyun
Re: How to deal with visitLocalRearrange in spark mode
Posted by Praveen R <pr...@sigmoidanalytics.com>.
Hi Kelly,
In the case of Pig on Spark, we may not need to split the operators into map,
reduce, and combine plans. Instead, all related operators can go in the same
plan, since Spark takes care of handling intermediate data and optimising the
flow of data transformations.
Regards,
Praveen
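As a rough in-memory analogy of the single-plan idea (java.util.stream, not Spark, and not how Pig compiles anything), here is Join.pig expressed as one pipeline in which both groupings feed the join directly, with no temporary store/load in between. SinglePlanSketch, groupById and joinGroups are hypothetical names, rows are String[] with the id in column 0, and the join output is summarised as strings rather than real tuples.

```java
import java.util.*;
import java.util.stream.*;

// In-memory analogy (java.util.stream, not Spark) of Join.pig as one pipeline:
// C = group A by id; D = group B by id; E = join C by group, D by group.
class SinglePlanSketch {

    // Group rows by their first column, like "group X by id".
    static Map<String, List<String[]>> groupById(List<String[]> rows) {
        return rows.stream()
                .collect(Collectors.groupingBy(r -> r[0], TreeMap::new, Collectors.toList()));
    }

    // Inner join of two grouped relations on the group key, summarised as strings.
    static List<String> joinGroups(Map<String, List<String[]>> c, Map<String, List<String[]>> d) {
        return c.keySet().stream()
                .filter(d::containsKey)
                .map(k -> k + ": " + c.get(k).size() + " row(s) from A, "
                        + d.get(k).size() + " row(s) from B")
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String[]> a = List.of(
                new String[] {"1", "a", "x"}, new String[] {"2", "b", "y"}, new String[] {"1", "c", "z"});
        List<String[]> b = List.of(new String[] {"1", "p"}, new String[] {"3", "q"});
        // One pipeline: the grouped results go straight into the join.
        System.out.println(joinGroups(groupById(a), groupById(b)));
    }
}
```

In Spark itself the equivalent steps would be shuffles inside one job's DAG, with Spark deciding how intermediate data is kept between stages.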