Posted to commits@pig.apache.org by xu...@apache.org on 2016/02/05 05:43:12 UTC

svn commit: r1728600 - in /pig/branches/spark: src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/ src/org/apache/pig/backend/hadoop/executionengine/spark/converter/ src/org/apache/pig/backend/hadoop/executionengine/spa...

Author: xuefu
Date: Fri Feb  5 04:43:12 2016
New Revision: 1728600

URL: http://svn.apache.org/viewvc?rev=1728600&view=rev
Log:
PIG-4766: Ensure GroupBy is optimized for all algebraic Operations (Pallavi via Xuefu)

Modified:
    pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/PORelationToExprProject.java
    pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/ReduceByConverter.java
    pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/CombinerOptimizer.java
    pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/util/CombinerOptimizerUtil.java
    pig/branches/spark/test/org/apache/pig/newplan/logical/relational/TestLocationInPhysicalPlan.java
    pig/branches/spark/test/org/apache/pig/test/TestCombiner.java

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/PORelationToExprProject.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/PORelationToExprProject.java?rev=1728600&r1=1728599&r2=1728600&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/PORelationToExprProject.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/PORelationToExprProject.java Fri Feb  5 04:43:12 2016
@@ -25,6 +25,7 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
 import org.apache.pig.data.DataType;
 import org.apache.pig.data.NonSpillableDataBag;
+import org.apache.pig.impl.plan.NodeIdGenerator;
 import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.impl.plan.VisitorException;
 
@@ -138,10 +139,23 @@ public class PORelationToExprProject ext
         sendEmptyBagOnEOP = false;
         return(r);
     }
-       
+
+    // See PIG-4644
     @Override
     public PORelationToExprProject clone() throws CloneNotSupportedException {
-        return (PORelationToExprProject) super.clone();
+        ArrayList<Integer> cols = new ArrayList<>(columns.size());
+        // Can reuse the same Integer objects, as they are immutable
+        for (Integer i : columns) {
+            cols.add(i);
+        }
+        PORelationToExprProject clone = new PORelationToExprProject(new OperatorKey(mKey.scope,
+                NodeIdGenerator.getGenerator().getNextNodeId(mKey.scope)),
+                requestedParallelism, cols);
+        clone.cloneHelper(this);
+        clone.overloaded = overloaded;
+        clone.resultType = resultType;
+        clone.sendEmptyBagOnEOP = sendEmptyBagOnEOP;
+        return clone;
     }
     
 }
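
The new clone() follows the convention Pig's physical operators use elsewhere: allocate a fresh OperatorKey in the same scope through NodeIdGenerator, copy the projected columns, and carry the runtime flags over by hand instead of relying on Object.clone()'s shallow field copy (the sharing problem referenced as PIG-4644 above). A minimal, self-contained sketch of that convention, with a hypothetical PlanNode standing in for the operator and an AtomicLong standing in for NodeIdGenerator:

    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.atomic.AtomicLong;

    // Sketch only: give the copy a fresh identity key and copy value state
    // field by field, so two plan nodes never share an id or a mutable
    // column list.
    class PlanNode implements Cloneable {
        private static final AtomicLong IDS = new AtomicLong(); // stand-in for NodeIdGenerator

        final long key;                 // analogous to OperatorKey
        final List<Integer> columns;    // analogous to the projected columns
        boolean sendEmptyBagOnEOP;      // runtime flag carried over on clone

        PlanNode(List<Integer> columns) {
            this.key = IDS.getAndIncrement();
            this.columns = columns;
        }

        @Override
        public PlanNode clone() {
            // New key plus a defensive copy of the list; the Integer elements
            // are immutable, so sharing them is safe, as the commit's comment notes.
            PlanNode copy = new PlanNode(new ArrayList<>(columns));
            copy.sendEmptyBagOnEOP = sendEmptyBagOnEOP;
            return copy;
        }
    }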

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/ReduceByConverter.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/ReduceByConverter.java?rev=1728600&r1=1728599&r2=1728600&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/ReduceByConverter.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/ReduceByConverter.java Fri Feb  5 04:43:12 2016
@@ -172,12 +172,17 @@ public class ReduceByConverter implement
         @Override
         public Tuple apply(Tuple v1, Tuple v2) {
             LOG.debug("MergeValuesFunction in : " + v1 + " , " + v2);
-            Tuple result = tf.newTuple();
+            Tuple result = tf.newTuple(2);
             DataBag bag = DefaultBagFactory.getInstance().newDefaultBag();
             Tuple t = new DefaultTuple();
             try {
                 // Package the input tuples so they can be processed by Algebraic functions.
                 Object key = v1.get(0);
+                if (key == null) {
+                    key = "";
+                } else {
+                    result.set(0, key);
+                }
                 bag.add((Tuple) v1.get(1));
                 bag.add((Tuple) v2.get(1));
                 t.append(key);
@@ -197,14 +202,14 @@ public class ReduceByConverter implement
                 // But, we want the result to look like this:
                 // (ABC,((2),(3))) - A tuple with key and a value tuple (containing values).
                 // Hence, the construction of a new value tuple
-                result.append(t.get(0));
+
                 Tuple valueTuple = tf.newTuple();
                 for (Object o : ((Tuple) r.result).getAll()) {
                     if (!o.equals(key)) {
                         valueTuple.append(o);
                     }
                 }
-                result.append(valueTuple);
+                result.set(1,valueTuple);
                 LOG.debug("MergeValuesFunction out : " + result);
                 return result;
             } catch (ExecException e) {
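
For context on the tuple shapes: MergeValuesFunction receives two (key, valueTuple) pairs for the same key, packages their values into a bag so the algebraic function can consume them, and re-emits a two-field (key, values) tuple; the change above pre-sizes that result with newTuple(2) and fills its slots with set() instead of append(). A minimal sketch of the packaging step using Pig's public data API (the key and values here are made up):

    import org.apache.pig.data.BagFactory;
    import org.apache.pig.data.DataBag;
    import org.apache.pig.data.Tuple;
    import org.apache.pig.data.TupleFactory;

    // Given two partial results for the same key,
    //   v1 = (ABC,(2))  and  v2 = (ABC,(3)),
    // build the (key, {value tuples}) form an algebraic function expects.
    public class MergeShapes {
        public static void main(String[] args) throws Exception {
            TupleFactory tf = TupleFactory.getInstance();

            Tuple v1 = tf.newTuple(2);
            v1.set(0, "ABC");
            v1.set(1, tf.newTuple((Object) 2));
            Tuple v2 = tf.newTuple(2);
            v2.set(0, "ABC");
            v2.set(1, tf.newTuple((Object) 3));

            // Package both value tuples into a bag alongside the shared key.
            DataBag bag = BagFactory.getInstance().newDefaultBag();
            bag.add((Tuple) v1.get(1));
            bag.add((Tuple) v2.get(1));
            Tuple packaged = tf.newTuple(2);
            packaged.set(0, v1.get(0));
            packaged.set(1, bag);

            System.out.println(packaged);  // (ABC,{(2),(3)})
        }
    }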

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/CombinerOptimizer.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/CombinerOptimizer.java?rev=1728600&r1=1728599&r2=1728600&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/CombinerOptimizer.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/CombinerOptimizer.java Fri Feb  5 04:43:12 2016
@@ -17,7 +17,6 @@
  */
 package org.apache.pig.backend.hadoop.executionengine.spark.optimizer;
 
-import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
@@ -26,8 +25,8 @@ import java.util.Map;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.pig.PigException;
+import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.ConstantExpression;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POProject;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POUserFunc;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
@@ -54,7 +53,6 @@ import org.apache.pig.impl.plan.VisitorE
 import org.apache.pig.impl.plan.optimizer.OptimizerException;
 import org.apache.pig.impl.util.Pair;
 
-import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 
 /**
@@ -73,7 +71,7 @@ public class CombinerOptimizer extends S
     public void visitSparkOp(SparkOperator sparkOp) throws VisitorException {
         try {
             addCombiner(sparkOp.physicalPlan);
-        } catch (PlanException e) {
+        } catch (Exception e) {
             throw new VisitorException(e);
         }
     }
@@ -91,7 +89,7 @@ public class CombinerOptimizer extends S
     //      -> localRearrange
     //         -> foreach (using algebraicOp.Initial)
     //             -> CombinerRearrange
-    private void addCombiner(PhysicalPlan phyPlan) throws VisitorException, PlanException {
+    private void addCombiner(PhysicalPlan phyPlan) throws VisitorException, PlanException, CloneNotSupportedException {
 
         List<PhysicalOperator> leaves = phyPlan.getLeaves();
         if (leaves == null || leaves.size() != 1) {
@@ -120,6 +118,9 @@ public class CombinerOptimizer extends S
             }
             PhysicalOperator successor = poPackageSuccessors.get(0);
 
+            // Retaining the original successor to be used later in modifying the plan.
+            PhysicalOperator packageSuccessor = successor;
+
             if (successor instanceof POLimit) {
                 // POLimit is acceptable, as long as it has a single foreach as
                 // successor
@@ -137,11 +138,14 @@ public class CombinerOptimizer extends S
                 if (foreachSuccessors == null || foreachSuccessors.size() != 1) {
                     continue;
                 }
-                List<PhysicalPlan> feInners = foreach.getInputPlans();
+                // Clone foreach so it can be modified to a post-reduce foreach.
+                POForEach postReduceFE = foreach.clone();
+                List<PhysicalPlan> feInners = postReduceFE.getInputPlans();
 
                 // find algebraic operators and also check if the foreach statement
                 // is suitable for combiner use
-                List<Pair<PhysicalOperator, PhysicalPlan>> algebraicOps = findAlgebraicOps(feInners);
+                List<Pair<PhysicalOperator, PhysicalPlan>> algebraicOps = CombinerOptimizerUtil.findAlgebraicOps
+                        (feInners);
                 if (algebraicOps == null || algebraicOps.size() == 0) {
                     // the plan is not combinable or there is nothing to combine
                     // we're done
@@ -149,19 +153,18 @@ public class CombinerOptimizer extends S
                 }
                 try {
                     List<PhysicalOperator> glrPredecessors = phyPlan.getPredecessors(glr);
-                    if (glrPredecessors == null || glrPredecessors.isEmpty()) {
+                    // Exclude co-group from optimization
+                    if (glrPredecessors == null || glrPredecessors.size() != 1) {
                         continue;
                     }
 
                     if (!(glrPredecessors.get(0) instanceof POLocalRearrange)) {
                         continue;
                     }
-                    LOG.info("Algebraic operations found. Optimizing plan to use combiner.");
 
                     POLocalRearrange rearrange = (POLocalRearrange) glrPredecessors.get(0);
-                    PhysicalOperator foreachSuccessor = foreachSuccessors.get(0);
-                    // Clone foreach so it can be modified to an operation post-reduce.
-                    POForEach postReduceFE = foreach.clone();
+
+                    LOG.info("Algebraic operations found. Optimizing plan to use combiner.");
 
                     // Trim the global rearrange and the preceding package.
                     convertToMapSideForEach(phyPlan, poPackage);
@@ -183,7 +186,7 @@ public class CombinerOptimizer extends S
                     }
 
                     // create new map foreach -
-                    POForEach mfe = CombinerOptimizerUtil.createForEachWithGrpProj(foreach, poPackage.getPkgr()
+                    POForEach mfe = CombinerOptimizerUtil.createForEachWithGrpProj(postReduceFE, poPackage.getPkgr()
                             .getKeyType());
                     Map<PhysicalOperator, Integer> op2newpos = Maps.newHashMap();
                     Integer pos = 1;
@@ -210,7 +213,7 @@ public class CombinerOptimizer extends S
                     }
 
                     // create new combine foreach
-                    POForEach cfe = CombinerOptimizerUtil.createForEachWithGrpProj(foreach, poPackage.getPkgr()
+                    POForEach cfe = CombinerOptimizerUtil.createForEachWithGrpProj(postReduceFE, poPackage.getPkgr()
                             .getKeyType());
                     // add algebraic functions with appropriate projection
                     CombinerOptimizerUtil.addAlgebraicFuncToCombineFE(cfe, op2newpos);
@@ -250,25 +253,29 @@ public class CombinerOptimizer extends S
                             .getRequestedParallelism(),
                             cfe.getInputPlans(), cfe.getToBeFlattened(), combinePack.getPkgr());
                     reduceOperator.setCustomPartitioner(glr.getCustomPartitioner());
-                    fixReduceSideFE(postReduceFE, cfe);
+                    fixReduceSideFE(postReduceFE, algebraicOps);
                     CombinerOptimizerUtil.changeFunc(reduceOperator, POUserFunc.INTERMEDIATE);
                     updatePackager(reduceOperator, newRearrange);
 
                     // Add the new operators
                     phyPlan.add(reduceOperator);
                     phyPlan.add(newRearrange);
-                    phyPlan.add(postReduceFE);
-                    // Reconnect as follows :
-                    // foreach (using algebraicOp.Final)
-                    //   -> reduceBy (uses algebraicOp.Intermediate)
+                    phyPlan.add(mfe);
+                    // Connect the new operators as follows:
+                    // reduceBy (using algebraicOp.Intermediate)
+                    //   -> rearrange
                     //      -> foreach (using algebraicOp.Initial)
-                    phyPlan.disconnect(foreach, foreachSuccessor);
-                    phyPlan.connect(foreach, newRearrange);
+                    phyPlan.connect(mfe, newRearrange);
                     phyPlan.connect(newRearrange, reduceOperator);
-                    phyPlan.connect(reduceOperator, postReduceFE);
-                    phyPlan.replace(foreach, mfe);
-                    phyPlan.connect(postReduceFE, foreachSuccessor);
 
+                    // Insert the reduce stage between combiner rearrange and its successor.
+                    phyPlan.disconnect(combinerLocalRearrange, packageSuccessor);
+                    phyPlan.connect(reduceOperator, packageSuccessor);
+                    phyPlan.connect(combinerLocalRearrange, mfe);
+
+                    // Replace foreach with post reduce foreach
+                    phyPlan.add(postReduceFE);
+                    phyPlan.replace(foreach, postReduceFE);
                 } catch (Exception e) {
                     int errCode = 2018;
                     String msg = "Internal error. Unable to introduce the combiner for optimization.";
@@ -278,6 +285,30 @@ public class CombinerOptimizer extends S
         }
     }
 
+    // Modifies the input plans of the post reduce foreach to match the output of reduce stage.
+    private void fixReduceSideFE(POForEach postReduceFE, List<Pair<PhysicalOperator, PhysicalPlan>> algebraicOps)
+            throws ExecException, PlanException {
+        int i=1;
+        for (Pair<PhysicalOperator, PhysicalPlan> algebraicOp : algebraicOps) {
+            POUserFunc combineUdf = (POUserFunc) algebraicOp.first;
+            PhysicalPlan pplan = algebraicOp.second;
+            combineUdf.setAlgebraicFunction(POUserFunc.FINAL);
+
+            POProject newProj = new POProject(
+                    CombinerOptimizerUtil.createOperatorKey(postReduceFE.getOperatorKey().getScope()),
+                    1, i
+            );
+            newProj.setResultType(DataType.BAG);
+
+            PhysicalOperator udfInput = pplan.getPredecessors(combineUdf).get(0);
+            pplan.disconnect(udfInput, combineUdf);
+            pplan.add(newProj);
+            pplan.connect(newProj, combineUdf);
+            i++;
+        }
+        postReduceFE.setResultType(DataType.TUPLE);
+    }
+
     // Modifies the map side of foreach (before reduce).
     private void convertToMapSideForEach(PhysicalPlan physicalPlan, POPackage poPackage)
             throws PlanException {
@@ -296,21 +327,6 @@ public class CombinerOptimizer extends S
         physicalPlan.removeAndReconnect(poPackage);
     }
 
-
-    // TODO: Modify the post reduce plan in case of nested algebraic(ExpressionOperator) or logical operations.
-    private void fixReduceSideFE(POForEach postReduceFE, POForEach cfe) throws PlanException,
-            CloneNotSupportedException {
-        List<PhysicalPlan> plans = cfe.getInputPlans();
-        List<PhysicalPlan> newPlans = new ArrayList<>();
-        for (int i = 0; i < plans.size(); i++) {
-            PhysicalPlan inputPlan = plans.get(i).clone();
-            newPlans.add(inputPlan);
-        }
-        postReduceFE.setInputPlans(newPlans);
-        CombinerOptimizerUtil.changeFunc(postReduceFE, POUserFunc.FINAL);
-        postReduceFE.setResultType(DataType.TUPLE);
-    }
-
     // Update the ReduceBy Operator with the packaging used by Local rearrange.
     private void updatePackager(POReduceBySpark reduceOperator, POLocalRearrange lrearrange) throws OptimizerException {
         Packager pkgr = reduceOperator.getPkgr();
@@ -318,7 +334,7 @@ public class CombinerOptimizer extends S
         // update the keyInfo information if already present in the POPackage
         Map<Integer, Pair<Boolean, Map<Integer, Integer>>> keyInfo = pkgr.getKeyInfo();
         if (keyInfo == null)
-            keyInfo = new HashMap<Integer, Pair<Boolean, Map<Integer, Integer>>>();
+            keyInfo = new HashMap<>();
 
         if (keyInfo.get(Integer.valueOf(lrearrange.getIndex())) != null) {
             // something is wrong - we should not be getting key info
@@ -339,82 +355,6 @@ public class CombinerOptimizer extends S
     }
 
     /**
-     * find algebraic operators and also check if the foreach statement is
-     * suitable for combiner use
-     *
-     * @param feInners inner plans of foreach
-     * @return null if plan is not combinable, otherwise list of combinable operators
-     * @throws VisitorException
-     */
-    // TODO : Since all combinable cases are not handled, not using the utility method in CombinerOptimizerUtil
-    private static List<Pair<PhysicalOperator, PhysicalPlan>> findAlgebraicOps(List<PhysicalPlan> feInners)
-            throws VisitorException {
-        List<Pair<PhysicalOperator, PhysicalPlan>> algebraicOps = Lists.newArrayList();
-
-        // check each foreach inner plan
-        for (PhysicalPlan pplan : feInners) {
-            // check for presence of non combinable operators
-            CombinerOptimizerUtil.AlgebraicPlanChecker algChecker = new CombinerOptimizerUtil.AlgebraicPlanChecker
-                    (pplan);
-            algChecker.visit();
-            if (algChecker.sawNonAlgebraic) {
-                return null;
-            }
-
-            // TODO : Distinct is combinable. Handle it.
-            if (algChecker.sawDistinctAgg) {
-                return null;
-            }
-
-            List<PhysicalOperator> roots = pplan.getRoots();
-            // combinable operators have to be attached to POProject root(s)
-            for (PhysicalOperator root : roots) {
-                if (root instanceof ConstantExpression) {
-                    continue;
-                }
-                if (!(root instanceof POProject)) {
-                    // how can this happen? - expect root of inner plan to be
-                    // constant or project. not combining it
-                    return null;
-                }
-                POProject proj = (POProject) root;
-                POUserFunc combineUdf = getAlgebraicSuccessor(pplan);
-                if (combineUdf == null) {
-                    if (proj.isProjectToEnd()) {
-                        // project-star or project to end
-                        // not combinable
-                        return null;
-                    }
-                    // Check to see if this is a projection of the grouping column.
-                    // If so, it will be a projection of col 0
-                    List<Integer> cols = proj.getColumns();
-                    if (cols != null && cols.size() == 1 && cols.get(0) == 0) {
-                        // it is project of grouping column, so the plan is
-                        // still combinable
-                        continue;
-                    } else {
-                        //not combinable
-                        return null;
-                    }
-                }
-
-                // The algebraic udf can have more than one input. Add the udf only once
-                boolean exist = false;
-                for (Pair<PhysicalOperator, PhysicalPlan> pair : algebraicOps) {
-                    if (pair.first.equals(combineUdf)) {
-                        exist = true;
-                        break;
-                    }
-                }
-                if (!exist)
-                    algebraicOps.add(new Pair<PhysicalOperator, PhysicalPlan>(combineUdf, pplan));
-            }
-        }
-
-        return algebraicOps;
-    }
-
-    /**
      * Look for an algebraic POUserFunc that is the leaf of an input plan.
      *
      * @param pplan physical plan
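
The reconnection comments above encode the usual three-stage algebraic split: the new map-side foreach runs algebraicOp.Initial, POReduceBySpark merges partial results with algebraicOp.Intermediate, and the post-reduce foreach (rewired by fixReduceSideFE) applies algebraicOp.Final to a projection of the reduce output. A self-contained sketch of the contract those stages must satisfy, with COUNT-like semantics purely for illustration; this models the dataflow only and is not Pig's Algebraic API:

    import java.util.Arrays;
    import java.util.List;

    // Initial runs once per record before the local rearrange, Intermediate
    // merges partials while combining/reducing, and Final finishes the value
    // on the reduce side. Correctness requires that finishing the merged
    // partials gives the same answer as counting all records directly,
    // however the inputs were grouped into partitions.
    public class AlgebraicStages {
        static long initial(Object record) { return 1L; }
        static long intermediate(List<Long> partials) {
            return partials.stream().mapToLong(Long::longValue).sum();
        }
        static long fin(List<Long> partials) {
            return partials.stream().mapToLong(Long::longValue).sum();
        }

        public static void main(String[] args) {
            // Two "map partitions" worth of records for the same group key.
            long p1 = intermediate(Arrays.asList(initial("a"), initial("b")));
            long p2 = intermediate(Arrays.asList(initial("c")));
            System.out.println(fin(Arrays.asList(p1, p2)));  // 3
        }
    }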

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/util/CombinerOptimizerUtil.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/util/CombinerOptimizerUtil.java?rev=1728600&r1=1728599&r2=1728600&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/util/CombinerOptimizerUtil.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/util/CombinerOptimizerUtil.java Fri Feb  5 04:43:12 2016
@@ -316,7 +316,7 @@ public class CombinerOptimizerUtil {
      * @return null if plan is not combinable, otherwise list of combinable operators
      * @throws VisitorException
      */
-    private static List<Pair<PhysicalOperator, PhysicalPlan>> findAlgebraicOps(List<PhysicalPlan> feInners)
+    public static List<Pair<PhysicalOperator, PhysicalPlan>> findAlgebraicOps(List<PhysicalPlan> feInners)
             throws VisitorException {
         List<Pair<PhysicalOperator, PhysicalPlan>> algebraicOps = Lists.newArrayList();
 
@@ -570,7 +570,7 @@ public class CombinerOptimizerUtil {
         return pclr;
     }
 
-    private static OperatorKey createOperatorKey(String scope) {
+    public static OperatorKey createOperatorKey(String scope) {
         return new OperatorKey(scope, NodeIdGenerator.getGenerator().getNextNodeId(scope));
     }
 
@@ -663,13 +663,13 @@ public class CombinerOptimizerUtil {
      * Checks if there is something that prevents the use of the algebraic interface,
      * and looks for the PODistinct that can be used as algebraic
      */
-    public static class AlgebraicPlanChecker extends PhyPlanVisitor {
-        public boolean sawNonAlgebraic = false;
-        public boolean sawDistinctAgg = false;
+    private static class AlgebraicPlanChecker extends PhyPlanVisitor {
+        boolean sawNonAlgebraic = false;
+        boolean sawDistinctAgg = false;
         private boolean sawForeach = false;
         private PODistinct distinct = null;
 
-        public AlgebraicPlanChecker(PhysicalPlan plan) {
+        AlgebraicPlanChecker(PhysicalPlan plan) {
             super(plan, new DependencyOrderWalker<PhysicalOperator, PhysicalPlan>(plan));
         }
 

Modified: pig/branches/spark/test/org/apache/pig/newplan/logical/relational/TestLocationInPhysicalPlan.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/newplan/logical/relational/TestLocationInPhysicalPlan.java?rev=1728600&r1=1728599&r2=1728600&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/newplan/logical/relational/TestLocationInPhysicalPlan.java (original)
+++ pig/branches/spark/test/org/apache/pig/newplan/logical/relational/TestLocationInPhysicalPlan.java Fri Feb  5 04:43:12 2016
@@ -63,7 +63,7 @@ public class TestLocationInPhysicalPlan
         if (Util.getLocalTestMode().toString().equals("TEZ_LOCAL")) {
             Assert.assertEquals("A[1,4],A[3,4],B[2,4]", jStats.getAliasLocation());
         } else if (Util.getLocalTestMode().toString().equals("SPARK_LOCAL")) {
-            Assert.assertEquals("A[1,4],B[2,4],A[3,4]", jStats.getAliasLocation());
+            Assert.assertEquals("A[1,4],A[3,4],B[2,4],A[3,4]", jStats.getAliasLocation());
         } else {
             Assert.assertEquals("M: A[1,4],A[3,4],B[2,4] C: A[3,4],B[2,4] R: A[3,4]", jStats.getAliasLocation());
         }

Modified: pig/branches/spark/test/org/apache/pig/test/TestCombiner.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestCombiner.java?rev=1728600&r1=1728599&r2=1728600&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TestCombiner.java (original)
+++ pig/branches/spark/test/org/apache/pig/test/TestCombiner.java Fri Feb  5 04:43:12 2016
@@ -412,7 +412,13 @@ public class TestCombiner {
         ByteArrayOutputStream baos = new ByteArrayOutputStream();
         PrintStream ps = new PrintStream(baos);
         pigServer.explain(variable, ps);
-        boolean combinerFound = baos.toString().matches("(?si).*combine plan.*");
+        boolean combinerFound;
+        if (pigServer.getPigContext().getExecType().name().equalsIgnoreCase("spark")) {
+            combinerFound = baos.toString().contains("Reduce By");
+        } else {
+            combinerFound = baos.toString().matches("(?si).*combine plan.*");
+        }
+
         System.out.println(baos.toString());
         assertEquals("is combiner present as expected", combineExpected, combinerFound);
     }
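
A note on the two detection paths: String.matches() must consume the entire input, so the (?s) dotall and (?i) case-insensitive flags plus the surrounding .* are what let "combine plan" be found inside a multi-line explain dump, while contains() needs no anchoring at all. A tiny sketch (the explain text is invented):

    // matches() anchors at both ends; (?s) lets .* cross newlines and (?i)
    // ignores case, so the pattern scans the whole dump. contains() just
    // looks for the literal Spark plan marker.
    public class ExplainCheck {
        public static void main(String[] args) {
            String explain = "Spark node scope-12\nReduce By scope-20\nPOStore\n";
            System.out.println(explain.contains("Reduce By"));            // true
            System.out.println(explain.matches("(?si).*combine plan.*")); // false
        }
    }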