Posted to commits@pig.apache.org by ro...@apache.org on 2014/02/07 02:18:15 UTC

svn commit: r1565509 [1/3] - in /pig/branches/tez: src/org/apache/pig/ src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/ src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/ src/org/apache/pig/backend/hadoop/executione...

Author: rohini
Date: Fri Feb  7 01:18:14 2014
New Revision: 1565509

URL: http://svn.apache.org/r1565509
Log:
PIG-3748: Support for multiquery off in Tez
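
PIG-3748 makes the existing opt.multiquery switch effective on the Tez branch as well. It can still be turned off with the -M option handled in Main.java below, or programmatically through the property, which this patch now routes through the PigConfiguration.OPT_MULTIQUERY constant. A minimal sketch of the programmatic route, mirroring the test setup in this patch; the exec type, file names, and aliases are placeholders:

    // Assumes the usual imports: java.util.Properties, org.apache.pig.ExecType,
    // org.apache.pig.PigConfiguration, org.apache.pig.PigServer.
    Properties props = new Properties();
    props.setProperty(PigConfiguration.OPT_MULTIQUERY, "false"); // same effect as the -M flag
    PigServer pigServer = new PigServer(ExecType.LOCAL, props);  // exec type is a placeholder
    pigServer.setBatchOn();
    pigServer.registerQuery("A = load 'input' as (col1:int, col2:int);");
    pigServer.registerQuery("store A into 'output1';");
    pigServer.registerQuery("store A into 'output2';");
    // With multiquery off, the two store pipelines are compiled separately
    // instead of being merged into a single split sub-plan.
    pigServer.executeBatch();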

Added:
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POValueInputTez.java
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POValueOutputTez.java
Modified:
    pig/branches/tez/src/org/apache/pig/Main.java
    pig/branches/tez/src/org/apache/pig/PigServer.java
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/util/PlanHelper.java
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/PigProcessor.java
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezCompiler.java
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezOperPlan.java
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezOperator.java
    pig/branches/tez/test/org/apache/pig/test/TestBatchAliases.java
    pig/branches/tez/test/org/apache/pig/test/TestGrunt.java
    pig/branches/tez/test/org/apache/pig/test/TestLoad.java
    pig/branches/tez/test/org/apache/pig/test/TestMultiQuery.java
    pig/branches/tez/test/org/apache/pig/test/TestMultiQueryBasic.java
    pig/branches/tez/test/org/apache/pig/test/TestMultiQueryCompiler.java
    pig/branches/tez/test/org/apache/pig/test/TestMultiQueryLocal.java
    pig/branches/tez/test/org/apache/pig/test/TestPigRunner.java
    pig/branches/tez/test/org/apache/pig/test/TestPigServer.java
    pig/branches/tez/test/org/apache/pig/test/TestStore.java
    pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/TEZC7.gld

Modified: pig/branches/tez/src/org/apache/pig/Main.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/Main.java?rev=1565509&r1=1565508&r2=1565509&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/Main.java (original)
+++ pig/branches/tez/src/org/apache/pig/Main.java Fri Feb  7 01:18:14 2014
@@ -297,7 +297,7 @@ public class Main {
 
                 case 'M':
                     // turns off multiquery optimization
-                    properties.setProperty("opt.multiquery",""+false);
+                    properties.setProperty(PigConfiguration.OPT_MULTIQUERY,""+false);
                     break;
 
                 case 'p':

Modified: pig/branches/tez/src/org/apache/pig/PigServer.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/PigServer.java?rev=1565509&r1=1565508&r2=1565509&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/PigServer.java (original)
+++ pig/branches/tez/src/org/apache/pig/PigServer.java Fri Feb  7 01:18:14 2014
@@ -229,7 +229,8 @@ public class PigServer {
         currDAG = new Graph(false);
 
         aggregateWarning = "true".equalsIgnoreCase(pigContext.getProperties().getProperty("aggregate.warning"));
-        isMultiQuery = "true".equalsIgnoreCase(pigContext.getProperties().getProperty("opt.multiquery","true"));
+        isMultiQuery = "true".equalsIgnoreCase(pigContext.getProperties()
+                .getProperty(PigConfiguration.OPT_MULTIQUERY, "true"));
 
         jobName = pigContext.getProperties().getProperty(
                 PigContext.JOB_NAME,

Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java?rev=1565509&r1=1565508&r2=1565509&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java Fri Feb  7 01:18:14 2014
@@ -668,7 +668,7 @@ public class MapReduceLauncher extends L
         fRem.visit();
 
         boolean isMultiQuery =
-            "true".equalsIgnoreCase(pc.getProperties().getProperty("opt.multiquery","true"));
+            "true".equalsIgnoreCase(pc.getProperties().getProperty(PigConfiguration.OPT_MULTIQUERY, "true"));
 
         if (isMultiQuery) {
             // reduces the number of MROpers in the MR plan generated

Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java?rev=1565509&r1=1565508&r2=1565509&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java Fri Feb  7 01:18:14 2014
@@ -93,6 +93,10 @@ public class PhyPlanVisitor extends Plan
         super(plan, walker);
     }
 
+    public void visit(PhysicalOperator op) {
+        // do nothing
+    }
+
     public void visitLoad(POLoad ld) throws VisitorException{
         //do nothing
     }

Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/util/PlanHelper.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/util/PlanHelper.java?rev=1565509&r1=1565508&r2=1565509&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/util/PlanHelper.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/util/PlanHelper.java Fri Feb  7 01:18:14 2014
@@ -168,8 +168,9 @@ public class PlanHelper {
             return !foundOps.isEmpty();
         }
 
+        @Override
         @SuppressWarnings("unchecked")
-        private void visit(PhysicalOperator op) {
+        public void visit(PhysicalOperator op) {
             if (opClass.isAssignableFrom(op.getClass())) {
                 foundOps.add((C) op);
             }

Added: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POValueInputTez.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POValueInputTez.java?rev=1565509&view=auto
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POValueInputTez.java (added)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POValueInputTez.java Fri Feb  7 01:18:14 2014
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.backend.hadoop.executionengine.tez;
+
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.plan.OperatorKey;
+import org.apache.pig.impl.plan.VisitorException;
+import org.apache.tez.runtime.api.LogicalInput;
+import org.apache.tez.runtime.library.api.KeyValueReader;
+
+/**
+ * POValueInputTez is used to read tuples from a Tez intermediate output over a
+ * 1-1 edge.
+ */
+public class POValueInputTez extends PhysicalOperator implements TezLoad {
+
+    private static final long serialVersionUID = 1L;
+    private static final Log LOG = LogFactory.getLog(POValueInputTez.class);
+    private String inputKey;
+    // TODO Change this to a value-only reader after implementing
+    // value-only input/output
+    private transient KeyValueReader reader;
+
+    public POValueInputTez(OperatorKey k) {
+        super(k);
+    }
+
+    @Override
+    public void attachInputs(Map<String, LogicalInput> inputs,
+            Configuration conf)
+            throws ExecException {
+        LogicalInput input = inputs.get(inputKey);
+        if (input == null) {
+            throw new ExecException("Input from vertex " + inputKey + " is missing");
+        }
+        try {
+            reader = (KeyValueReader) input.getReader();
+            LOG.info("Attached input from vertex " + inputKey + " : input=" + input + ", reader=" + reader);
+        } catch (Exception e) {
+            throw new ExecException(e);
+        }
+    }
+
+    @Override
+    public Result getNextTuple() throws ExecException {
+        try {
+            if (reader.next()) {
+                return new Result(POStatus.STATUS_OK, reader.getCurrentValue());
+            } else {
+                return RESULT_EOP;
+            }
+        } catch (IOException e) {
+            throw new ExecException(e);
+        }
+    }
+
+    public void setInputKey(String inputKey) {
+        this.inputKey = inputKey;
+    }
+
+    @Override
+    public Tuple illustratorMarkup(Object in, Object out, int eqClassIndex) {
+        // TODO Auto-generated method stub
+        return null;
+    }
+
+    @Override
+    public void visit(PhyPlanVisitor v) throws VisitorException {
+        v.visit(this);
+    }
+
+    @Override
+    public boolean supportsMultipleInputs() {
+        return false;
+    }
+
+    @Override
+    public boolean supportsMultipleOutputs() {
+        return false;
+    }
+
+    @Override
+    public String name() {
+        return "POValueInputTez - " + mKey.toString() + "\t<-\t " + inputKey;
+    }
+}
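
For orientation, a rough sketch of how the enclosing vertex drains this operator once attachInputs() has bound it to its LogicalInput. The valueInput variable and the explicit loop are illustrative only; in the patch the operator simply becomes a root of the successor vertex's plan and is pulled through the normal pipeline:

    Result res = valueInput.getNextTuple();
    while (res.returnStatus == POStatus.STATUS_OK) {
        // Values on the 1-1 edge arrive as tuples (BinSedesTuple, per the
        // TezDagBuilder change further down in this commit).
        Tuple t = (Tuple) res.result;
        // ... push t through the rest of the vertex's physical plan ...
        res = valueInput.getNextTuple();
    }
    // res is RESULT_EOP once the underlying KeyValueReader is exhausted.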

Added: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POValueOutputTez.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POValueOutputTez.java?rev=1565509&view=auto
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POValueOutputTez.java (added)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POValueOutputTez.java Fri Feb  7 01:18:14 2014
@@ -0,0 +1,150 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.tez;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Writable;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.plan.OperatorKey;
+import org.apache.pig.impl.plan.VisitorException;
+import org.apache.tez.runtime.api.LogicalOutput;
+import org.apache.tez.runtime.library.api.KeyValueWriter;
+
+public class POValueOutputTez extends PhysicalOperator implements TezOutput {
+
+    private static final long serialVersionUID = 1L;
+    private static final Log LOG = LogFactory.getLog(POValueOutputTez.class);
+    // TODO Change this to a single outputKey and write only once
+    // when shared edge support is available in Tez
+    protected Set<String> outputKeys = new HashSet<String>();
+    // TODO Change this to a value-only writer after implementing
+    // value-only input/output
+    protected transient List<KeyValueWriter> writers;
+
+    private static EmptyWritable EMPTY_KEY = new EmptyWritable();
+
+    public POValueOutputTez(OperatorKey k) {
+        super(k);
+    }
+
+    @Override
+    public void attachOutputs(Map<String, LogicalOutput> outputs,
+            Configuration conf) throws ExecException {
+        writers = new ArrayList<KeyValueWriter>();
+        for (String outputKey : outputKeys) {
+            LogicalOutput output = outputs.get(outputKey);
+            if (output == null) {
+                throw new ExecException("Output to vertex " + outputKey
+                        + " is missing");
+            }
+            try {
+                KeyValueWriter writer = (KeyValueWriter) output.getWriter();
+                writers.add(writer);
+                LOG.info("Attached output to vertex " + outputKey + " : output=" + output + ", writer=" + writer);
+            } catch (Exception e) {
+                throw new ExecException(e);
+            }
+        }
+    }
+
+    public void addOutputKey(String outputKey) {
+        outputKeys.add(outputKey);
+    }
+
+    public void removeOutputKey(String outputKey) {
+        outputKeys.remove(outputKey);
+    }
+
+    @Override
+    public Result getNextTuple() throws ExecException {
+        Result inp;
+        while (true) {
+            inp = processInput();
+            if (inp.returnStatus == POStatus.STATUS_EOP
+                    || inp.returnStatus == POStatus.STATUS_ERR) {
+                break;
+            }
+            if (inp.returnStatus == POStatus.STATUS_NULL) {
+                continue;
+            }
+            for (KeyValueWriter writer : writers) {
+                try {
+                    writer.write(EMPTY_KEY, inp.result);
+                } catch (IOException e) {
+                    throw new ExecException(e);
+                }
+            }
+            return RESULT_EMPTY;
+        }
+        return inp;
+    }
+
+    @Override
+    public Tuple illustratorMarkup(Object in, Object out, int eqClassIndex) {
+        // TODO Auto-generated method stub
+        return null;
+    }
+
+    @Override
+    public void visit(PhyPlanVisitor v) throws VisitorException {
+        v.visit(this);
+    }
+
+    @Override
+    public boolean supportsMultipleInputs() {
+        return false;
+    }
+
+    @Override
+    public boolean supportsMultipleOutputs() {
+        return false;
+    }
+
+    @Override
+    public String name() {
+        return "POValueOutputTez - " + mKey.toString() + "\t->\t " + outputKeys;
+    }
+
+    public static class EmptyWritable implements Writable {
+
+        @Override
+        public void write(DataOutput out) throws IOException {
+        }
+
+        @Override
+        public void readFields(DataInput in) throws IOException {
+        }
+    }
+
+}
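
Together with POValueInputTez, this operator replaces the POSplit sub-plan machinery when multiquery is turned off: the split vertex writes each value to every registered output key, and each successor vertex reads it back over a 1-1 edge. A condensed sketch of the wiring, lifted from the visitSplit() changes in TezCompiler.java further down (splitOp is the split vertex, curTezOp the successor vertex being built):

    POValueOutputTez output = new POValueOutputTez(OperatorKey.genOpKey(scope));
    splitOp.plan.addAsLeaf(output);                            // leaf of the split vertex
    output.addOutputKey(curTezOp.getOperatorKey().toString()); // one output key per successor

    TezEdgeDescriptor edge = TezCompilerUtil.connect(tezPlan, splitOp, curTezOp);
    edge.dataMovementType = DataMovementType.ONE_TO_ONE;       // shared edge once Tez supports it

    POValueInputTez input = new POValueInputTez(OperatorKey.genOpKey(scope));
    input.setInputKey(splitOp.getOperatorKey().toString());
    curTezOp.plan.addAsLeaf(input);                            // root of the successor vertex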

Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/PigProcessor.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/PigProcessor.java?rev=1565509&r1=1565508&r2=1565509&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/PigProcessor.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/PigProcessor.java Fri Feb  7 01:18:14 2014
@@ -155,6 +155,10 @@ public class PigProcessor implements Log
         for (POIdentityInOutTez identityInOut : identityInOuts){
             identityInOut.attachInputs(inputs, conf);
         }
+        LinkedList<POValueInputTez> valueInputs = PlanHelper.getPhysicalOperators(execPlan, POValueInputTez.class);
+        for (POValueInputTez input : valueInputs){
+            input.attachInputs(inputs, conf);
+        }
         LinkedList<POFRJoinTez> broadcasts = PlanHelper.getPhysicalOperators(execPlan, POFRJoinTez.class);
         for (POFRJoinTez broadcast : broadcasts){
             broadcast.attachInputs(inputs, conf);
@@ -170,6 +174,10 @@ public class PigProcessor implements Log
         for (POLocalRearrangeTez lr : rearranges){
             lr.attachOutputs(outputs, conf);
         }
+        LinkedList<POValueOutputTez> valueOutputs = PlanHelper.getPhysicalOperators(execPlan, POValueOutputTez.class);
+        for (POValueOutputTez output : valueOutputs){
+            output.attachOutputs(outputs, conf);
+        }
         for (Entry<String, LogicalOutput> entry : outputs.entrySet()){
             LogicalOutput logicalOutput = entry.getValue();
             if (logicalOutput instanceof MROutput){

Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezCompiler.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezCompiler.java?rev=1565509&r1=1565508&r2=1565509&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezCompiler.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezCompiler.java Fri Feb  7 01:18:14 2014
@@ -186,7 +186,7 @@ public class TezCompiler extends PhyPlan
     // Segment a single DAG into a DAG graph
     public TezPlanContainer getPlanContainer() throws PlanException {
         TezPlanContainer tezPlanContainer = new TezPlanContainer(pigContext);
-        TezPlanContainerNode node = new TezPlanContainerNode(new OperatorKey(scope, nig.getNextNodeId(scope)), tezPlan);
+        TezPlanContainerNode node = new TezPlanContainerNode(OperatorKey.genOpKey(scope), tezPlan);
         tezPlanContainer.add(node);
         tezPlanContainer.split(node);
         return tezPlanContainer;
@@ -480,7 +480,7 @@ public class TezCompiler extends PhyPlan
      * @param op Operator to remove
      * @throws VisitorException
      */
-    public void removeDupOpTreeOfSplit(TezOperPlan plan, TezOperator op)
+    private void removeDupOpTreeOfSplit(TezOperPlan plan, TezOperator op, boolean isMultiQuery)
             throws VisitorException {
         Stack<TezOperator> stack = new Stack<TezOperator>();
         stack.push(op);
@@ -488,31 +488,47 @@ public class TezCompiler extends PhyPlan
             op = stack.pop();
             List<TezOperator> predecessors = plan.getPredecessors(op);
             if (predecessors != null) {
-                for (TezOperator pred : predecessors) {
-                    List<POSplit> splits = PlanHelper.getPhysicalOperators(
-                            pred.plan, POSplit.class);
-                    if (splits.isEmpty()) {
-                        stack.push(pred);
-                    } else {
-                        for (POSplit split : splits) {
-                            PhysicalPlan planToRemove = null;
-                            for (PhysicalPlan splitPlan : split.getPlans()) {
-                                PhysicalOperator phyOp = splitPlan.getLeaves().get(0);
-                                if (phyOp instanceof POLocalRearrangeTez) {
-                                    POLocalRearrangeTez lr = (POLocalRearrangeTez) phyOp;
-                                    if (lr.getOutputKey().equals(
-                                            op.getOperatorKey().toString())) {
-                                        planToRemove = splitPlan;
-                                        break;
+                if (isMultiQuery) {
+                    for (TezOperator pred : predecessors) {
+                        if (!pred.isSplitOperator()) {
+                            stack.push(pred);
+                        } else {
+                            List<POSplit> splits = PlanHelper.getPhysicalOperators(
+                                    pred.plan, POSplit.class);
+                            for (POSplit split : splits) {
+                                PhysicalPlan planToRemove = null;
+                                for (PhysicalPlan splitPlan : split.getPlans()) {
+                                    PhysicalOperator phyOp = splitPlan
+                                            .getLeaves().get(0);
+                                    if (phyOp instanceof POLocalRearrangeTez) {
+                                        POLocalRearrangeTez lr = (POLocalRearrangeTez) phyOp;
+                                        if (lr.getOutputKey().equals(
+                                                op.getOperatorKey().toString())) {
+                                            planToRemove = splitPlan;
+                                            break;
+                                        }
                                     }
                                 }
-                            }
-                            if (planToRemove != null) {
-                                split.getPlans().remove(planToRemove);
-                                break;
+                                if (planToRemove != null) {
+                                    split.getPlans().remove(planToRemove);
+                                    break;
+                                }
                             }
                         }
                     }
+                } else {
+                    for (TezOperator pred : predecessors) {
+                        // Remove everything till we encounter another split
+                        if (!pred.isSplitOperator()) {
+                            stack.push(pred);
+                        } else {
+                            // If split operator, just remove from the output
+                            POValueOutputTez valueOut = (POValueOutputTez)pred.plan.getLeaves().get(0);
+                            valueOut.removeOutputKey(op.getOperatorKey().toString());
+                            //TODO Handle shared edge when available in Tez
+                            pred.outEdges.remove(op.getOperatorKey().toString());
+                        }
+                    }
                 }
             }
             plan.remove(op);
@@ -520,6 +536,42 @@ public class TezCompiler extends PhyPlan
     }
 
     /**
+     * In case of multiple levels of split, after removing the duplicate tree we need to reset
+     * the inputs of operators in the original tree, as some of the inputs of the PhysicalOperators
+     * in the original tree will have been overwritten and now refer to operators in the
+     * duplicate tree. For example, a POFilter's inputs will refer to the duplicate tree's
+     * POValueInputTez even though it is connected to the original split tree's POValueInputTez.
+     */
+    private void resetInputsOfPredecessors(TezOperPlan plan, TezOperator op) {
+        Stack<TezOperator> stack = new Stack<TezOperator>();
+        stack.push(op);
+        while (!stack.isEmpty()) {
+            op = stack.pop();
+            List<TezOperator> predecessors = plan.getPredecessors(op);
+            if (predecessors != null) {
+                for (TezOperator pred : predecessors) {
+                    resetInputs(pred.plan, pred.plan.getLeaves());
+                    if (!pred.isSplitOperator()) {
+                        stack.push(pred);
+                    }
+                }
+            }
+        }
+    }
+
+    private void resetInputs(PhysicalPlan plan, List<PhysicalOperator> ops) {
+        for (PhysicalOperator op : ops) {
+            List<PhysicalOperator> preds = plan.getPredecessors(op);
+            if (preds != null) {
+                for (PhysicalOperator pred : preds) {
+                    pred.setInputs(plan.getPredecessors(pred));
+                    resetInputs(plan, plan.getPredecessors(pred));
+                }
+            }
+        }
+    }
+
+    /**
      * Merges the TezOperators in the compiledInputs into a single merged
      * TezOperator.
      *
@@ -686,7 +738,7 @@ public class TezCompiler extends PhyPlan
         pkg.getPkgr().setDistinct(true);
         plan.addAsLeaf(pkg);
 
-        POProject project = new POProject(new OperatorKey(scope, nig.getNextNodeId(scope)));
+        POProject project = new POProject(OperatorKey.genOpKey(scope));
         project.setResultType(DataType.TUPLE);
         project.setStar(false);
         project.setColumn(0);
@@ -699,7 +751,7 @@ public class TezCompiler extends PhyPlan
     @Override
     public void visitFilter(POFilter op) throws VisitorException {
         try {
-            if (curTezOp.isSplitSubPlan()) {
+            if (curTezOp.isSplitSubPlan() || curTezOp.getSplitParent() != null) {
                 // Do not add the filter. Refer NoopFilterRemover.java of MR
                 PhysicalPlan filterPlan = op.getPlan();
                 if (filterPlan.size() == 1) {
@@ -841,7 +893,7 @@ public class TezCompiler extends PhyPlan
             curTezOp.plan.addAsLeaf(forEach);
 
             if (!pigContext.inIllustrator) {
-                POLimit limitCopy = new POLimit(new OperatorKey(scope, nig.getNextNodeId(scope)));
+                POLimit limitCopy = new POLimit(OperatorKey.genOpKey(scope));
                 limitCopy.setAlias(op.getAlias());
                 limitCopy.setLimit(op.getLimit());
                 limitCopy.setLimitPlan(op.getLimitPlan());
@@ -1281,7 +1333,7 @@ public class TezCompiler extends PhyPlan
             POPackage pkg = getPackage(1, DataType.BYTEARRAY);
             curTezOp.plan.add(pkg);
 
-            POProject project = new POProject(new OperatorKey(scope, nig.getNextNodeId(scope)));
+            POProject project = new POProject(OperatorKey.genOpKey(scope));
             project.setResultType(DataType.BAG);
             project.setStar(false);
             project.setColumn(1);
@@ -1327,7 +1379,7 @@ public class TezCompiler extends PhyPlan
             // parallelism of POPartitionRearrange to -1, so its parallelism
             // will be determined by the size of streaming table.
             POPartitionRearrangeTez pr =
-                    new POPartitionRearrangeTez(new OperatorKey(scope, nig.getNextNodeId(scope)));
+                    new POPartitionRearrangeTez(OperatorKey.genOpKey(scope));
             try {
                 pr.setIndex(1);
             } catch (ExecException e) {
@@ -1349,7 +1401,7 @@ public class TezCompiler extends PhyPlan
 
             // Create POGlobalRearrange
             POGlobalRearrange gr =
-                    new POGlobalRearrange(new OperatorKey(scope, nig.getNextNodeId(scope)), rp);
+                    new POGlobalRearrange(OperatorKey.genOpKey(scope), rp);
             // Skewed join has its own special partitioner
             gr.setResultType(DataType.TUPLE);
             gr.visit(this);
@@ -1376,7 +1428,7 @@ public class TezCompiler extends PhyPlan
             // Add corresponding POProjects
             for (int i=0; i < 2; i++) {
                 ep = new PhysicalPlan();
-                POProject prj = new POProject(new OperatorKey(scope, nig.getNextNodeId(scope)));
+                POProject prj = new POProject(OperatorKey.genOpKey(scope));
                 prj.setColumn(i+1);
                 prj.setOverloaded(false);
                 prj.setResultType(DataType.BAG);
@@ -1390,7 +1442,7 @@ public class TezCompiler extends PhyPlan
             }
 
             POForEach fe =
-                    new POForEach(new OperatorKey(scope, nig.getNextNodeId(scope)), -1, eps, flat);
+                    new POForEach(OperatorKey.genOpKey(scope), -1, eps, flat);
             fe.setResultType(DataType.TUPLE);
             fe.visit(this);
 
@@ -1832,7 +1884,7 @@ public class TezCompiler extends PhyPlan
         }
 
         POIdentityInOutTez identityInOutTez = new POIdentityInOutTez(
-                new OperatorKey(scope, nig.getNextNodeId(scope)),
+                OperatorKey.genOpKey(scope),
                 inputOperRearrange);
         identityInOutTez.setInputKey(inputOper.getOperatorKey().toString());
         oper1.plan.addAsLeaf(identityInOutTez);
@@ -1845,7 +1897,7 @@ public class TezCompiler extends PhyPlan
         identityInOutTez.setOutputKey(oper2.getOperatorKey().toString());
 
         if (limit!=-1) {
-            POPackage pkg_c = new POPackage(new OperatorKey(scope, nig.getNextNodeId(scope)));
+            POPackage pkg_c = new POPackage(OperatorKey.genOpKey(scope));
             pkg_c.setPkgr(new LitePackager());
             pkg_c.getPkgr().setKeyType((fields == null || fields.length > 1) ? DataType.TUPLE : keyType);
             pkg_c.setNumInps(1);
@@ -1892,7 +1944,7 @@ public class TezCompiler extends PhyPlan
             combinePlan.addAsLeaf(lr_c2);
         }
 
-        POPackage pkg = new POPackage(new OperatorKey(scope, nig.getNextNodeId(scope)));
+        POPackage pkg = new POPackage(OperatorKey.genOpKey(scope));
         pkg.setPkgr(new LitePackager());
         pkg.getPkgr().setKeyType((fields == null || fields.length > 1) ? DataType.TUPLE : keyType);
         pkg.setNumInps(1);
@@ -1937,7 +1989,7 @@ public class TezCompiler extends PhyPlan
                 throw new PlanException(msg, errCode, PigException.BUG, ve);
             }
 
-            POLocalRearrangeTez lr = new POLocalRearrangeTez(new OperatorKey(scope, nig.getNextNodeId(scope)));
+            POLocalRearrangeTez lr = new POLocalRearrangeTez(OperatorKey.genOpKey(scope));
             POLocalRearrangeTez lrSample = localRearrangeFactory.create(LocalRearrangeType.NULL);
 
             TezOperator prevOper = endSingleInputWithStoreAndSample(op, lr, lrSample, keyType, fields);
@@ -2002,31 +2054,65 @@ public class TezCompiler extends PhyPlan
     @Override
     public void visitSplit(POSplit op) throws VisitorException {
         try {
-            if (splitsSeen.containsKey(op.getOperatorKey())) {
-                // Since the plan for this split already exists in the tez plan,
-                // discard the hierarchy of tez operators we constructed so far
-                // till we encountered the split in this tree
-                removeDupOpTreeOfSplit(tezPlan, curTezOp);
-                curTezOp = startNew(op.getOperatorKey());
-            } else {
-                nonBlocking(op);
-                if(curTezOp.isSplitSubPlan()) {
-                    // Split followed by another split
-                    // Set inputs to null as POSplit will attach input to roots
-                    for (PhysicalOperator root : curTezOp.plan.getRoots()) {
-                        root.setInputs(null);
+            boolean isMultiQuery = "true".equalsIgnoreCase(pigContext
+                    .getProperties().getProperty(PigConfiguration.OPT_MULTIQUERY, "true"));
+
+            if (isMultiQuery) {
+                if (splitsSeen.containsKey(op.getOperatorKey())) {
+                    // Since the plan for this split already exists in the tez plan,
+                    // discard the hierarchy of tez operators we constructed so far
+                    // till we encountered the split in this tree
+                    removeDupOpTreeOfSplit(tezPlan, curTezOp, isMultiQuery);
+                    curTezOp = startNew(op.getOperatorKey());
+                } else {
+                    nonBlocking(op);
+                    if(curTezOp.isSplitSubPlan()) {
+                        // Split followed by another split
+                        // Set inputs to null as POSplit will attach input to roots
+                        for (PhysicalOperator root : curTezOp.plan.getRoots()) {
+                            root.setInputs(null);
+                        }
+                        TezOperator splitOp = splitsSeen.get(curTezOp.getSplitOperatorKey());
+                        POSplit split = findPOSplit(splitOp, curTezOp.getSplitOperatorKey());
+                        split.addPlan(curTezOp.plan);
+                        addSubPlanPropertiesToParent(splitOp, curTezOp);
+                        splitsSeen.put(op.getOperatorKey(), splitOp);
+                        phyToTezOpMap.put(op, splitOp);
+                    } else {
+                        curTezOp.setSplitOperator(true);
+                        splitsSeen.put(op.getOperatorKey(), curTezOp);
+                        phyToTezOpMap.put(op, curTezOp);
                     }
-                    TezOperator splitOp = splitsSeen.get(curTezOp.getSplitOperatorKey());
-                    POSplit split = findPOSplit(splitOp, curTezOp.getSplitOperatorKey());
-                    split.addPlan(curTezOp.plan);
-                    addSubPlanPropertiesToParent(splitOp, curTezOp);
+                    curTezOp = startNew(op.getOperatorKey());
+                }
+            } else {
+                TezOperator splitOp = curTezOp;
+                POValueOutputTez output = null;
+                if (splitsSeen.containsKey(op.getOperatorKey())) {
+                    removeDupOpTreeOfSplit(tezPlan, curTezOp, isMultiQuery);
+                    splitOp = splitsSeen.get(op.getOperatorKey());
+                    resetInputsOfPredecessors(tezPlan, splitOp);
+                    output = (POValueOutputTez)splitOp.plan.getLeaves().get(0);
+                } else {
+                    splitOp.setSplitOperator(true);
                     splitsSeen.put(op.getOperatorKey(), splitOp);
                     phyToTezOpMap.put(op, splitOp);
-                } else {
-                    splitsSeen.put(op.getOperatorKey(), curTezOp);
-                    phyToTezOpMap.put(op, curTezOp);
+                    output = new POValueOutputTez(OperatorKey.genOpKey(scope));
+                    splitOp.plan.addAsLeaf(output);
                 }
-                curTezOp = startNew(op.getOperatorKey());
+                curTezOp = getTezOp();
+                curTezOp.setSplitParent(splitOp.getOperatorKey());
+                tezPlan.add(curTezOp);
+                output.addOutputKey(curTezOp.getOperatorKey().toString());
+                TezEdgeDescriptor edge = TezCompilerUtil.connect(tezPlan, splitOp, curTezOp);
+                //TODO shared edge once support is available in Tez
+                edge.dataMovementType = DataMovementType.ONE_TO_ONE;
+                edge.outputClassName = OnFileUnorderedKVOutput.class.getName();
+                edge.inputClassName = ShuffledUnorderedKVInput.class.getName();
+                curTezOp.setRequestedParallelismByReference(splitOp);
+                POValueInputTez input = new POValueInputTez(OperatorKey.genOpKey(scope));
+                input.setInputKey(splitOp.getOperatorKey().toString());
+                curTezOp.plan.addAsLeaf(input);
             }
         } catch (Exception e) {
             int errCode = 2034;
@@ -2102,7 +2188,7 @@ public class TezCompiler extends PhyPlan
     private POPackage getPackage(int numOfInputs, byte keyType) {
         // The default value of boolean is false
         boolean[] inner = new boolean[numOfInputs];
-        POPackage pkg = new POPackage(new OperatorKey(scope, nig.getNextNodeId(scope)));
+        POPackage pkg = new POPackage(OperatorKey.genOpKey(scope));
         pkg.getPkgr().setInner(inner);
         pkg.getPkgr().setKeyType(keyType);
         pkg.setNumInps(numOfInputs);
@@ -2110,7 +2196,7 @@ public class TezCompiler extends PhyPlan
     }
 
     private TezOperator getTezOp() {
-        return new TezOperator(new OperatorKey(scope, nig.getNextNodeId(scope)));
+        return new TezOperator(OperatorKey.genOpKey(scope));
     }
 }
 

Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java?rev=1565509&r1=1565508&r2=1565509&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java Fri Feb  7 01:18:14 2014
@@ -74,7 +74,7 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
 import org.apache.pig.backend.hadoop.executionengine.tez.TezPOPackageAnnotator.LoRearrangeDiscoverer;
 import org.apache.pig.backend.hadoop.executionengine.tez.util.MRToTezHelper;
-import org.apache.pig.backend.hadoop.executionengine.tez.util.SecurityHelper;
+import org.apache.pig.data.BinSedesTuple;
 import org.apache.pig.data.DataType;
 import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.io.FileLocalizer;
@@ -239,7 +239,19 @@ public class TezDagBuilder extends TezOp
                     edge.partitionerClass.getName());
         }
 
+        if (from.plan.getLeaves().get(0) instanceof POValueOutputTez) {
+            conf.setIfUnset(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_OUTPUT_KEY_CLASS,
+                    POValueOutputTez.EmptyWritable.class.getName());
+            conf.setIfUnset(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_OUTPUT_VALUE_CLASS,
+                    BinSedesTuple.class.getName());
+            conf.setIfUnset(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_INPUT_KEY_CLASS,
+                    POValueOutputTez.EmptyWritable.class.getName());
+            conf.setIfUnset(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_INPUT_VALUE_CLASS,
+                    BinSedesTuple.class.getName());
+        }
+
         MRToTezHelper.convertMRToTezRuntimeConf(conf, globalConf);
+
         in.setUserPayload(TezUtils.createUserPayloadFromConf(conf));
         out.setUserPayload(TezUtils.createUserPayloadFromConf(conf));
 

Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezOperPlan.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezOperPlan.java?rev=1565509&r1=1565508&r2=1565509&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezOperPlan.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezOperPlan.java Fri Feb  7 01:18:14 2014
@@ -130,5 +130,14 @@ public class TezOperPlan extends Operato
             addExtraResource(entry.getKey(), entry.getValue());
         }
     }
+
+    @Override
+    public void remove(TezOperator op) {
+        //TODO Cleanup outEdges of predecessors and inEdges of successors
+    //TezDagBuilder would not create the edge anyway, so this is low priority
+        super.remove(op);
+    }
+
+
 }
 

Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezOperator.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezOperator.java?rev=1565509&r1=1565508&r2=1565509&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezOperator.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezOperator.java Fri Feb  7 01:18:14 2014
@@ -52,7 +52,11 @@ public class TezOperator extends Operato
     // TODO: We need to specify parallelism per vertex in Tez. For now, we set
     // them all to 1.
     // Use AtomicInteger for access by reference and being able to reset in
-    // TezDAGBuilder based on number of input splits. We just need mutability and not concurrency
+    // TezDAGBuilder based on number of input splits.
+    // We just need mutability and not concurrency.
+    // This is to ensure that vertices with a 1-1 edge have the same parallelism
+    // even when the parallelism of the source vertex changes.
+    // Can change to int and set to -1 if TEZ-800 gets fixed.
     private AtomicInteger requestedParallelism = new AtomicInteger(-1);
 
     // TODO: When constructing Tez vertex, we have to specify how much resource
@@ -63,9 +67,17 @@ public class TezOperator extends Operato
     //int requestedCpu = 1;
 
     // Presence indicates that this TezOper is sub-plan of a POSplit.
+    // This is the case when multi-query is turned on
     // Only POStore or POLocalRearrange leaf can be a sub-plan of POSplit
     private OperatorKey splitOperatorKey = null;
 
+    // This indicates that this TezOper has POSplit as a parent.
+    // This is the case where multi-query is turned off.
+    private OperatorKey splitParent = null;
+
+    // This indicates that this TezOper is a split operator
+    private boolean isSplitOper;
+
     // Indicates that the plan creation is complete
     boolean closed = false;
 
@@ -171,6 +183,22 @@ public class TezOperator extends Operato
         return splitOperatorKey != null;
     }
 
+    public OperatorKey getSplitParent() {
+        return splitParent;
+    }
+
+    public void setSplitParent(OperatorKey splitParent) {
+        this.splitParent = splitParent;
+    }
+
+    public boolean isSplitOperator() {
+        return isSplitOper;
+    }
+
+    public void setSplitOperator(boolean isSplitOperator) {
+        this.isSplitOper = isSplitOperator;
+    }
+
     public boolean isClosed() {
         return closed;
     }

Modified: pig/branches/tez/test/org/apache/pig/test/TestBatchAliases.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/test/org/apache/pig/test/TestBatchAliases.java?rev=1565509&r1=1565508&r2=1565509&view=diff
==============================================================================
--- pig/branches/tez/test/org/apache/pig/test/TestBatchAliases.java (original)
+++ pig/branches/tez/test/org/apache/pig/test/TestBatchAliases.java Fri Feb  7 01:18:14 2014
@@ -24,6 +24,7 @@ import java.io.IOException;
 import java.util.Properties;
 
 import org.apache.pig.ExecType;
+import org.apache.pig.PigConfiguration;
 import org.apache.pig.PigServer;
 import org.apache.pig.backend.executionengine.ExecJob;
 import org.apache.pig.impl.io.FileLocalizer;
@@ -37,7 +38,7 @@ public class TestBatchAliases {
 
     @Before
     public void setUp() throws Exception {
-        System.setProperty("opt.multiquery", ""+true);
+        System.setProperty(PigConfiguration.OPT_MULTIQUERY, ""+true);
         myPig = new PigServer(ExecType.LOCAL, new Properties());
         deleteOutputFiles();
     }

Modified: pig/branches/tez/test/org/apache/pig/test/TestGrunt.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/test/org/apache/pig/test/TestGrunt.java?rev=1565509&r1=1565508&r2=1565509&view=diff
==============================================================================
--- pig/branches/tez/test/org/apache/pig/test/TestGrunt.java (original)
+++ pig/branches/tez/test/org/apache/pig/test/TestGrunt.java Fri Feb  7 01:18:14 2014
@@ -41,6 +41,7 @@ import org.apache.log4j.FileAppender;
 import org.apache.log4j.Level;
 import org.apache.log4j.PatternLayout;
 import org.apache.pig.ExecType;
+import org.apache.pig.PigConfiguration;
 import org.apache.pig.PigException;
 import org.apache.pig.PigServer;
 import org.apache.pig.backend.executionengine.ExecException;
@@ -63,7 +64,7 @@ public class TestGrunt {
 
     @BeforeClass
     public static void oneTimeSetup() throws Exception {
-        cluster.setProperty("opt.multiquery","true");
+        cluster.setProperty(PigConfiguration.OPT_MULTIQUERY,"true");
     }
 
     @AfterClass

Modified: pig/branches/tez/test/org/apache/pig/test/TestLoad.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/test/org/apache/pig/test/TestLoad.java?rev=1565509&r1=1565508&r2=1565509&view=diff
==============================================================================
--- pig/branches/tez/test/org/apache/pig/test/TestLoad.java (original)
+++ pig/branches/tez/test/org/apache/pig/test/TestLoad.java Fri Feb  7 01:18:14 2014
@@ -35,6 +35,7 @@ import java.util.Properties;
 
 import org.apache.pig.ExecType;
 import org.apache.pig.FuncSpec;
+import org.apache.pig.PigConfiguration;
 import org.apache.pig.PigServer;
 import org.apache.pig.backend.datastorage.DataStorage;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
@@ -64,18 +65,18 @@ public class TestLoad {
 
     PigContext pc;
     PigServer[] servers;
-    
+
     static MiniCluster cluster = MiniCluster.buildCluster();
-    
+
     @Before
     public void setUp() throws Exception {
         FileLocalizer.deleteTempFiles();
-        servers = new PigServer[] { 
+        servers = new PigServer[] {
                     new PigServer(ExecType.MAPREDUCE, cluster.getProperties()),
                     new PigServer(ExecType.LOCAL, new Properties())
-        };       
+        };
     }
-        
+
     @Test
     public void testGetNextTuple() throws IOException {
         pc = servers[0].getPigContext();
@@ -88,10 +89,10 @@ public class TestLoad {
         POLoad ld = GenPhyOp.topLoadOp();
         ld.setLFile(inpFSpec);
         ld.setPc(pc);
-        
+
         DataBag inpDB = DefaultBagFactory.getInstance().newDefaultBag();
         BufferedReader br = new BufferedReader(new FileReader("test/org/apache/pig/test/data/InputFiles/passwd"));
-        
+
         for(String line = br.readLine();line!=null;line=br.readLine()){
             String[] flds = line.split(":",-1);
             Tuple t = new DefaultTuple();
@@ -113,7 +114,7 @@ public class TestLoad {
     public static void oneTimeTearDown() throws Exception {
         cluster.shutDown();
     }
-    
+
     @Test
     public void testLoadRemoteRel() throws Exception {
         for (PigServer pig : servers) {
@@ -144,7 +145,7 @@ public class TestLoad {
         pc = servers[0].getPigContext();
         boolean noConversionExpected = true;
         checkLoadPath("hdfs:/tmp/test","hdfs:/tmp/test", noConversionExpected);
-        
+
         // check if a location 'hdfs:<abs path>' can actually be read using PigStorage
         String[] inputFileNames = new String[] {
                 "/tmp/TestLoad-testLoadRemoteAbsSchema-input.txt"};
@@ -198,18 +199,18 @@ public class TestLoad {
         boolean noConversionExpected = true;
         checkLoadPath("hdfs:/tmp/test,hdfs:/tmp/test2,hdfs:/tmp/test3",
                 "hdfs:/tmp/test,hdfs:/tmp/test2,hdfs:/tmp/test3", noConversionExpected );
-        
-        // check if a location 'hdfs:<abs path>,hdfs:<abs path>' can actually be 
+
+        // check if a location 'hdfs:<abs path>,hdfs:<abs path>' can actually be
         // read using PigStorage
         String[] inputFileNames = new String[] {
                 "/tmp/TestLoad-testCommaSeparatedString3-input1.txt",
                 "/tmp/TestLoad-testCommaSeparatedString3-input2.txt"};
-        String inputString = "hdfs:" + inputFileNames[0] + ",hdfs:" + 
+        String inputString = "hdfs:" + inputFileNames[0] + ",hdfs:" +
         inputFileNames[1];
         testLoadingMultipleFiles(inputFileNames, inputString);
-        
+
     }
-    
+
     @Test
     public void testCommaSeparatedString4() throws Exception {
         for (PigServer pig : servers) {
@@ -224,12 +225,12 @@ public class TestLoad {
             pc = pig.getPigContext();
             checkLoadPath("/usr/pig/{a,c},usr/pig/b","/usr/pig/{a,c},/tmp/usr/pig/b");
         }
-       
-        // check if a location '<abs path>,<relative path>' can actually be 
+
+        // check if a location '<abs path>,<relative path>' can actually be
         // read using PigStorage
         String loadLocationString = "/tmp/TestLoad-testCommaSeparatedStringMixed-input{1,2}.txt," +
         "TestLoad-testCommaSeparatedStringMixed-input3.txt"; // current working dir is set to /tmp in checkLoadPath()
-       
+
         String[] inputFileNames = new String[] {
                 "/tmp/TestLoad-testCommaSeparatedStringMixed-input1.txt",
                 "/tmp/TestLoad-testCommaSeparatedStringMixed-input2.txt",
@@ -237,7 +238,7 @@ public class TestLoad {
         pc = servers[0].getPigContext(); // test in map reduce mode
         testLoadingMultipleFiles(inputFileNames, loadLocationString);
     }
-    
+
     @Test
     public void testCommaSeparatedString6() throws Exception {
         for (PigServer pig : servers) {
@@ -245,7 +246,7 @@ public class TestLoad {
             checkLoadPath("usr/pig/{a,c},/usr/pig/b","/tmp/usr/pig/{a,c},/usr/pig/b");
         }
     }
-    
+
     @Test
     public void testNonDfsLocation() throws Exception {
         String nonDfsUrl = "har:///user/foo/f.har";
@@ -256,11 +257,11 @@ public class TestLoad {
         nonDfsUrl = nonDfsUrl.replaceFirst("/$", "");
         assertEquals(nonDfsUrl, load.getFileSpec().getFileName());
     }
-    
+
     @SuppressWarnings("unchecked")
-    private void testLoadingMultipleFiles(String[] inputFileNames, 
+    private void testLoadingMultipleFiles(String[] inputFileNames,
             String loadLocationString) throws IOException, ParserException {
-        
+
         String[][] inputStrings = new String[][] {
                 new String[] { "hello\tworld"},
                 new String[] { "bye\tnow"},
@@ -270,7 +271,7 @@ public class TestLoad {
                 (Tuple) Util.getPigConstant("('hello', 'world')"),
                 (Tuple) Util.getPigConstant("('bye', 'now')"),
                 (Tuple) Util.getPigConstant("('all', 'good')")});
-        
+
         List<Tuple> expectedBasedOnNumberOfInputs = new ArrayList<Tuple>();
         for(int i = 0; i < inputFileNames.length; i++) {
             Util.createInputFile(pc, inputFileNames[i], inputStrings[i]);
@@ -280,7 +281,7 @@ public class TestLoad {
             servers[0].registerQuery(" a = load '" + loadLocationString + "' as " +
                     "(s1:chararray, s2:chararray);");
             Iterator<Tuple> it = servers[0].openIterator("a");
-            
+
             List<Tuple> actual = new ArrayList<Tuple>();
             while(it.hasNext()) {
                 actual.add(it.next());
@@ -294,40 +295,40 @@ public class TestLoad {
             }
         }
     }
-    
+
     private void checkLoadPath(String orig, String expected) throws Exception {
         checkLoadPath(orig, expected, false);
     }
 
-    private void checkLoadPath(String orig, String expected, 
+    private void checkLoadPath(String orig, String expected,
             boolean noConversionExpected) throws Exception {
-        
+
         boolean[] multiquery = {true, false};
-        
+
         for (boolean b : multiquery) {
-            pc.getProperties().setProperty("opt.multiquery", "" + b);
-                    
+            pc.getProperties().setProperty(PigConfiguration.OPT_MULTIQUERY, "" + b);
+
             DataStorage dfs = pc.getDfs();
             dfs.setActiveContainer(dfs.asContainer("/tmp"));
             Map<String, String> fileNameMap = new HashMap<String, String>();
-            
+
             QueryParserDriver builder = new QueryParserDriver(pc, "Test-Load", fileNameMap);
-            
+
             String query = "a = load '"+orig+"';";
             LogicalPlan lp = builder.parse(query);
             assertTrue(lp.size()>0);
             Operator op = lp.getSources().get(0);
-            
+
             assertTrue(op instanceof LOLoad);
             LOLoad load = (LOLoad)op;
-    
+
             String p = load.getFileSpec().getFileName();
             System.err.println("DEBUG: p:" + p + " expected:" + expected +", exectype:" + pc.getExecType());
             if(noConversionExpected) {
                 assertEquals(expected, p);
             } else  {
                 String protocol = pc.getExecType() == ExecType.MAPREDUCE ? "hdfs" : "file";
-                // regex : A word character, i.e. [a-zA-Z_0-9] or '-' followed by ':' then any characters 
+                // regex : A word character, i.e. [a-zA-Z_0-9] or '-' followed by ':' then any characters
                 String regex = "[\\-\\w:\\.]";
                 assertTrue(p.matches(".*" + protocol + "://" + regex + "*.*"));
                 assertEquals(expected, p.replaceAll(protocol + "://" + regex + "*/", "/"));

Modified: pig/branches/tez/test/org/apache/pig/test/TestMultiQuery.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/test/org/apache/pig/test/TestMultiQuery.java?rev=1565509&r1=1565508&r2=1565509&view=diff
==============================================================================
--- pig/branches/tez/test/org/apache/pig/test/TestMultiQuery.java (original)
+++ pig/branches/tez/test/org/apache/pig/test/TestMultiQuery.java Fri Feb  7 01:18:14 2014
@@ -26,6 +26,7 @@ import java.util.List;
 import java.util.Properties;
 
 import org.apache.pig.ExecType;
+import org.apache.pig.PigConfiguration;
 import org.apache.pig.PigServer;
 import org.apache.pig.backend.executionengine.ExecJob;
 import org.apache.pig.data.Tuple;
@@ -50,17 +51,17 @@ public class TestMultiQuery {
         Util.copyFromLocalToLocal(
                 "test/org/apache/pig/test/data/passwd2", "passwd2");
         Properties props = new Properties();
-        props.setProperty("opt.multiquery", ""+true);
+        props.setProperty(PigConfiguration.OPT_MULTIQUERY, ""+true);
         myPig = new PigServer(ExecType.LOCAL, props);
     }
-    
+
     @AfterClass
     public static void tearDownAfterClass() throws Exception {
         Util.deleteFile(new PigContext(ExecType.LOCAL, new Properties()), "passwd");
         Util.deleteFile(new PigContext(ExecType.LOCAL, new Properties()), "passwd2");
         deleteOutputFiles();
     }
-    
+
     @Before
     public void setUp() throws Exception {
         deleteOutputFiles();
@@ -75,9 +76,9 @@ public class TestMultiQuery {
     public void testMultiQueryJiraPig1438() throws Exception {
 
         // test case: merge multiple distinct jobs
-        
+
         String INPUT_FILE = "abc";
-        
+
         String[] inputData = {
                 "1\t2\t3",
                 "2\t3\t4",
@@ -85,9 +86,9 @@ public class TestMultiQuery {
                 "2\t3\t4",
                 "1\t2\t3"
         };
-        
+
         Util.createLocalInputFile(INPUT_FILE, inputData);
-       
+
         myPig.setBatchOn();
 
         myPig.registerQuery("A = load '" + INPUT_FILE + "' as (col1:int, col2:int, col3:int);");
@@ -98,58 +99,58 @@ public class TestMultiQuery {
         myPig.registerQuery("D1 = foreach C1 generate col1, col2;");
         myPig.registerQuery("D2 = foreach C2 generate col2, col3;");
         myPig.registerQuery("store D1 into 'output1';");
-        myPig.registerQuery("store D2 into 'output2';");            
-        
+        myPig.registerQuery("store D2 into 'output2';");
+
         myPig.executeBatch();
-        
-        myPig.registerQuery("E = load 'output1' as (a:int, b:int);");            
+
+        myPig.registerQuery("E = load 'output1' as (a:int, b:int);");
         Iterator<Tuple> iter = myPig.openIterator("E");
 
         List<Tuple> expectedResults = Util.getTuplesFromConstantTupleStrings(
-                new String[] { 
+                new String[] {
                         "(1,2)",
                         "(2,3)"
                 });
-        
+
         int counter = 0;
         while (iter.hasNext()) {
-            assertEquals(expectedResults.get(counter++).toString(), iter.next().toString());      
+            assertEquals(expectedResults.get(counter++).toString(), iter.next().toString());
         }
         assertEquals(expectedResults.size(), counter);
-                    
-        myPig.registerQuery("E = load 'output2' as (a:int, b:int);");            
+
+        myPig.registerQuery("E = load 'output2' as (a:int, b:int);");
         iter = myPig.openIterator("E");
 
         expectedResults = Util.getTuplesFromConstantTupleStrings(
-                new String[] { 
+                new String[] {
                         "(2,3)",
                         "(3,4)"
                 });
-        
+
         counter = 0;
         while (iter.hasNext()) {
-            assertEquals(expectedResults.get(counter++).toString(), iter.next().toString());      
+            assertEquals(expectedResults.get(counter++).toString(), iter.next().toString());
         }
 
         assertEquals(expectedResults.size(), counter);
     }
-    
+
     @Test
     public void testMultiQueryJiraPig1252() throws Exception {
 
         // test case: Problems with secondary key optimization and multiquery
         // diamond optimization
-        
+
         String INPUT_FILE = "abc";
-        
+
         String[] inputData = {
             "1\t2\t3",
             "2\t3\t4",
             "3\t\t5",
             "5\t6\t6",
-            "6\t\t7"       
+            "6\t\t7"
         };
-        
+
         Util.createLocalInputFile(INPUT_FILE, inputData);
 
         myPig.setBatchOn();
@@ -161,20 +162,20 @@ public class TestMultiQuery {
         myPig.registerQuery("split B into C if splitcond !=  '', D if splitcond == '';");
         myPig.registerQuery("E = group C by splitcond;");
         myPig.registerQuery("F = foreach E { orderedData = order C by $1, $0; generate flatten(orderedData); };");
-   
+
         Iterator<Tuple> iter = myPig.openIterator("F");
 
         List<Tuple> expectedResults = Util.getTuplesFromConstantTupleStrings(
-                new String[] { 
+                new String[] {
                         "(1,2)",
                         "(2,3)",
                         "(3,5)",
                         "(5,6)"
                 });
-        
+
         int counter = 0;
         while (iter.hasNext()) {
-            assertEquals(expectedResults.get(counter++).toString(), iter.next().toString());                  
+            assertEquals(expectedResults.get(counter++).toString(), iter.next().toString());
         }
 
         assertEquals(expectedResults.size(), counter);
@@ -184,22 +185,22 @@ public class TestMultiQuery {
     public void testMultiQueryJiraPig1169() throws Exception {
 
         // test case: Problems with some top N queries
-        
+
         String INPUT_FILE = "abc";
-        
+
         String[] inputData = {
                 "1\t2\t3",
                 "2\t3\t4",
                 "3\t4\t5",
                 "5\t6\t7",
-                "6\t7\t8"       
+                "6\t7\t8"
         };
-        
+
         Util.createLocalInputFile(INPUT_FILE, inputData);
-       
+
         myPig.setBatchOn();
 
-        myPig.registerQuery("A = load '" + INPUT_FILE 
+        myPig.registerQuery("A = load '" + INPUT_FILE
                 + "' as (a:int, b, c);");
         myPig.registerQuery("A1 = Order A by a desc parallel 3;");
         myPig.registerQuery("A2 = limit A1 2;");
@@ -209,105 +210,105 @@ public class TestMultiQuery {
         myPig.executeBatch();
 
         myPig.registerQuery("B = load 'output2' as (a:int, b, c);");
-        
+
         Iterator<Tuple> iter = myPig.openIterator("B");
 
         List<Tuple> expectedResults = Util.getTuplesFromConstantTupleStrings(
-                new String[] { 
+                new String[] {
                         "(6,7,8)",
                         "(5,6,7)"
                 });
-        
+
         int counter = 0;
         while (iter.hasNext()) {
-            assertEquals(expectedResults.get(counter++).toString(), iter.next().toString());      
+            assertEquals(expectedResults.get(counter++).toString(), iter.next().toString());
         }
 
         assertEquals(expectedResults.size(), counter);
     }
-  
+
     @Test
     public void testMultiQueryJiraPig1171() throws Exception {
 
         // test case: Problems with some top N queries
-        
+
         String INPUT_FILE = "abc";
-        
+
         String[] inputData = {
             "1\tapple\t3",
             "2\torange\t4",
-            "3\tpersimmon\t5"    
+            "3\tpersimmon\t5"
         };
-        
+
         Util.createLocalInputFile(INPUT_FILE, inputData);
 
         myPig.setBatchOn();
 
-        myPig.registerQuery("A = load '" + INPUT_FILE 
+        myPig.registerQuery("A = load '" + INPUT_FILE
                 + "' as (a:long, b, c);");
         myPig.registerQuery("A1 = Order A by a desc;");
         myPig.registerQuery("A2 = limit A1 1;");
-        myPig.registerQuery("B = load '" + INPUT_FILE 
+        myPig.registerQuery("B = load '" + INPUT_FILE
                 + "' as (a:long, b, c);");
         myPig.registerQuery("B1 = Order B by a desc;");
         myPig.registerQuery("B2 = limit B1 1;");
-        
+
         myPig.registerQuery("C = cross A2, B2;");
-        
+
         Iterator<Tuple> iter = myPig.openIterator("C");
 
         List<Tuple> expectedResults = Util.getTuplesFromConstantTupleStrings(
-                new String[] { 
+                new String[] {
                         "(3L,'persimmon',5,3L,'persimmon',5)"
                 });
-        
+
         int counter = 0;
         while (iter.hasNext()) {
-            assertEquals(expectedResults.get(counter++).toString(), iter.next().toString());      
+            assertEquals(expectedResults.get(counter++).toString(), iter.next().toString());
         }
 
         assertEquals(expectedResults.size(), counter);
     }
-    
+
     @Test
     public void testMultiQueryJiraPig1157() throws Exception {
 
        // test case: Successive replicated joins do not generate Map Reduce plan and fail due to OOM
-        
+
         String INPUT_FILE = "abc";
         String INPUT_FILE_1 = "abc";
-        
+
         String[] inputData = {
                 "1\tapple\t3",
                 "2\torange\t4",
-                "3\tpersimmon\t5"    
+                "3\tpersimmon\t5"
         };
-            
+
         Util.createLocalInputFile(INPUT_FILE, inputData);
 
         myPig.setBatchOn();
 
-        myPig.registerQuery("A = load '" + INPUT_FILE 
+        myPig.registerQuery("A = load '" + INPUT_FILE
                 + "' as (a:long, b, c);");
         myPig.registerQuery("A1 = FOREACH A GENERATE a;");
         myPig.registerQuery("B = GROUP A1 BY a;");
-        myPig.registerQuery("C = load '" + INPUT_FILE_1 
+        myPig.registerQuery("C = load '" + INPUT_FILE_1
                 + "' as (x:long, y);");
-        myPig.registerQuery("D = JOIN C BY x, B BY group USING 'replicated';");  
-        myPig.registerQuery("E = JOIN A BY a, D by x USING 'replicated';");  
-        
+        myPig.registerQuery("D = JOIN C BY x, B BY group USING 'replicated';");
+        myPig.registerQuery("E = JOIN A BY a, D by x USING 'replicated';");
+
         Iterator<Tuple> iter = myPig.openIterator("E");
 
         List<Tuple> expectedResults = Util.getTuplesFromConstantTupleStrings(
-                new String[] { 
+                new String[] {
                         "(1L,'apple',3,1L,'apple',1L,{(1L)})",
                         "(2L,'orange',4,2L,'orange',2L,{(2L)})",
                         "(3L,'persimmon',5,3L,'persimmon',3L,{(3L)})"
                 });
-        
+
         int counter = 0;
         while (iter.hasNext()) {
-            assertEquals(expectedResults.get(counter++).toString(), iter.next().toString());                  
+            assertEquals(expectedResults.get(counter++).toString(), iter.next().toString());
         }
 
         assertEquals(expectedResults.size(), counter);
@@ -316,7 +317,7 @@ public class TestMultiQuery {
     @Test
     public void testMultiQueryJiraPig1068() throws Exception {
 
-        // test case: COGROUP fails with 'Type mismatch in key from map: 
+        // test case: COGROUP fails with 'Type mismatch in key from map:
         // expected org.apache.pig.impl.io.NullableText, recieved org.apache.pig.impl.io.NullableTuple'
 
         String INPUT_FILE = "pig-1068.txt";
@@ -324,36 +325,36 @@ public class TestMultiQuery {
         String[] inputData = {
             "10\tapple\tlogin\tjar",
             "20\torange\tlogin\tbox",
-            "30\tstrawberry\tquit\tbot"    
+            "30\tstrawberry\tquit\tbot"
         };
-            
+
         Util.createLocalInputFile(INPUT_FILE, inputData);
 
         myPig.setBatchOn();
 
-        myPig.registerQuery("logs = load '" + INPUT_FILE 
+        myPig.registerQuery("logs = load '" + INPUT_FILE
                 + "' as (ts:int, id:chararray, command:chararray, comments:chararray);");
         myPig.registerQuery("SPLIT logs INTO logins IF command == 'login', all_quits IF command == 'quit';");
-        myPig.registerQuery("login_info = FOREACH logins { GENERATE id as id, comments AS client; };");  
+        myPig.registerQuery("login_info = FOREACH logins { GENERATE id as id, comments AS client; };");
         myPig.registerQuery("logins_grouped = GROUP login_info BY (id, client);");
         myPig.registerQuery("count_logins_by_client = FOREACH logins_grouped "
                 + "{ generate group.id AS id, group.client AS client, COUNT($1) AS count; };");
         myPig.registerQuery("all_quits_grouped = GROUP all_quits BY id; ");
         myPig.registerQuery("quits = FOREACH all_quits_grouped { GENERATE FLATTEN(all_quits); };");
         myPig.registerQuery("joined_session_info = COGROUP quits BY id, count_logins_by_client BY id;");
-        
+
         Iterator<Tuple> iter = myPig.openIterator("joined_session_info");
 
         List<Tuple> expectedResults = Util.getTuplesFromConstantTupleStrings(
-                new String[] { 
+                new String[] {
                         "('apple',{},{('apple','jar',1L)})",
                         "('orange',{},{('orange','box',1L)})",
                         "('strawberry',{(30,'strawberry','quit','bot')},{})"
                 });
-        
+
         int counter = 0;
         while (iter.hasNext()) {
-            assertEquals(expectedResults.get(counter++).toString(), iter.next().toString());                
+            assertEquals(expectedResults.get(counter++).toString(), iter.next().toString());
         }
 
         assertEquals(expectedResults.size(), counter);
@@ -364,22 +365,22 @@ public class TestMultiQuery {
 
         myPig.setBatchOn();
 
-        myPig.registerQuery("a = load 'passwd' " 
+        myPig.registerQuery("a = load 'passwd' "
                 + "using PigStorage(':') as (uname:chararray, passwd:chararray, uid:int, gid:int);");
         myPig.registerQuery("split a into plan1 if (uid > 5), plan2 if ( uid < 5);");
         myPig.registerQuery("b = group plan1 by uname;");
-        myPig.registerQuery("c = foreach b { tmp = order plan1 by uid desc; " 
+        myPig.registerQuery("c = foreach b { tmp = order plan1 by uid desc; "
                 + "generate flatten(group) as foo, tmp; };");
         myPig.registerQuery("d = filter c BY foo is not null;");
         myPig.registerQuery("store d into 'output1';");
         myPig.registerQuery("store plan2 into 'output2';");
-         
+
         List<ExecJob> jobs = myPig.executeBatch();
         for (ExecJob job : jobs) {
             assertTrue(job.getStatus() == ExecJob.JOB_STATUS.COMPLETED);
         }
-    }    
-    
+    }
+
     @Test
     public void testMultiQueryJiraPig1114() throws Exception {
 
@@ -390,9 +391,9 @@ public class TestMultiQuery {
         String[] inputData = {
             "10\tjar",
             "20\tbox",
-            "30\tbot"   
+            "30\tbot"
         };
-                
+
         Util.createLocalInputFile(INPUT_FILE, inputData);
 
         myPig.setBatchOn();
@@ -433,39 +434,39 @@ public class TestMultiQuery {
 
         String INPUT_FILE_1 = "set1.txt";
         String INPUT_FILE_2 = "set2.txt";
-        
+
 
         String[] inputData_1 = {
             "login\t0\tjar",
             "login\t1\tbox",
-            "quit\t0\tmany" 
+            "quit\t0\tmany"
         };
-                
+
         Util.createLocalInputFile(INPUT_FILE_1, inputData_1);
-        
+
         String[] inputData_2 = {
             "apple\tlogin\t{(login)}",
             "orange\tlogin\t{(login)}",
-            "strawberry\tquit\t{(login)}"  
+            "strawberry\tquit\t{(login)}"
         };
-                
+
         Util.createLocalInputFile(INPUT_FILE_2, inputData_2);
-            
+
         myPig.setBatchOn();
 
-        myPig.registerQuery("set1 = load '" + INPUT_FILE_1 
+        myPig.registerQuery("set1 = load '" + INPUT_FILE_1
                 + "' USING PigStorage as (a:chararray, b:chararray, c:chararray);");
         myPig.registerQuery("set2 = load '" + INPUT_FILE_2
                 + "' USING PigStorage as (a: chararray, b:chararray, c:bag{});");
-        myPig.registerQuery("set2_1 = FOREACH set2 GENERATE a as f1, b as f2, " 
+        myPig.registerQuery("set2_1 = FOREACH set2 GENERATE a as f1, b as f2, "
                 + "(chararray) 0 as f3;");
         myPig.registerQuery("set2_2 = FOREACH set2 GENERATE a as f1, "
-                + "FLATTEN((IsEmpty(c) ? null : c)) as f2, (chararray) 1 as f3;");  
+                + "FLATTEN((IsEmpty(c) ? null : c)) as f2, (chararray) 1 as f3;");
         myPig.registerQuery("all_set2 = UNION set2_1, set2_2;");
         myPig.registerQuery("joined_sets = JOIN set1 BY (a,b), all_set2 BY (f2,f3);");
-      
+
         List<Tuple> expectedResults = Util.getTuplesFromConstantTupleStrings(
-                new String[] { 
+                new String[] {
                         "('quit','0','many','strawberry','quit','0')",
                         "('login','0','jar','apple','login','0')",
                         "('login','0','jar','orange','login','0')",
@@ -473,7 +474,7 @@ public class TestMultiQuery {
                         "('login','1','box','orange','login','1')",
                         "('login','1','box','strawberry','login','1')"
                 });
-        
+
         Iterator<Tuple> iter = myPig.openIterator("joined_sets");
         int count = 0;
         while (iter.hasNext()) {
@@ -481,11 +482,11 @@ public class TestMultiQuery {
         }
         assertEquals(expectedResults.size(), count);
     }
- 
+
     @Test
     public void testMultiQueryJiraPig1060_2() throws Exception {
 
-        // test case: 
+        // test case:
 
         String INPUT_FILE = "pig-1060.txt";
 
@@ -495,9 +496,9 @@ public class TestMultiQuery {
             "orange\t3",
             "orange\t23",
             "strawberry\t10",
-            "strawberry\t34"  
+            "strawberry\t34"
         };
-                    
+
         Util.createLocalInputFile(INPUT_FILE, inputData);
 
         myPig.setBatchOn();
@@ -530,7 +531,7 @@ public class TestMultiQuery {
         for (ExecJob job : jobs) {
             assertTrue(job.getStatus() == ExecJob.JOB_STATUS.COMPLETED);
         }
-    } 
+    }
 
     @Test
     public void testMultiQueryJiraPig920_2() throws Exception {
@@ -551,27 +552,27 @@ public class TestMultiQuery {
         myPig.registerQuery("g = cogroup d by $0, e by $0;");
         myPig.registerQuery("g1 = foreach g generate group, COUNT(d), COUNT(e);");
         myPig.registerQuery("store g1 into 'output2';");
-         
+
         List<ExecJob> jobs = myPig.executeBatch();
         for (ExecJob job : jobs) {
             assertTrue(job.getStatus() == ExecJob.JOB_STATUS.COMPLETED);
         }
-    }            
-    
+    }
+
     @Test
     public void testMultiQueryJiraPig920_3() throws Exception {
 
         // test case: execution of a simple diamond query
-        
+
         String INPUT_FILE = "pig-920.txt";
-        
+
         String[] inputData = {
             "apple\tapple\t100\t10",
             "apple\tapple\t200\t20",
             "orange\torange\t100\t10",
-            "orange\torange\t300\t20"  
+            "orange\torange\t300\t20"
         };
-                        
+
         Util.createLocalInputFile(INPUT_FILE, inputData);
 
         myPig.setBatchOn();
@@ -582,22 +583,22 @@ public class TestMultiQuery {
         myPig.registerQuery("c = filter a by gid > 10;");
         myPig.registerQuery("d = cogroup c by $0, b by $0;");
         myPig.registerQuery("e = foreach d generate group, COUNT(c), COUNT(b);");
-                               
+
         Iterator<Tuple> iter = myPig.openIterator("e");
 
         List<Tuple> expectedResults = Util.getTuplesFromConstantTupleStrings(
-                new String[] { 
+                new String[] {
                         "('apple',1L,2L)",
                         "('orange',1L,1L)"
                 });
-        
+
         int counter = 0;
         while (iter.hasNext()) {
             assertEquals(expectedResults.get(counter++).toString(), iter.next().toString());
         }
 
         assertEquals(expectedResults.size(), counter);
-    }        
+    }
 
     @Test
     public void testMultiQueryJiraPig976() throws Exception {
@@ -625,7 +626,7 @@ public class TestMultiQuery {
     @Test
     public void testMultiQueryJiraPig976_2() throws Exception {
 
-        // test case: key ('group') isn't part of foreach output 
+        // test case: key ('group') isn't part of foreach output
         // and keys have different types
 
         myPig.setBatchOn();
@@ -671,7 +672,7 @@ public class TestMultiQuery {
     public void testMultiQueryJiraPig976_4() throws Exception {
 
         // test case: group by multi-cols and key ('group') isn't part of output
-     
+
         myPig.setBatchOn();
 
         myPig.registerQuery("a = load 'passwd' " +
@@ -688,7 +689,7 @@ public class TestMultiQuery {
             assertTrue(job.getStatus() == ExecJob.JOB_STATUS.COMPLETED);
         }
     }
-   
+
     @Test
     public void testMultiQueryJiraPig976_5() throws Exception {
 
@@ -718,7 +719,7 @@ public class TestMultiQuery {
         // test case: key ('group') has null values.
 
         String INPUT_FILE = "pig-976.txt";
-        
+
         String[] inputData = {
             "apple\tapple\t100\t10",
             "apple\tapple\t\t20",
@@ -726,9 +727,9 @@ public class TestMultiQuery {
             "orange\torange\t\t20",
             "strawberry\tstrawberry\t300\t10"
         };
-                            
+
         Util.createLocalInputFile(INPUT_FILE, inputData);
-    
+
         myPig.setBatchOn();
 
         myPig.registerQuery("a = load '" + INPUT_FILE +
@@ -742,11 +743,11 @@ public class TestMultiQuery {
 
         List<ExecJob> jobs = myPig.executeBatch();
         assertTrue(jobs.size() == 2);
-        
+
         for (ExecJob job : jobs) {
             assertTrue(job.getStatus() == ExecJob.JOB_STATUS.COMPLETED);
         }
-    }    
+    }
 
     @Test
     public void testMultiQueryJiraPig983_2() throws Exception {
@@ -766,15 +767,15 @@ public class TestMultiQuery {
         myPig.registerQuery("f = group d by c::gid;");
         myPig.registerQuery("f1 = foreach f generate group, SUM(d.c::uid);");
         myPig.registerQuery("store f1 into 'output2';");
-         
+
         List<ExecJob> jobs = myPig.executeBatch();
 
         assertTrue(jobs.size() == 2);
-        
+
         for (ExecJob job : jobs) {
             assertTrue(job.getStatus() == ExecJob.JOB_STATUS.COMPLETED);
         }
-    }     
+    }
 
     // --------------------------------------------------------------------------
     // Helper methods