Posted to commits@pig.apache.org by ch...@apache.org on 2014/05/11 00:44:37 UTC

svn commit: r1593741 - in /pig/trunk: ./ src/org/apache/pig/ src/org/apache/pig/newplan/logical/ src/org/apache/pig/parser/ test/org/apache/pig/test/

Author: cheolsoo
Date: Sat May 10 22:44:36 2014
New Revision: 1593741

URL: http://svn.apache.org/r1593741
Log:
PIG-3902: PigServer creates cycle (thedatachef via cheolsoo)
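
The cycle arises from the multiquery store-then-load pattern: when a later
statement loads the very file an earlier store writes, the plan builder
connects the store to the load to model the dependency, so the store is no
longer a sink. Re-parsing the accumulated script then has to skip stores that
were already executed, or the rebuilt plan turns cyclic. A minimal sketch of
the triggering pattern, assuming local mode and illustrative file names (not
taken from the ticket):

    import java.util.Iterator;
    import org.apache.pig.ExecType;
    import org.apache.pig.PigServer;
    import org.apache.pig.data.Tuple;

    PigServer pig = new PigServer(ExecType.LOCAL);
    pig.setBatchOn();
    pig.registerQuery("A = load 'in.txt' as (x:int);");
    pig.registerQuery("store A into 'out';");
    pig.executeBatch();   // 'out' is written; the store is now "processed"
    // This load reads what the store above wrote, so the store is wired
    // in as its predecessor. Re-parsing for openIterator() must now call
    // skipStores() first, which is what this commit adds.
    pig.registerQuery("B = load 'out' as (x:int);");
    Iterator<Tuple> it = pig.openIterator("B");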

Modified:
    pig/trunk/CHANGES.txt
    pig/trunk/src/org/apache/pig/PigServer.java
    pig/trunk/src/org/apache/pig/newplan/logical/Util.java
    pig/trunk/src/org/apache/pig/parser/LogicalPlanBuilder.java
    pig/trunk/test/org/apache/pig/test/TestMultiQuery.java
    pig/trunk/test/org/apache/pig/test/TestNativeMapReduce.java

Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1593741&r1=1593740&r2=1593741&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Sat May 10 22:44:36 2014
@@ -127,6 +127,8 @@ PIG-3882: Multiquery off mode execution 
  
 BUG FIXES
 
+PIG-3902: PigServer creates cycle (thedatachef via cheolsoo)
+
 PIG-3930: "java.io.IOException: Cannot initialize Cluster" in local mode with hadoopversion=23 dependencies (jira.shegalov via cheolsoo)
 
 PIG-3921: Obsolete entries in piggybank javadoc build script (mrflip via cheolsoo)

Modified: pig/trunk/src/org/apache/pig/PigServer.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/PigServer.java?rev=1593741&r1=1593740&r2=1593741&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/PigServer.java (original)
+++ pig/trunk/src/org/apache/pig/PigServer.java Sat May 10 22:44:36 2014
@@ -73,6 +73,7 @@ import org.apache.pig.impl.util.UriUtil;
 import org.apache.pig.impl.util.Utils;
 import org.apache.pig.newplan.DependencyOrderWalker;
 import org.apache.pig.newplan.Operator;
+import org.apache.pig.newplan.logical.Util;
 import org.apache.pig.newplan.logical.expression.LogicalExpressionPlan;
 import org.apache.pig.newplan.logical.expression.LogicalExpressionVisitor;
 import org.apache.pig.newplan.logical.expression.ScalarExpression;
@@ -1010,6 +1011,7 @@ public class PigServer {
             alias = getLastRel();
         }
         currDAG.parseQuery();
+        currDAG.skipStores(); // skip the stores that have already been processed
         currDAG.buildPlan( alias );
 
         try {
@@ -1273,6 +1275,7 @@ public class PigServer {
                 execute();
             }
             currDAG.parseQuery();
+            currDAG.skipStores();
             currDAG.buildPlan( alias );
             currDAG.compile();
         } catch (IOException e) {
@@ -1455,12 +1458,9 @@ public class PigServer {
         /**
          * Call back method for counting executed stores.
          */
-        private void countExecutedStores() {
-            for( Operator sink : lp.getSinks() ) {
-                if( sink instanceof LOStore ) {
-                    processedStores++;
-                }
-            }
+        private void countExecutedStores() throws FrontendException {
+            List<LOStore> sinks = Util.getLogicalRelationalOperators(lp, LOStore.class);
+            processedStores += sinks.size();
         }
 
         Map<LogicalRelationalOperator, LogicalPlan> getAliases() {
@@ -1532,11 +1532,21 @@ public class PigServer {
                 }
                 queue.add( op );
             } else {
-                List<Operator> sinks = lp.getSinks();
-                if( sinks != null ) {
-                    for( Operator sink : sinks ) {
-                        if( sink instanceof LOStore )
-                            queue.add( sink );
+                List<LOStore> stores = Util.getLogicalRelationalOperators(lp, LOStore.class);
+                for (LOStore op : stores) {
+                    boolean addSink = true;
+                    // Only add if all the successors are loads
+                    List<Operator> succs = lp.getSuccessors(op);
+                    if (succs != null && succs.size() > 0) {
+                        for (Operator succ : succs) {
+                            if (!(succ instanceof LOLoad)) {
+                                addSink = false;
+                                break;
+                            }
+                        }
+                    }
+                    if (addSink) {
+                        queue.add(op);
                     }
                 }
             }
@@ -1584,27 +1594,45 @@ public class PigServer {
          *  Remove stores that have been executed previously from the overall plan.
          */
         private void skipStores() throws IOException {
-            List<Operator> sinks = lp.getSinks();
+            // Get stores specifically
+            List<LOStore> sinks = Util.getLogicalRelationalOperators(lp, LOStore.class);
             List<Operator> sinksToRemove = new ArrayList<Operator>();
             int skipCount = processedStores;
             if( skipCount > 0 ) {
-                for( Operator sink : sinks ) {
-                    if( sink instanceof LOStore ) {
-                        sinksToRemove.add( sink );
-                        skipCount--;
-                        if( skipCount == 0 )
-                            break;
-                    }
+                for( LOStore sink : sinks ) {
+                    sinksToRemove.add( sink );
+                    skipCount--;
+                    if( skipCount == 0 )
+                        break;
                 }
             }
 
             for( Operator op : sinksToRemove ) {
+                // In the multiquery case it's entirely possible that a
+                // store is not a leaf (sink) and therefore has
+                // successors that need to be removed.
+                removeToLoad(op);
                 Operator pred = lp.getPredecessors( op ).get(0);
                 lp.disconnect( pred, op );
                 lp.remove( op );
             }
         }
 
+        private void removeToLoad(Operator toRemove) throws IOException {
+            List<Operator> successors = lp.getSuccessors(toRemove);
+            List<Operator> succToRemove = new ArrayList<Operator>();
+            if (successors != null && successors.size() > 0) {
+                succToRemove.addAll(successors);
+                for (Operator succ : succToRemove) {
+                    lp.disconnect( toRemove, succ );
+                    if (!(succ instanceof LOLoad)) {
+                        removeToLoad(succ);
+                        lp.remove(succ);
+                    }
+                }
+            }
+        }
+
         /**
          * Accumulate the given statement to previous query statements and generate
          * an overall (raw) plan.
@@ -1796,7 +1824,7 @@ public class PigServer {
                 for (LOStore store : storeOps) {
                     String ifile = load.getFileSpec().getFileName();
                     String ofile = store.getFileSpec().getFileName();
-                    if (ofile.compareTo(ifile) == 0) {
+                    if (ofile.equals(ifile)) {
                         // if there is no path from the load to the store,
                         // then connect the store to the load to create the
                         // dependency of the store on the load. If there is

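The interesting part above is removeToLoad(): an executed store may no longer
be a leaf, so before it can be dropped its downstream subgraph has to be
disconnected, stopping at (and keeping) any LOLoad that depends on it. A
standalone sketch of the same traversal over a plain successor map (toy types
standing in for Pig's plan API; all names hypothetical):

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    public class RemoveToLoadSketch {
        // node -> downstream successors
        static Map<String, List<String>> successors = new HashMap<String, List<String>>();

        // Walk forward from an executed store, cutting edges as we go;
        // loads act as boundaries and survive, while intermediate nodes
        // between the store and its loads are deleted.
        static void removeToLoad(String node) {
            List<String> down = successors.get(node);
            if (down == null || down.isEmpty()) {
                return;
            }
            for (String succ : new ArrayList<String>(down)) {
                down.remove(succ);                 // lp.disconnect(node, succ)
                if (!succ.startsWith("load")) {    // stand-in for (succ instanceof LOLoad)
                    removeToLoad(succ);
                    successors.remove(succ);       // lp.remove(succ)
                }
            }
        }

        public static void main(String[] args) {
            // store1's output is read by load2, which feeds filter1
            successors.put("store1", new ArrayList<String>(Arrays.asList("load2")));
            successors.put("load2", new ArrayList<String>(Arrays.asList("filter1")));
            removeToLoad("store1");
            // store1's edge to load2 is cut; load2 (a load) and filter1 survive
            System.out.println(successors);
        }
    }
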
Modified: pig/trunk/src/org/apache/pig/newplan/logical/Util.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/logical/Util.java?rev=1593741&r1=1593740&r2=1593741&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/newplan/logical/Util.java (original)
+++ pig/trunk/src/org/apache/pig/newplan/logical/Util.java Sat May 10 22:44:36 2014
@@ -18,61 +18,82 @@
 package org.apache.pig.newplan.logical;
 
 import java.util.ArrayList;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Set;
 
 import org.apache.pig.impl.logicalLayer.FrontendException;
 import org.apache.pig.impl.logicalLayer.schema.Schema;
+import org.apache.pig.newplan.DependencyOrderWalker;
 import org.apache.pig.newplan.Operator;
 import org.apache.pig.newplan.logical.expression.LogicalExpressionPlan;
 import org.apache.pig.newplan.logical.expression.ProjectExpression;
+import org.apache.pig.newplan.logical.relational.LOCogroup;
+import org.apache.pig.newplan.logical.relational.LOCross;
+import org.apache.pig.newplan.logical.relational.LOCube;
+import org.apache.pig.newplan.logical.relational.LODistinct;
+import org.apache.pig.newplan.logical.relational.LOFilter;
 import org.apache.pig.newplan.logical.relational.LOForEach;
 import org.apache.pig.newplan.logical.relational.LOGenerate;
 import org.apache.pig.newplan.logical.relational.LOInnerLoad;
+import org.apache.pig.newplan.logical.relational.LOJoin;
+import org.apache.pig.newplan.logical.relational.LOLimit;
+import org.apache.pig.newplan.logical.relational.LOLoad;
+import org.apache.pig.newplan.logical.relational.LONative;
+import org.apache.pig.newplan.logical.relational.LORank;
+import org.apache.pig.newplan.logical.relational.LOSort;
+import org.apache.pig.newplan.logical.relational.LOSplit;
+import org.apache.pig.newplan.logical.relational.LOSplitOutput;
+import org.apache.pig.newplan.logical.relational.LOStore;
+import org.apache.pig.newplan.logical.relational.LOStream;
+import org.apache.pig.newplan.logical.relational.LOUnion;
 import org.apache.pig.newplan.logical.relational.LogicalPlan;
+import org.apache.pig.newplan.logical.relational.LogicalRelationalNodesVisitor;
 import org.apache.pig.newplan.logical.relational.LogicalRelationalOperator;
 import org.apache.pig.newplan.logical.relational.LogicalSchema;
 
+import com.google.common.collect.Lists;
+
 public class Util {
-    public static LogicalSchema translateSchema(Schema schema) {       
+    public static LogicalSchema translateSchema(Schema schema) {
         if (schema == null) {
             return null;
         }
-        
+
         LogicalSchema s2 = new LogicalSchema();
         List<Schema.FieldSchema> ll = schema.getFields();
         for (Schema.FieldSchema f: ll) {
-            LogicalSchema.LogicalFieldSchema f2 = 
+            LogicalSchema.LogicalFieldSchema f2 =
                 new LogicalSchema.LogicalFieldSchema(f.alias, translateSchema(f.schema), f.type);
-                       
+
             s2.addField(f2);
         }
-        
+
         return s2;
     }
-    
-    public static LogicalSchema.LogicalFieldSchema translateFieldSchema(Schema.FieldSchema fs) {      
+
+    public static LogicalSchema.LogicalFieldSchema translateFieldSchema(Schema.FieldSchema fs) {
         LogicalSchema newSchema = null;
         if (fs.schema!=null) {
             newSchema = translateSchema(fs.schema);
         }
-        
+
         LogicalSchema.LogicalFieldSchema newFs = new LogicalSchema.LogicalFieldSchema(fs.alias, newSchema, fs.type);
         return newFs;
     }
-    
+
     /**
      * This function translates the new LogicalSchema into old Schema format required
      * by PhysicalOperators
      * @param schema LogicalSchema to be converted to Schema
      * @return Schema that is converted from LogicalSchema
-     * @throws FrontendException 
+     * @throws FrontendException
      */
-    public static Schema translateSchema(LogicalSchema schema) {       
+    public static Schema translateSchema(LogicalSchema schema) {
         if (schema == null) {
             return null;
         }
-        
+
         Schema s2 = new Schema();
         List<LogicalSchema.LogicalFieldSchema> ll = schema.getFields();
         for (LogicalSchema.LogicalFieldSchema f: ll) {
@@ -84,17 +105,17 @@ public class Util {
             } catch (FrontendException e) {
             }
         }
-        
+
         return s2;
     }
-    
+
     /**
      * If schema argument has fields where a bag does not contain a tuple schema,
      * it inserts a tuple schema. It does so for all inner levels.
-     * eg bag({int}) => bag({(int)}) 
+     * eg bag({int}) => bag({(int)})
      * @param sch
      * @return modified schema
-     * @throws FrontendException 
+     * @throws FrontendException
      */
     public static Schema fixSchemaAddTupleInBag(Schema sch) throws FrontendException{
         LogicalSchema logSch = translateSchema(sch);
@@ -110,7 +131,7 @@ public class Util {
         if (fs.schema!=null) {
             newSchema = translateSchema(fs.schema);
         }
-        
+
         Schema.FieldSchema newFs = null;
         try {
             newFs = new Schema.FieldSchema(null, newSchema, fs.type);
@@ -118,11 +139,11 @@ public class Util {
         }
         return newFs;
     }
-    
+
     public static LOForEach addForEachAfter(LogicalPlan plan, LogicalRelationalOperator op, int branch,
             Set<Integer> columnsToDrop) throws FrontendException {
         LOForEach foreach = new LOForEach(plan);
-        
+
         plan.add(foreach);
         List<Operator> next = plan.getSuccessors(op);
         if (next != null) {
@@ -133,26 +154,26 @@ public class Util {
         else {
             plan.connect(op, foreach);
         }
-        
+
         LogicalPlan innerPlan = new LogicalPlan();
         foreach.setInnerPlan(innerPlan);
-        
+
         LogicalSchema schema = op.getSchema();
-        
+
         // build foreach inner plan
         List<LogicalExpressionPlan> exps = new ArrayList<LogicalExpressionPlan>();
         LOGenerate gen = new LOGenerate(innerPlan, exps, new boolean[schema.size()-columnsToDrop.size()]);
         innerPlan.add(gen);
-        
+
         for (int i=0, j=0; i<schema.size(); i++) {
             if (columnsToDrop.contains(i)) {
                 continue;
             }
-            
+
             LOInnerLoad innerLoad = new LOInnerLoad(innerPlan, foreach, i);
             innerPlan.add(innerLoad);
             innerPlan.connect(innerLoad, gen);
-            
+
             LogicalExpressionPlan exp = new LogicalExpressionPlan();
             ProjectExpression prj = new ProjectExpression(exp, j++, -1, gen);
             exp.add(prj);
@@ -160,4 +181,136 @@ public class Util {
         }
         return foreach;
     }
+
+    /**
+     * Returns a LinkedList of the operators contained within the logical plan
+     * which implement the supplied class, in dependency order. Returns an empty
+     * LinkedList if no such operators exist.
+     * @param plan
+     * @param opClass
+     * @return a LinkedList of operators contained within the plan which
+     *         implement the supplied class; empty if no such ops exist.
+     * @throws FrontendException
+     */
+    public static <C extends LogicalRelationalOperator> LinkedList<C> getLogicalRelationalOperators(LogicalPlan plan,
+            Class<C> opClass) throws FrontendException {
+        OpFinder<C> finder = new OpFinder<C>(plan, opClass);
+        finder.visit();
+        return finder.getFoundOps();
+    }
+
+    private static class OpFinder<C extends LogicalRelationalOperator> extends LogicalRelationalNodesVisitor {
+        final Class<C> opClass;
+        private LinkedList<C> foundOps = Lists.newLinkedList();
+
+        public OpFinder(LogicalPlan plan, Class<C> opClass) throws FrontendException {
+            super(plan, new DependencyOrderWalker(plan));
+            this.opClass = opClass;
+        }
+
+        public LinkedList<C> getFoundOps() {
+            return foundOps;
+        }
+
+        @SuppressWarnings("unchecked")
+        private void visitOp(LogicalRelationalOperator op) {
+            if (opClass.isAssignableFrom(op.getClass())) {
+                foundOps.add((C) op);
+            }
+        }
+
+        public void visit(LOLoad load) throws FrontendException {
+            visitOp(load);
+        }
+
+        @Override
+        public void visit(LOFilter filter) throws FrontendException {
+            visitOp(filter);
+        }
+
+        @Override
+        public void visit(LOStore store) throws FrontendException {
+            visitOp(store);
+        }
+
+        @Override
+        public void visit(LOJoin join) throws FrontendException {
+            visitOp(join);
+        }
+
+        @Override
+        public void visit(LOForEach foreach) throws FrontendException {
+            visitOp(foreach);
+        }
+
+        @Override
+        public void visit(LOGenerate gen) throws FrontendException {
+            visitOp(gen);
+        }
+
+        @Override
+        public void visit(LOInnerLoad load) throws FrontendException {
+            visitOp(load);
+        }
+
+        @Override
+        public void visit(LOCube cube) throws FrontendException {
+            visitOp(cube);
+        }
+
+        @Override
+        public void visit(LOCogroup loCogroup) throws FrontendException {
+            visitOp(loCogroup);
+        }
+
+        @Override
+        public void visit(LOSplit loSplit) throws FrontendException {
+            visitOp(loSplit);
+        }
+
+        @Override
+        public void visit(LOSplitOutput loSplitOutput) throws FrontendException {
+            visitOp(loSplitOutput);
+        }
+
+        @Override
+        public void visit(LOUnion loUnion) throws FrontendException {
+            visitOp(loUnion);
+        }
+
+        @Override
+        public void visit(LOSort loSort) throws FrontendException {
+            visitOp(loSort);
+        }
+
+        @Override
+        public void visit(LORank loRank) throws FrontendException{
+            visitOp(loRank);
+        }
+
+        @Override
+        public void visit(LODistinct loDistinct) throws FrontendException {
+            visitOp(loDistinct);
+        }
+
+        @Override
+        public void visit(LOLimit loLimit) throws FrontendException {
+            visitOp(loLimit);
+        }
+
+        @Override
+        public void visit(LOCross loCross) throws FrontendException {
+            visitOp(loCross);
+        }
+
+        @Override
+        public void visit(LOStream loStream) throws FrontendException {
+            visitOp(loStream);
+        }
+
+        @Override
+        public void visit(LONative nativeMR) throws FrontendException{
+            visitOp(nativeMR);
+        }
+    }
 }

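getLogicalRelationalOperators() gives call sites a typed, dependency-ordered
view of the plan and replaces the getSinks()-plus-instanceof pattern, which
misses stores that are no longer sinks. A hedged usage sketch, assuming a
LogicalPlan lp that has already been built:

    import java.util.LinkedList;
    import org.apache.pig.impl.logicalLayer.FrontendException;
    import org.apache.pig.newplan.logical.Util;
    import org.apache.pig.newplan.logical.relational.LOLoad;
    import org.apache.pig.newplan.logical.relational.LOStore;
    import org.apache.pig.newplan.logical.relational.LogicalPlan;

    void summarize(LogicalPlan lp) throws FrontendException {
        // every store in the plan, wherever it sits, in dependency order
        LinkedList<LOStore> stores = Util.getLogicalRelationalOperators(lp, LOStore.class);
        // the same walk works for any LogicalRelationalOperator subtype
        LinkedList<LOLoad> loads = Util.getLogicalRelationalOperators(lp, LOLoad.class);
        System.out.println(stores.size() + " store(s), " + loads.size() + " load(s)");
    }
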
Modified: pig/trunk/src/org/apache/pig/parser/LogicalPlanBuilder.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/parser/LogicalPlanBuilder.java?rev=1593741&r1=1593740&r2=1593741&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/parser/LogicalPlanBuilder.java (original)
+++ pig/trunk/src/org/apache/pig/parser/LogicalPlanBuilder.java Sat May 10 22:44:36 2014
@@ -431,22 +431,21 @@ public class LogicalPlanBuilder {
         return alias;
     }
 
-
     LOCube createCubeOp() {
-	return new LOCube(plan);
+        return new LOCube(plan);
     }
 
     String buildCubeOp(SourceLocation loc, LOCube op, String alias, String inputAlias,
-	    List<String> operations, MultiMap<Integer, LogicalExpressionPlan> expressionPlans)
-	    throws ParserValidationException {
+        List<String> operations, MultiMap<Integer, LogicalExpressionPlan> expressionPlans)
+        throws ParserValidationException {
 
-	// check if continuously occurring cube operations be combined
-	combineCubeOperations((ArrayList<String>) operations, expressionPlans);
+        // check if consecutive cube operations can be combined
+        combineCubeOperations((ArrayList<String>) operations, expressionPlans);
 
-	// set the expression plans for cube operator and build cube operator
-	op.setExpressionPlans(expressionPlans);
-	op.setOperations(operations);
-	buildOp(loc, op, alias, inputAlias, null);
+        // set the expression plans for cube operator and build cube operator
+        op.setExpressionPlans(expressionPlans);
+        op.setOperations(operations);
+        buildOp(loc, op, alias, inputAlias, null);
         try {
             (new ProjectStarExpander(op.getPlan())).visit(op);
             (new ProjStarInUdfExpander(op.getPlan())).visit(op);
@@ -454,100 +453,100 @@ public class LogicalPlanBuilder {
         } catch (FrontendException e ) {
             throw new ParserValidationException( intStream, loc, e );
         }
-	try {
-	    alias = convertCubeToFGPlan(loc, op, inputAlias, operations, expressionPlans);
-	} catch (FrontendException e) {
-	    throw new ParserValidationException(intStream, loc, e);
-	}
-	return alias;
+        try {
+            alias = convertCubeToFGPlan(loc, op, inputAlias, operations, expressionPlans);
+        } catch (FrontendException e) {
+            throw new ParserValidationException(intStream, loc, e);
+        }
+        return alias;
     }
 
     // if multiple CUBE operations occur consecutively then they can be combined:
     // CUBE rel BY CUBE(a,b), CUBE(c,d); => CUBE rel BY CUBE(a,b,c,d)
     private void combineCubeOperations(ArrayList<String> operations,
-	    MultiMap<Integer, LogicalExpressionPlan> expressionPlans) {
+        MultiMap<Integer, LogicalExpressionPlan> expressionPlans) {
+
+        int startIdx = -1;
+        int endIdx = -1;
+        int i = 0;
+        boolean isMerged = false;
+
+        // scan and perform merge of column projections
+        for (i = 0; i < operations.size(); i++) {
+            if ((startIdx == -1) && (operations.get(i).equals("CUBE") == true)) {
+                startIdx = i;
+            } else {
+                if (operations.get(i).equals("CUBE") == true) {
+                    endIdx = i;
+                } else {
+                    if (endIdx > startIdx) {
+                        mergeAndMarkForDelete(operations, expressionPlans, startIdx, endIdx);
+                        isMerged = true;
+                        startIdx = -1;
+                        endIdx = -1;
+                    } else {
+                        startIdx = -1;
+                        endIdx = -1;
+                    }
+                }
+            }
+        }
+
+        // this check is required for the case when the sequence of CUBE
+        // operations occurs at the end, like (CUBE, ROLLUP, CUBE, CUBE)
+        // in which case endIdx will be greater than startIdx
+        if (endIdx > startIdx) {
+            isMerged = true;
+            mergeAndMarkForDelete(operations, expressionPlans, startIdx, endIdx);
+        }
 
-	int startIdx = -1;
-	int endIdx = -1;
-	int i = 0;
-	boolean isMerged = false;
-
-	// scan and perform merge of column projections
-	for (i = 0; i < operations.size(); i++) {
-	    if ((startIdx == -1) && (operations.get(i).equals("CUBE") == true)) {
-		startIdx = i;
-	    } else {
-		if (operations.get(i).equals("CUBE") == true) {
-		    endIdx = i;
-		} else {
-		    if (endIdx > startIdx) {
-			mergeAndMarkForDelete(operations, expressionPlans, startIdx, endIdx);
-			isMerged = true;
-			startIdx = -1;
-			endIdx = -1;
-		    } else {
-			startIdx = -1;
-			endIdx = -1;
-		    }
-		}
-	    }
-	}
-
-	// this check is required for the case when the sequence of CUBE
-	// operations occurs at the end, like (CUBE, ROLLUP, CUBE, CUBE)
-	// in which case endIdx will be greater than startIdx
-	if (endIdx > startIdx) {
-	    isMerged = true;
-	    mergeAndMarkForDelete(operations, expressionPlans, startIdx, endIdx);
-	}
-
-	// if merged then remove the column projections that were marked for
-	// deletion
-	if (isMerged) {
-	    performDeletion(expressionPlans, operations);
-	}
+        // if merged then remove the column projections that were marked for
+        // deletion
+        if (isMerged) {
+            performDeletion(expressionPlans, operations);
+        }
     }
 
     private void performDeletion(MultiMap<Integer, LogicalExpressionPlan> expressionPlans,
-	    ArrayList<String> operations) {
+        ArrayList<String> operations) {
+
+        MultiMap<Integer, LogicalExpressionPlan> ep = new MultiMap<Integer, LogicalExpressionPlan>();
+        List<String> op = new ArrayList<String>();
+        int idx = 0;
+        // rearranging indices
+        for (int i = 0; i < operations.size(); i++) {
+            if (operations.get(i) != null) {
+                op.add(idx, operations.get(i));
+            }
+
+            if (expressionPlans.get(i) != null) {
+                ep.put(idx, expressionPlans.get(i));
+                idx++;
+            }
+        }
+
+        // performing deletions
+        operations.clear();
+        operations.addAll(op);
 
-	MultiMap<Integer, LogicalExpressionPlan> ep = new MultiMap<Integer, LogicalExpressionPlan>();
-	List<String> op = new ArrayList<String>();
-	int idx = 0;
-	// rearranging indices
-	for (int i = 0; i < operations.size(); i++) {
-	    if (operations.get(i) != null) {
-		op.add(idx, operations.get(i));
-	    }
-
-	    if (expressionPlans.get(i) != null) {
-		ep.put(idx, expressionPlans.get(i));
-		idx++;
-	    }
-	}
-
-	// performing deletions
-	operations.clear();
-	operations.addAll(op);
-
-	expressionPlans.clear();
-	for (Integer i : ep.keySet()) {
-	    expressionPlans.put(i, ep.get(i));
-	}
+        expressionPlans.clear();
+        for (Integer i : ep.keySet()) {
+            expressionPlans.put(i, ep.get(i));
+        }
     }
 
     // performs merging of dimensions of merged cube operation
     // Ex: CUBE(a,b), CUBE(c,d) ==> CUBE(a,b,c,d)
     // in the above example CUBE operator and dimensions are merged
     private void mergeAndMarkForDelete(ArrayList<String> operations,
-	    MultiMap<Integer, LogicalExpressionPlan> expressionPlans, int startIdx, int endIdx) {
-	// mark for delete
-	for (int i = startIdx + 1; i <= endIdx; i++) {
-	    expressionPlans.put(startIdx, expressionPlans.get(i));
-	    expressionPlans.removeKey(i);
-	    operations.remove(i);
-	    operations.add(i, null);
-	}
+        MultiMap<Integer, LogicalExpressionPlan> expressionPlans, int startIdx, int endIdx) {
+        // mark for delete
+        for (int i = startIdx + 1; i <= endIdx; i++) {
+            expressionPlans.put(startIdx, expressionPlans.get(i));
+            expressionPlans.removeKey(i);
+            operations.remove(i);
+            operations.add(i, null);
+        }
     }
 
     // This function creates logical plan for foreach and groupby operators.
@@ -555,265 +554,265 @@ public class LogicalPlanBuilder {
     // disconnects cube operator from the logical plan. It also connects foreach
     // plan with groupby plan.
     private String convertCubeToFGPlan(SourceLocation loc, LOCube op, String inputAlias,
-	    List<String> operations, MultiMap<Integer, LogicalExpressionPlan> expressionPlans)
-	    throws FrontendException {
+        List<String> operations, MultiMap<Integer, LogicalExpressionPlan> expressionPlans)
+        throws FrontendException {
+
+        LOForEach foreach = new LOForEach(plan);
+        LOCogroup groupby = new LOCogroup(plan);
+        LogicalPlan innerPlan = new LogicalPlan();
+        LogicalRelationalOperator gen = new LOGenerate(innerPlan);
+
+        injectForeachOperator(loc, op, foreach);
+
+        // Get all column attributes from the input relation.
+        // Create ProjectExpression for all columns. Based on the
+        // dimensions specified by the user, specified columns will be attached
+        // to CubeDimension/RollupDimension UDF and rest will be pushed down
+        List<Operator> inpOpers = foreach.getPlan().getPredecessors(foreach);
+        List<LogicalExpressionPlan> allExprPlan = new ArrayList<LogicalExpressionPlan>();
+        for (Operator oper : inpOpers) {
+            LogicalSchema schema = new LogicalSchema();
+            schema = ((LogicalRelationalOperator) oper).getSchema();
+
+            if (schema != null) {
+                ArrayList<LogicalFieldSchema> fields = (ArrayList<LogicalFieldSchema>) schema
+                        .getFields();
+                for (int i = 0; i < fields.size(); i++) {
+                    LogicalExpressionPlan lEplan = new LogicalExpressionPlan();
+                    new ProjectExpression(lEplan, i, fields.get(i).alias, null, gen);
+                    allExprPlan.add(lEplan);
+                }
+            }
+        }
+
+        // iterate over all operations and generate corresponding UDFs
+        for (int operIdx = 0; operIdx < operations.size(); operIdx++) {
+            List<LogicalExpressionPlan> lexpPlanList = new ArrayList<LogicalExpressionPlan>();
+            List<LogicalExpression> lexpList = new ArrayList<LogicalExpression>();
+
+            lexpPlanList.addAll(expressionPlans.get(operIdx));
+
+            // If duplicates exist in the dimension list then an exception
+            // is thrown
+            checkDuplicateProject(lexpPlanList);
+
+            // Construct ProjectExpression from the LogicalExpressionPlans
+            lexpList = getProjectExpList(lexpPlanList, gen);
+
+            for (int i = 0; i < lexpList.size(); i++) {
+                // Retain the columns that need to be pushed down.
+                // Remove the dimension columns from the input column list,
+                // as they will be attached to the CubeDimension UDF
+                for (int j = 0; j < allExprPlan.size(); j++) {
+                    LogicalExpression lexp = (LogicalExpression) allExprPlan.get(j).getSources()
+                        .get(0);
+                    String colAlias = ((ProjectExpression) lexpList.get(i)).getColAlias();
+                    if (colAlias == null) {
+                        colAlias = ((ProjectExpression) lexpList.get(i)).getFieldSchema().alias;
+                    }
+
+                    String projExpAlias = null;
+                    try {
+                        projExpAlias = ((ProjectExpression) lexp).getColAlias();
+                    } catch (ClassCastException e) {
+                        // if it is not a projection then it should be a
+                        // UserFuncExpr; ignore and continue until the next
+                        // ProjExpr is encountered
+                        continue;
+                    }
+                    if (colAlias.equals(projExpAlias) == true) {
+                        allExprPlan.remove(j);
+                    } else {
+                        // if projected exp alias is a namespaced alias
+                        if (projExpAlias.lastIndexOf(":") != -1) {
+                            projExpAlias = projExpAlias.substring(
+                                projExpAlias.lastIndexOf(":") + 1, projExpAlias.length());
+                            if (colAlias.equals(projExpAlias) == true) {
+                                allExprPlan.remove(j);
+                            }
+                        }
+                    }
+                }
+            }
+
+            // Create UDF with user specified dimensions
+            LogicalExpressionPlan uexpPlan = new LogicalExpressionPlan();
+            if (operations.get(operIdx).equals("CUBE")) {
+                new UserFuncExpression(uexpPlan, new FuncSpec(CubeDimensions.class.getName()),
+                        lexpList);
+            } else {
+                new UserFuncExpression(uexpPlan, new FuncSpec(RollupDimensions.class.getName()),
+                        lexpList);
+            }
 
-	LOForEach foreach = new LOForEach(plan);
-	LOCogroup groupby = new LOCogroup(plan);
-	LogicalPlan innerPlan = new LogicalPlan();
-	LogicalRelationalOperator gen = new LOGenerate(innerPlan);
-
-	injectForeachOperator(loc, op, foreach);
-
-	// Get all column attributes from the input relation.
-	// Create ProjectExpression for all columns. Based on the
-	// dimensions specified by the user, specified columns will be attached
-	// to CubeDimension/RollupDimension UDF and rest will be pushed down
-	List<Operator> inpOpers = foreach.getPlan().getPredecessors(foreach);
-	List<LogicalExpressionPlan> allExprPlan = new ArrayList<LogicalExpressionPlan>();
-	for (Operator oper : inpOpers) {
-	    LogicalSchema schema = new LogicalSchema();
-	    schema = ((LogicalRelationalOperator) oper).getSchema();
-
-	    if (schema != null) {
-		ArrayList<LogicalFieldSchema> fields = (ArrayList<LogicalFieldSchema>) schema
-		        .getFields();
-		for (int i = 0; i < fields.size(); i++) {
-		    LogicalExpressionPlan lEplan = new LogicalExpressionPlan();
-		    new ProjectExpression(lEplan, i, fields.get(i).alias, null, gen);
-		    allExprPlan.add(lEplan);
-		}
-	    }
-	}
-
-	// iterate over all operations and generate corresponding UDFs
-	for (int operIdx = 0; operIdx < operations.size(); operIdx++) {
-	    List<LogicalExpressionPlan> lexpPlanList = new ArrayList<LogicalExpressionPlan>();
-	    List<LogicalExpression> lexpList = new ArrayList<LogicalExpression>();
-
-	    lexpPlanList.addAll(expressionPlans.get(operIdx));
-
-	    // If duplicates exists in the dimension list then exception is
-	    // thrown
-	    checkDuplicateProject(lexpPlanList);
-
-	    // Construct ProjectExpression from the LogicalExpressionPlans
-	    lexpList = getProjectExpList(lexpPlanList, gen);
-
-	    for (int i = 0; i < lexpList.size(); i++) {
-		// Retain the columns that needs to be pushed down.
-		// Remove the dimension columns from the input column list
-		// as it will be attached to CubeDimension UDF
-		for (int j = 0; j < allExprPlan.size(); j++) {
-		    LogicalExpression lexp = (LogicalExpression) allExprPlan.get(j).getSources()
-			    .get(0);
-		    String colAlias = ((ProjectExpression) lexpList.get(i)).getColAlias();
-		    if (colAlias == null) {
-			colAlias = ((ProjectExpression) lexpList.get(i)).getFieldSchema().alias;
-		    }
-
-		    String projExpAlias = null;
-		    try {
-			projExpAlias = ((ProjectExpression) lexp).getColAlias();
-		    } catch (ClassCastException e) {
-			// if it is not projection then it should be
-			// UserFuncExpr.
-			// ignore and continue till next ProjExpr is encountered
-			continue;
-		    }
-		    if (colAlias.equals(projExpAlias) == true) {
-			allExprPlan.remove(j);
-		    } else {
-			// if projected exp alias is a namespaced alias
-			if (projExpAlias.lastIndexOf(":") != -1) {
-			    projExpAlias = projExpAlias.substring(
-				    projExpAlias.lastIndexOf(":") + 1, projExpAlias.length());
-			    if (colAlias.equals(projExpAlias) == true) {
-				allExprPlan.remove(j);
-			    }
-			}
-		    }
-		}
-	    }
-
-	    // Create UDF with user specified dimensions
-	    LogicalExpressionPlan uexpPlan = new LogicalExpressionPlan();
-	    if (operations.get(operIdx).equals("CUBE")) {
-		new UserFuncExpression(uexpPlan, new FuncSpec(CubeDimensions.class.getName()),
-		        lexpList);
-	    } else {
-		new UserFuncExpression(uexpPlan, new FuncSpec(RollupDimensions.class.getName()),
-		        lexpList);
-	    }
-
-	    for (LogicalExpressionPlan lexp : lexpPlanList) {
-		Iterator<Operator> it = lexp.getOperators();
-		while (it.hasNext()) {
-		    uexpPlan.add(it.next());
-		}
-	    }
-	    // Add the UDF to logical expression plan that contains dependent
-	    // attributes (pushed down from input columns)
-	    allExprPlan.add(operIdx, uexpPlan);
-	}
-
-	// If the operator is a UserFuncExpression then set the flatten flags.
-	List<Boolean> flattenFlags = new ArrayList<Boolean>();
-	for (int idx = 0; idx < allExprPlan.size(); idx++) {
-	    List<Operator> opers = allExprPlan.get(idx).getSources();
-	    for (Operator oper : opers) {
-		if (oper instanceof ProjectExpression) {
-		    flattenFlags.add(false);
-		} else if (oper instanceof UserFuncExpression) {
-		    flattenFlags.add(true);
-		}
-	    }
-	}
-
-	// Generate and Foreach operator creation
-	String falias = null;
-	try {
-	    buildGenerateOp(loc, (LOForEach) foreach, (LOGenerate) gen, allExprPlan,
-		    flattenFlags, getUserDefinedSchema(allExprPlan));
-	    falias = buildForeachOp(loc, (LOForEach) foreach, "cube", inputAlias, innerPlan);
-	} catch (ParserValidationException pve) {
-	    throw new FrontendException(pve);
-	}
-
-	List<Boolean> innerFlags = new ArrayList<Boolean>();
-	List<String> inpAliases = new ArrayList<String>();
-	inpAliases.add(falias);
-	innerFlags.add(false);
-
-	// Get the output schema of foreach operator and reconstruct the
-	// LogicalExpressionPlan for each dimensional attributes
-	MultiMap<Integer, LogicalExpressionPlan> exprPlansCopy = new MultiMap<Integer, LogicalExpressionPlan>();
-
-	for (LogicalExpressionPlan exp : expressionPlans.values()) {
-	    LogicalExpression lexp = (LogicalExpression) exp.getSources().get(0);
-	    LogicalExpressionPlan epGrp = new LogicalExpressionPlan();
-	    new ProjectExpression(epGrp, 0, lexp.getFieldSchema().alias, null, groupby);
-	    exprPlansCopy.put(0, epGrp);
-	}
-
-	// build group by operator
-	try {
-	    return buildGroupOp(loc, (LOCogroup) groupby, op.getAlias(), inpAliases, exprPlansCopy,
-		    GROUPTYPE.REGULAR, innerFlags, null);
-	} catch (ParserValidationException pve) {
-	    throw new FrontendException(pve);
-	}
+            for (LogicalExpressionPlan lexp : lexpPlanList) {
+                Iterator<Operator> it = lexp.getOperators();
+                while (it.hasNext()) {
+                    uexpPlan.add(it.next());
+                }
+            }
+            // Add the UDF to logical expression plan that contains dependent
+            // attributes (pushed down from input columns)
+            allExprPlan.add(operIdx, uexpPlan);
+        }
+
+        // If the operator is a UserFuncExpression then set the flatten flags.
+        List<Boolean> flattenFlags = new ArrayList<Boolean>();
+        for (int idx = 0; idx < allExprPlan.size(); idx++) {
+            List<Operator> opers = allExprPlan.get(idx).getSources();
+            for (Operator oper : opers) {
+                if (oper instanceof ProjectExpression) {
+                    flattenFlags.add(false);
+                } else if (oper instanceof UserFuncExpression) {
+                    flattenFlags.add(true);
+                }
+            }
+        }
+
+        // Generate and Foreach operator creation
+        String falias = null;
+        try {
+            buildGenerateOp(loc, (LOForEach) foreach, (LOGenerate) gen, allExprPlan,
+                flattenFlags, getUserDefinedSchema(allExprPlan));
+            falias = buildForeachOp(loc, (LOForEach) foreach, "cube", inputAlias, innerPlan);
+        } catch (ParserValidationException pve) {
+            throw new FrontendException(pve);
+        }
+
+        List<Boolean> innerFlags = new ArrayList<Boolean>();
+        List<String> inpAliases = new ArrayList<String>();
+        inpAliases.add(falias);
+        innerFlags.add(false);
+
+        // Get the output schema of foreach operator and reconstruct the
+        // LogicalExpressionPlan for each dimensional attributes
+        MultiMap<Integer, LogicalExpressionPlan> exprPlansCopy = new MultiMap<Integer, LogicalExpressionPlan>();
+
+        for (LogicalExpressionPlan exp : expressionPlans.values()) {
+            LogicalExpression lexp = (LogicalExpression) exp.getSources().get(0);
+            LogicalExpressionPlan epGrp = new LogicalExpressionPlan();
+            new ProjectExpression(epGrp, 0, lexp.getFieldSchema().alias, null, groupby);
+            exprPlansCopy.put(0, epGrp);
+        }
+
+        // build group by operator
+        try {
+            return buildGroupOp(loc, (LOCogroup) groupby, op.getAlias(), inpAliases, exprPlansCopy,
+                GROUPTYPE.REGULAR, innerFlags, null);
+        } catch (ParserValidationException pve) {
+            throw new FrontendException(pve);
+        }
     }
 
     // User-defined schema for the generate operator. If not specified, the output
     // schema of the UDF is used, which prefixes the "dimensions" namespace to all fields
     private List<LogicalSchema> getUserDefinedSchema(List<LogicalExpressionPlan> allExprPlan)
-	    throws FrontendException {
-	List<LogicalSchema> genOutputSchema = new ArrayList<LogicalSchema>();
-	for (int i = 0; i < allExprPlan.size(); i++) {
-	    List<Operator> opers = allExprPlan.get(i).getSources();
-	    for (Operator oper : opers) {
-
-		// add a logical schema for dimensions that are pushed from
-		// predecessor of cube/rollup
-		if (oper instanceof ProjectExpression) {
-		    LogicalSchema output = new LogicalSchema();
-		    output.addField(new LogicalFieldSchema(
-			    ((ProjectExpression) oper).getColAlias(), null, DataType.NULL));
-		    genOutputSchema.add(output);
-		} else if (oper instanceof UserFuncExpression) {
-		    // add logical schema for dimensions specified in
-		    // cube/rollup operator
-		    LogicalSchema output = new LogicalSchema();
-		    for (Operator op : ((UserFuncExpression) oper).getPlan().getSinks()) {
-			output.addField(new LogicalFieldSchema(((ProjectExpression) op)
-			        .getFieldSchema()));
-		    }
-		    genOutputSchema.add(output);
-		}
-
-	    }
-	}
-	return genOutputSchema;
+        throws FrontendException {
+
+        List<LogicalSchema> genOutputSchema = new ArrayList<LogicalSchema>();
+        for (int i = 0; i < allExprPlan.size(); i++) {
+            List<Operator> opers = allExprPlan.get(i).getSources();
+            for (Operator oper : opers) {
+
+                // add a logical schema for dimensions that are pushed from
+                // predecessor of cube/rollup
+                if (oper instanceof ProjectExpression) {
+                    LogicalSchema output = new LogicalSchema();
+                    output.addField(new LogicalFieldSchema(
+                        ((ProjectExpression) oper).getColAlias(), null, DataType.NULL));
+                    genOutputSchema.add(output);
+                } else if (oper instanceof UserFuncExpression) {
+                    // add logical schema for dimensions specified in
+                    // cube/rollup operator
+                    LogicalSchema output = new LogicalSchema();
+                    for (Operator op : ((UserFuncExpression) oper).getPlan().getSinks()) {
+                        output.addField(new LogicalFieldSchema(((ProjectExpression) op)
+                                .getFieldSchema()));
+                    }
+                    genOutputSchema.add(output);
+                }
+
+            }
+        }
+        return genOutputSchema;
     }
 
     private List<LogicalExpression> getProjectExpList(List<LogicalExpressionPlan> lexpPlanList,
-	    LogicalRelationalOperator lro) throws FrontendException {
+        LogicalRelationalOperator lro) throws FrontendException {
 
-	List<LogicalExpression> leList = new ArrayList<LogicalExpression>();
-	for (int i = 0; i < lexpPlanList.size(); i++) {
-	    LogicalExpressionPlan lexp = lexpPlanList.get(i);
-	    LogicalExpression lex = (LogicalExpression) lexp.getSources().get(0);
-	    Iterator<Operator> opers = lexp.getOperators();
-
-	    // ProjExpr are initially attached to CubeOp. So re-attach it to
-	    // specified operator
-	    while (opers.hasNext()) {
-		Operator oper = opers.next();
-		try {
-		    ((ProjectExpression) oper).setAttachedRelationalOp(lro);
-		} catch (ClassCastException cce) {
-		    throw new FrontendException("Column project expected.", cce);
-		}
-	    }
+        List<LogicalExpression> leList = new ArrayList<LogicalExpression>();
+        for (int i = 0; i < lexpPlanList.size(); i++) {
+            LogicalExpressionPlan lexp = lexpPlanList.get(i);
+            LogicalExpression lex = (LogicalExpression) lexp.getSources().get(0);
+            Iterator<Operator> opers = lexp.getOperators();
+
+            // ProjExpr are initially attached to CubeOp. So re-attach it to
+            // specified operator
+            while (opers.hasNext()) {
+                Operator oper = opers.next();
+                try {
+                    ((ProjectExpression) oper).setAttachedRelationalOp(lro);
+                } catch (ClassCastException cce) {
+                    throw new FrontendException("Column project expected.", cce);
+                }
+            }
 
-	    leList.add(lex);
-	}
+            leList.add(lex);
+        }
 
-	return leList;
+        return leList;
     }
 
     // This method connects the predecessors of cube operator with foreach
     // operator and disconnects the cube operator from its predecessors
     private void injectForeachOperator(SourceLocation loc, LOCube op, LOForEach foreach)
-	    throws FrontendException {
-	// connect the foreach operator with predecessors of cube operator
-	List<Operator> opers = op.getPlan().getPredecessors(op);
-	for (Operator oper : opers) {
-	    OperatorPlan foreachPlan = foreach.getPlan();
-	    foreachPlan.connect(oper, (Operator) foreach);
-	}
-
-	// disconnect the cube operator from the plan
-	opers = foreach.getPlan().getPredecessors(foreach);
-	for (Operator lop : opers) {
-	    List<Operator> succs = lop.getPlan().getSuccessors(lop);
-	    for (Operator succ : succs) {
-		if (succ instanceof LOCube) {
-		    succ.getPlan().disconnect(lop, succ);
-		    succ.getPlan().remove(succ);
-		}
-	    }
-	}
+        throws FrontendException {
+        // connect the foreach operator with predecessors of cube operator
+        List<Operator> opers = op.getPlan().getPredecessors(op);
+        for (Operator oper : opers) {
+            OperatorPlan foreachPlan = foreach.getPlan();
+            foreachPlan.connect(oper, (Operator) foreach);
+        }
+
+        // disconnect the cube operator from the plan
+        opers = foreach.getPlan().getPredecessors(foreach);
+        for (Operator lop : opers) {
+            List<Operator> succs = lop.getPlan().getSuccessors(lop);
+            for (Operator succ : succs) {
+                if (succ instanceof LOCube) {
+                    succ.getPlan().disconnect(lop, succ);
+                    succ.getPlan().remove(succ);
+                }
+            }
+        }
     }
 
     // This method checks whether the dimensions specified by the user contain duplicates
     private void checkDuplicateProject(List<LogicalExpressionPlan> lExprPlan)
-	    throws FrontendException {
+        throws FrontendException {
+
+        for (int i = 0; i < lExprPlan.size(); i++) {
+            for (int j = i + 1; j < lExprPlan.size(); j++) {
+                LogicalExpression outer = (LogicalExpression) lExprPlan.get(i).getSources().get(0);
+                LogicalExpression inner = (LogicalExpression) lExprPlan.get(j).getSources().get(0);
+                String outColAlias = ((ProjectExpression) outer).getColAlias();
+                String inColAlias = ((ProjectExpression) inner).getColAlias();
 
-	for (int i = 0; i < lExprPlan.size(); i++) {
-	    for (int j = i + 1; j < lExprPlan.size(); j++) {
-		LogicalExpression outer = (LogicalExpression) lExprPlan.get(i).getSources().get(0);
-		LogicalExpression inner = (LogicalExpression) lExprPlan.get(j).getSources().get(0);
-		String outColAlias = ((ProjectExpression) outer).getColAlias();
-		String inColAlias = ((ProjectExpression) inner).getColAlias();
-
-		if (outColAlias == null) {
-		    outColAlias = outer.getFieldSchema().alias;
-		}
-
-		if (inColAlias == null) {
-		    inColAlias = inner.getFieldSchema().alias;
-		}
-
-		if (outColAlias.equals(inColAlias) == true) {
-		    lExprPlan.remove(j);
-		    throw new FrontendException("Duplicate dimensions detected. Dimension name: "
-			    + inColAlias);
-		}
-	    }
-	}
+                if (outColAlias == null) {
+                    outColAlias = outer.getFieldSchema().alias;
+                }
+
+                if (inColAlias == null) {
+                    inColAlias = inner.getFieldSchema().alias;
+                }
 
+                if (outColAlias.equals(inColAlias) == true) {
+                    lExprPlan.remove(j);
+                    throw new FrontendException("Duplicate dimensions detected. Dimension name: "
+                        + inColAlias);
+                }
+            }
+        }
     }
 
     LOCogroup createGroupOp() {
@@ -891,39 +890,63 @@ public class LogicalPlanBuilder {
                 loFunc,
                 alias + "_" + newOperatorKey());
         op.setTmpLoad(false);
-        return buildOp( loc, op, alias, new ArrayList<String>(), null );
+
+        // Check if there's a store in the plan already that this load
+        // depends on. If so, add it as an input alias
+        List<String> inputAliases = new ArrayList<String>();
+
+        // Get the list of stores. Stores that already have successors are
+        // no longer sinks, so we can't simply ask the plan for its sinks.
+        Iterator<Operator> itr = plan.getOperators();
+        List<LOStore> stores = new ArrayList<LOStore>();
+        while (itr.hasNext()) {
+            Operator lop = itr.next();
+            if (lop instanceof LOStore) {
+                stores.add((LOStore)lop);
+            }
+        }
+
+        for (LOStore store : stores) {
+            String ifile = op.getFileSpec().getFileName();
+            String ofile = store.getFileSpec().getFileName();
+            if (ofile.equals(ifile)) {
+                inputAliases.add( store.getAlias() );
+            }
+        }
+
+        return buildOp( loc, op, alias, inputAliases, null );
     }
 
     private String buildOp(SourceLocation loc, LogicalRelationalOperator op, String alias,
-    		String inputAlias, String partitioner) throws ParserValidationException {
+            String inputAlias, String partitioner) throws ParserValidationException {
         List<String> inputAliases = new ArrayList<String>();
         if( inputAlias != null )
             inputAliases.add( inputAlias );
         return buildOp( loc, op, alias, inputAliases, partitioner );
     }
-    
-    private void checkDuplicateAliases(List<String> inputAliases, SourceLocation loc, 
-    		String opName) throws ParserValidationException {
-        //Keep the count of the number of times the same Alias is used          
-        Map<Operator, Integer> inputAliasesMap = new HashMap<Operator, Integer>(); 
+
+    private void checkDuplicateAliases(List<String> inputAliases, SourceLocation loc,
+            String opName) throws ParserValidationException {
+        //Keep the count of the number of times the same Alias is used
+        Map<Operator, Integer> inputAliasesMap = new HashMap<Operator, Integer>();
         for(String a : inputAliases) {
-            Operator pred = operators.get( a );                
-    	    if (pred == null) {                                                
-    	    	throw new ParserValidationException( intStream, loc, "Unrecognized alias " + a );
-    	    }
-    	    if (inputAliasesMap.containsKey(pred)) {
-    	        throw new ParserValidationException( intStream, loc, 
-    	                "Pig does not accept same alias as input for " + opName + 
-    	                " operation : " + a );
-    	    }
-    	    else {
+            Operator pred = operators.get( a );
+            if (pred == null) {
+                throw new ParserValidationException( intStream, loc, "Unrecognized alias " + a );
+            }
+            if (inputAliasesMap.containsKey(pred)) {
+                throw new ParserValidationException( intStream, loc,
+                        "Pig does not accept same alias as input for " + opName +
+                        " operation : " + a );
+            }
+            else {
                 inputAliasesMap.put(pred, 1);
             }
         }
     }
-    
+
     private String buildOp(SourceLocation loc, LogicalRelationalOperator op, String alias,
-    		List<String> inputAliases, String partitioner) throws ParserValidationException {
+            List<String> inputAliases, String partitioner) throws ParserValidationException {
         setAlias( op, alias );
         setPartitioner( op, partitioner );
         op.setLocation( loc );
@@ -944,7 +967,8 @@ public class LogicalPlanBuilder {
     throws ParserValidationException {
         try {
             // Load StoreFunc class from default properties if funcSpec is null. Fallback on PigStorage if StoreFunc is not specified in properties.
-            funcSpec = funcSpec == null ? new FuncSpec(pigContext.getProperties().getProperty(PigConfiguration.PIG_DEFAULT_STORE_FUNC, PigStorage.class.getName())) : funcSpec;
+            funcSpec = funcSpec == null ? new FuncSpec(pigContext.getProperties().getProperty(
+                    PigConfiguration.PIG_DEFAULT_STORE_FUNC, PigStorage.class.getName())) : funcSpec;
             StoreFuncInterface stoFunc = (StoreFuncInterface)PigContext.instantiateFuncFromSpec(funcSpec);
             String fileNameKey = inputAlias + "_" + (storeIndex++) ;
 
@@ -969,10 +993,10 @@ public class LogicalPlanBuilder {
             throw new ParserValidationException(intStream, loc, ex);
         }
     }
-    
-    String buildAssertOp(SourceLocation loc, LOFilter filterOp, 
+
+    String buildAssertOp(SourceLocation loc, LOFilter filterOp,
             String alias, String inputAlias, LogicalExpression expr, String comment,
-            LogicalExpressionPlan exprPlan) 
+            LogicalExpressionPlan exprPlan)
             throws ParserValidationException {
         try {
             filterOp.setAlias(inputAlias);
@@ -1344,7 +1368,7 @@ public class LogicalPlanBuilder {
      * @throws RecognitionException
      */
     LogicalExpression buildProjectExpr(SourceLocation loc, LogicalExpressionPlan plan, LogicalRelationalOperator op,
-    		Map<String, Operator> operators, Map<String, LogicalExpressionPlan> exprPlans, String colAlias, int col)
+            Map<String, Operator> operators, Map<String, LogicalExpressionPlan> exprPlans, String colAlias, int col)
     throws RecognitionException {
         ProjectExpression result = null;
 
@@ -1478,7 +1502,6 @@ public class LogicalPlanBuilder {
             throw new ParserValidationException(intStream, loc, e);
         }
 
-
         return proj;
     }
 
@@ -1633,7 +1656,7 @@ public class LogicalPlanBuilder {
     }
 
     static LOForEach createNestedForeachOp(LogicalPlan plan) {
-    	return new LOForEach(plan);
+        return new LOForEach(plan);
     }
 
     Operator buildNestedSortOp(SourceLocation loc, LOSort op, LogicalPlan plan, String alias, Operator inputOp,
@@ -1651,12 +1674,11 @@ public class LogicalPlanBuilder {
     }
 
     Operator buildNestedForeachOp(SourceLocation loc, LOForEach op, LogicalPlan plan, String alias,
-    		Operator inputOp, LogicalPlan innerPlan)
-    throws ParserValidationException
-    {
-    	op.setInnerPlan(innerPlan);
-    	buildNestedOp(loc, plan, op, alias, inputOp);
-    	return op;
+            Operator inputOp, LogicalPlan innerPlan)
+    throws ParserValidationException {
+        op.setInnerPlan(innerPlan);
+        buildNestedOp(loc, plan, op, alias, inputOp);
+        return op;
     }
 
     Operator buildNestedProjectOp(SourceLocation loc, LogicalPlan innerPlan, LOForEach foreach,
@@ -1740,19 +1762,19 @@ public class LogicalPlanBuilder {
         String modifier = unquote( hint );
 
         if( modifier.equalsIgnoreCase( "repl" ) || modifier.equalsIgnoreCase( "replicated" ) ) {
-                  return JOINTYPE.REPLICATED;
-          } else if( modifier.equalsIgnoreCase( "hash" ) || modifier.equalsIgnoreCase( "default" ) ) {
-                  return LOJoin.JOINTYPE.HASH;
-          } else if( modifier.equalsIgnoreCase( "skewed" ) ) {
-                 return JOINTYPE.SKEWED;
-          } else if (modifier.equalsIgnoreCase("merge")) {
-                  return JOINTYPE.MERGE;
-          } else if (modifier.equalsIgnoreCase("merge-sparse")) {
-                  return JOINTYPE.MERGESPARSE;
-          } else {
-                  throw new ParserValidationException( intStream, loc,
+            return JOINTYPE.REPLICATED;
+         } else if( modifier.equalsIgnoreCase( "hash" ) || modifier.equalsIgnoreCase( "default" ) ) {
+             return LOJoin.JOINTYPE.HASH;
+         } else if( modifier.equalsIgnoreCase( "skewed" ) ) {
+             return JOINTYPE.SKEWED;
+         } else if (modifier.equalsIgnoreCase("merge")) {
+             return JOINTYPE.MERGE;
+         } else if (modifier.equalsIgnoreCase("merge-sparse")) {
+             return JOINTYPE.MERGESPARSE;
+         } else {
+             throw new ParserValidationException( intStream, loc,
                       "Only REPL, REPLICATED, HASH, SKEWED, MERGE, and MERGE-SPARSE are vaild JOIN modifiers." );
-          }
+         }
     }
 
     void putOperator(String alias, Operator op) {

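The buildLoadOp() change is the parser-side half of the fix: a new load now
scans every LOStore already in the plan (not just the sinks) and, when a
store's output file equals the load's input file, passes that store's alias
as an input alias so the dependency edge is built up front. In batch terms, a
hypothetical script in the style of TestMultiQuery (aliases and file names
are illustrative):

    myPig.setBatchOn();
    myPig.registerQuery("A = load 'passwd' as (u:chararray);");
    myPig.registerQuery("store A into 'tmp_out';");
    // This load reads the file the store above writes; buildLoadOp now
    // records that store as an input alias, so the store-to-load edge
    // exists as soon as the statement is parsed.
    myPig.registerQuery("B = load 'tmp_out' as (u:chararray);");
    myPig.registerQuery("store B into 'final_out';");
    myPig.executeBatch();
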
Modified: pig/trunk/test/org/apache/pig/test/TestMultiQuery.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestMultiQuery.java?rev=1593741&r1=1593740&r2=1593741&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestMultiQuery.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestMultiQuery.java Sat May 10 22:44:36 2014
@@ -37,6 +37,7 @@ import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
+import org.apache.pig.builtin.mock.Storage;
 
 @RunWith(JUnit4.class)
 public class TestMultiQuery {
@@ -53,14 +54,14 @@ public class TestMultiQuery {
         props.setProperty("opt.multiquery", ""+true);
         myPig = new PigServer(ExecType.LOCAL, props);
     }
-    
+
     @AfterClass
     public static void tearDownAfterClass() throws Exception {
         Util.deleteFile(new PigContext(ExecType.LOCAL, new Properties()), "passwd");
         Util.deleteFile(new PigContext(ExecType.LOCAL, new Properties()), "passwd2");
         deleteOutputFiles();
     }
-    
+
     @Before
     public void setUp() throws Exception {
         deleteOutputFiles();
@@ -75,9 +76,9 @@ public class TestMultiQuery {
     public void testMultiQueryJiraPig1438() throws Exception {
 
         // test case: merge multiple distinct jobs
-        
+
         String INPUT_FILE = "abc";
-        
+
         String[] inputData = {
                 "1\t2\t3",
                 "2\t3\t4",
@@ -85,9 +86,9 @@ public class TestMultiQuery {
                 "2\t3\t4",
                 "1\t2\t3"
         };
-        
+
         Util.createLocalInputFile(INPUT_FILE, inputData);
-       
+
         myPig.setBatchOn();
 
         myPig.registerQuery("A = load '" + INPUT_FILE + "' as (col1:int, col2:int, col3:int);");
@@ -98,83 +99,83 @@ public class TestMultiQuery {
         myPig.registerQuery("D1 = foreach C1 generate col1, col2;");
         myPig.registerQuery("D2 = foreach C2 generate col2, col3;");
         myPig.registerQuery("store D1 into 'output1';");
-        myPig.registerQuery("store D2 into 'output2';");            
-        
+        myPig.registerQuery("store D2 into 'output2';");
+
         myPig.executeBatch();
-        
-        myPig.registerQuery("E = load 'output1' as (a:int, b:int);");            
+
+        myPig.registerQuery("E = load 'output1' as (a:int, b:int);");
         Iterator<Tuple> iter = myPig.openIterator("E");
 
         List<Tuple> expectedResults = Util.getTuplesFromConstantTupleStrings(
-                new String[] { 
+                new String[] {
                         "(1,2)",
                         "(2,3)"
                 });
-        
+
         int counter = 0;
         while (iter.hasNext()) {
-            assertEquals(expectedResults.get(counter++).toString(), iter.next().toString());      
+            assertEquals(expectedResults.get(counter++).toString(), iter.next().toString());
         }
         assertEquals(expectedResults.size(), counter);
-                    
-        myPig.registerQuery("E = load 'output2' as (a:int, b:int);");            
+
+        myPig.registerQuery("E = load 'output2' as (a:int, b:int);");
         iter = myPig.openIterator("E");
 
         expectedResults = Util.getTuplesFromConstantTupleStrings(
-                new String[] { 
+                new String[] {
                         "(2,3)",
                         "(3,4)"
                 });
-        
+
         counter = 0;
         while (iter.hasNext()) {
-            assertEquals(expectedResults.get(counter++).toString(), iter.next().toString());      
+            assertEquals(expectedResults.get(counter++).toString(), iter.next().toString());
         }
 
         assertEquals(expectedResults.size(), counter);
     }
-    
+
     @Test
     public void testMultiQueryJiraPig1252() throws Exception {
 
         // test case: Problems with secondary key optimization and multiquery
         // diamond optimization
-        
+
         String INPUT_FILE = "abc";
-        
+
         String[] inputData = {
             "1\t2\t3",
             "2\t3\t4",
             "3\t\t5",
             "5\t6\t6",
-            "6\t\t7"       
+            "6\t\t7"
         };
-        
+
         Util.createLocalInputFile(INPUT_FILE, inputData);
 
         myPig.setBatchOn();
 
         myPig.registerQuery("A = load '" + INPUT_FILE + "' as (col1, col2, col3);");
         myPig.registerQuery("B = foreach A generate (chararray) col1, " +
-        		"(chararray) ((col2 is not null) ?  " +
-        		"col2 : (col3 < 6 ? col3 : '')) as splitcond;");
+                "(chararray) ((col2 is not null) ?  " +
+                "col2 : (col3 < 6 ? col3 : '')) as splitcond;");
         myPig.registerQuery("split B into C if splitcond !=  '', D if splitcond == '';");
         myPig.registerQuery("E = group C by splitcond;");
         myPig.registerQuery("F = foreach E { orderedData = order C by $1, $0; generate flatten(orderedData); };");
-   
+
         Iterator<Tuple> iter = myPig.openIterator("F");
 
         List<Tuple> expectedResults = Util.getTuplesFromConstantTupleStrings(
-                new String[] { 
+                new String[] {
                         "(1,2)",
                         "(2,3)",
                         "(3,5)",
                         "(5,6)"
                 });
-        
+
         int counter = 0;
         while (iter.hasNext()) {
-            assertEquals(expectedResults.get(counter++).toString(), iter.next().toString());                  
+            assertEquals(expectedResults.get(counter++).toString(), iter.next().toString());
         }
 
         assertEquals(expectedResults.size(), counter);
@@ -184,22 +185,22 @@ public class TestMultiQuery {
     public void testMultiQueryJiraPig1169() throws Exception {
 
         // test case: Problems with some top N queries
-        
+
         String INPUT_FILE = "abc";
-        
+
         String[] inputData = {
                 "1\t2\t3",
                 "2\t3\t4",
                 "3\t4\t5",
                 "5\t6\t7",
-                "6\t7\t8"       
+                "6\t7\t8"
         };
-        
+
         Util.createLocalInputFile(INPUT_FILE, inputData);
-       
+
         myPig.setBatchOn();
 
-        myPig.registerQuery("A = load '" + INPUT_FILE 
+        myPig.registerQuery("A = load '" + INPUT_FILE
                 + "' as (a:int, b, c);");
         myPig.registerQuery("A1 = Order A by a desc parallel 3;");
         myPig.registerQuery("A2 = limit A1 2;");
@@ -209,105 +210,105 @@ public class TestMultiQuery {
         myPig.executeBatch();
 
         myPig.registerQuery("B = load 'output2' as (a:int, b, c);");
-        
+
         Iterator<Tuple> iter = myPig.openIterator("B");
 
         List<Tuple> expectedResults = Util.getTuplesFromConstantTupleStrings(
-                new String[] { 
+                new String[] {
                         "(6,7,8)",
                         "(5,6,7)"
                 });
-        
+
         int counter = 0;
         while (iter.hasNext()) {
-            assertEquals(expectedResults.get(counter++).toString(), iter.next().toString());      
+            assertEquals(expectedResults.get(counter++).toString(), iter.next().toString());
         }
 
         assertEquals(expectedResults.size(), counter);
     }
-  
+
     @Test
     public void testMultiQueryJiraPig1171() throws Exception {
 
         // test case: Problems with some top N queries
-        
+
         String INPUT_FILE = "abc";
-        
+
         String[] inputData = {
             "1\tapple\t3",
             "2\torange\t4",
-            "3\tpersimmon\t5"    
+            "3\tpersimmon\t5"
         };
-        
+
         Util.createLocalInputFile(INPUT_FILE, inputData);
 
         myPig.setBatchOn();
 
-        myPig.registerQuery("A = load '" + INPUT_FILE 
+        myPig.registerQuery("A = load '" + INPUT_FILE
                 + "' as (a:long, b, c);");
         myPig.registerQuery("A1 = Order A by a desc;");
         myPig.registerQuery("A2 = limit A1 1;");
-        myPig.registerQuery("B = load '" + INPUT_FILE 
+        myPig.registerQuery("B = load '" + INPUT_FILE
                 + "' as (a:long, b, c);");
         myPig.registerQuery("B1 = Order B by a desc;");
         myPig.registerQuery("B2 = limit B1 1;");
-        
+
         myPig.registerQuery("C = cross A2, B2;");
-        
+
         Iterator<Tuple> iter = myPig.openIterator("C");
 
         List<Tuple> expectedResults = Util.getTuplesFromConstantTupleStrings(
-                new String[] { 
+                new String[] {
                         "(3L,'persimmon',5,3L,'persimmon',5)"
                 });
-        
+
         int counter = 0;
         while (iter.hasNext()) {
-            assertEquals(expectedResults.get(counter++).toString(), iter.next().toString());      
+            assertEquals(expectedResults.get(counter++).toString(), iter.next().toString());
         }
 
         assertEquals(expectedResults.size(), counter);
     }
-    
+
     @Test
     public void testMultiQueryJiraPig1157() throws Exception {
 
        // test case: Successive replicated joins do not generate a MapReduce plan and fail due to OOM
-        
+
         String INPUT_FILE = "abc";
         String INPUT_FILE_1 = "abc";
-        
+
         String[] inputData = {
                 "1\tapple\t3",
                 "2\torange\t4",
-                "3\tpersimmon\t5"    
+                "3\tpersimmon\t5"
         };
-            
+
         Util.createLocalInputFile(INPUT_FILE, inputData);
 
         myPig.setBatchOn();
 
-        myPig.registerQuery("A = load '" + INPUT_FILE 
+        myPig.registerQuery("A = load '" + INPUT_FILE
                 + "' as (a:long, b, c);");
         myPig.registerQuery("A1 = FOREACH A GENERATE a;");
         myPig.registerQuery("B = GROUP A1 BY a;");
-        myPig.registerQuery("C = load '" + INPUT_FILE_1 
+        myPig.registerQuery("C = load '" + INPUT_FILE_1
                 + "' as (x:long, y);");
-        myPig.registerQuery("D = JOIN C BY x, B BY group USING 'replicated';");  
-        myPig.registerQuery("E = JOIN A BY a, D by x USING 'replicated';");  
-        
+        myPig.registerQuery("D = JOIN C BY x, B BY group USING 'replicated';");
+        myPig.registerQuery("E = JOIN A BY a, D by x USING 'replicated';");
+
         Iterator<Tuple> iter = myPig.openIterator("E");
 
         List<Tuple> expectedResults = Util.getTuplesFromConstantTupleStrings(
-                new String[] { 
+                new String[] {
                         "(1L,'apple',3,1L,'apple',1L,{(1L)})",
                         "(2L,'orange',4,2L,'orange',2L,{(2L)})",
                         "(3L,'persimmon',5,3L,'persimmon',3L,{(3L)})"
                 });
-        
+
         int counter = 0;
         while (iter.hasNext()) {
-            assertEquals(expectedResults.get(counter++).toString(), iter.next().toString());                  
+            assertEquals(expectedResults.get(counter++).toString(), iter.next().toString());
         }
 
         assertEquals(expectedResults.size(), counter);
@@ -316,7 +317,7 @@ public class TestMultiQuery {
     @Test
     public void testMultiQueryJiraPig1068() throws Exception {
 
-        // test case: COGROUP fails with 'Type mismatch in key from map: 
+        // test case: COGROUP fails with 'Type mismatch in key from map:
        // expected org.apache.pig.impl.io.NullableText, received org.apache.pig.impl.io.NullableTuple'
 
         String INPUT_FILE = "pig-1068.txt";
@@ -324,36 +325,36 @@ public class TestMultiQuery {
         String[] inputData = {
             "10\tapple\tlogin\tjar",
             "20\torange\tlogin\tbox",
-            "30\tstrawberry\tquit\tbot"    
+            "30\tstrawberry\tquit\tbot"
         };
-            
+
         Util.createLocalInputFile(INPUT_FILE, inputData);
 
         myPig.setBatchOn();
 
-        myPig.registerQuery("logs = load '" + INPUT_FILE 
+        myPig.registerQuery("logs = load '" + INPUT_FILE
                 + "' as (ts:int, id:chararray, command:chararray, comments:chararray);");
         myPig.registerQuery("SPLIT logs INTO logins IF command == 'login', all_quits IF command == 'quit';");
-        myPig.registerQuery("login_info = FOREACH logins { GENERATE id as id, comments AS client; };");  
+        myPig.registerQuery("login_info = FOREACH logins { GENERATE id as id, comments AS client; };");
         myPig.registerQuery("logins_grouped = GROUP login_info BY (id, client);");
         myPig.registerQuery("count_logins_by_client = FOREACH logins_grouped "
                 + "{ generate group.id AS id, group.client AS client, COUNT($1) AS count; };");
         myPig.registerQuery("all_quits_grouped = GROUP all_quits BY id; ");
         myPig.registerQuery("quits = FOREACH all_quits_grouped { GENERATE FLATTEN(all_quits); };");
         myPig.registerQuery("joined_session_info = COGROUP quits BY id, count_logins_by_client BY id;");
-        
+
         Iterator<Tuple> iter = myPig.openIterator("joined_session_info");
 
         List<Tuple> expectedResults = Util.getTuplesFromConstantTupleStrings(
-                new String[] { 
+                new String[] {
                         "('apple',{},{('apple','jar',1L)})",
                         "('orange',{},{('orange','box',1L)})",
                         "('strawberry',{(30,'strawberry','quit','bot')},{})"
                 });
-        
+
         int counter = 0;
         while (iter.hasNext()) {
-            assertEquals(expectedResults.get(counter++).toString(), iter.next().toString());                
+            assertEquals(expectedResults.get(counter++).toString(), iter.next().toString());
         }
 
         assertEquals(expectedResults.size(), counter);
@@ -364,22 +365,22 @@ public class TestMultiQuery {
 
         myPig.setBatchOn();
 
-        myPig.registerQuery("a = load 'passwd' " 
+        myPig.registerQuery("a = load 'passwd' "
                 + "using PigStorage(':') as (uname:chararray, passwd:chararray, uid:int, gid:int);");
         myPig.registerQuery("split a into plan1 if (uid > 5), plan2 if ( uid < 5);");
         myPig.registerQuery("b = group plan1 by uname;");
-        myPig.registerQuery("c = foreach b { tmp = order plan1 by uid desc; " 
+        myPig.registerQuery("c = foreach b { tmp = order plan1 by uid desc; "
                 + "generate flatten(group) as foo, tmp; };");
         myPig.registerQuery("d = filter c BY foo is not null;");
         myPig.registerQuery("store d into 'output1';");
         myPig.registerQuery("store plan2 into 'output2';");
-         
+
         List<ExecJob> jobs = myPig.executeBatch();
         for (ExecJob job : jobs) {
             assertTrue(job.getStatus() == ExecJob.JOB_STATUS.COMPLETED);
         }
-    }    
-    
+    }
+
     @Test
     public void testMultiQueryJiraPig1114() throws Exception {
 
@@ -390,9 +391,9 @@ public class TestMultiQuery {
         String[] inputData = {
             "10\tjar",
             "20\tbox",
-            "30\tbot"   
+            "30\tbot"
         };
-                
+
         Util.createLocalInputFile(INPUT_FILE, inputData);
 
         myPig.setBatchOn();
@@ -433,39 +434,39 @@ public class TestMultiQuery {
 
         String INPUT_FILE_1 = "set1.txt";
         String INPUT_FILE_2 = "set2.txt";
-        
+
 
         String[] inputData_1 = {
             "login\t0\tjar",
             "login\t1\tbox",
-            "quit\t0\tmany" 
+            "quit\t0\tmany"
         };
-                
+
         Util.createLocalInputFile(INPUT_FILE_1, inputData_1);
-        
+
         String[] inputData_2 = {
             "apple\tlogin\t{(login)}",
             "orange\tlogin\t{(login)}",
-            "strawberry\tquit\t{(login)}"  
+            "strawberry\tquit\t{(login)}"
         };
-                
+
         Util.createLocalInputFile(INPUT_FILE_2, inputData_2);
-            
+
         myPig.setBatchOn();
 
-        myPig.registerQuery("set1 = load '" + INPUT_FILE_1 
+        myPig.registerQuery("set1 = load '" + INPUT_FILE_1
                 + "' USING PigStorage as (a:chararray, b:chararray, c:chararray);");
         myPig.registerQuery("set2 = load '" + INPUT_FILE_2
                 + "' USING PigStorage as (a: chararray, b:chararray, c:bag{});");
-        myPig.registerQuery("set2_1 = FOREACH set2 GENERATE a as f1, b as f2, " 
+        myPig.registerQuery("set2_1 = FOREACH set2 GENERATE a as f1, b as f2, "
                 + "(chararray) 0 as f3;");
         myPig.registerQuery("set2_2 = FOREACH set2 GENERATE a as f1, "
-                + "FLATTEN((IsEmpty(c) ? null : c)) as f2, (chararray) 1 as f3;");  
+                + "FLATTEN((IsEmpty(c) ? null : c)) as f2, (chararray) 1 as f3;");
         myPig.registerQuery("all_set2 = UNION set2_1, set2_2;");
         myPig.registerQuery("joined_sets = JOIN set1 BY (a,b), all_set2 BY (f2,f3);");
-      
+
         List<Tuple> expectedResults = Util.getTuplesFromConstantTupleStrings(
-                new String[] { 
+                new String[] {
                         "('quit','0','many','strawberry','quit','0')",
                         "('login','0','jar','apple','login','0')",
                         "('login','0','jar','orange','login','0')",
@@ -473,7 +474,7 @@ public class TestMultiQuery {
                         "('login','1','box','orange','login','1')",
                         "('login','1','box','strawberry','login','1')"
                 });
-        
+
         Iterator<Tuple> iter = myPig.openIterator("joined_sets");
         int count = 0;
         while (iter.hasNext()) {
@@ -481,11 +482,11 @@ public class TestMultiQuery {
         }
         assertEquals(expectedResults.size(), count);
     }
- 
+
     @Test
     public void testMultiQueryJiraPig1060_2() throws Exception {
 
-        // test case: 
+        // test case:
 
         String INPUT_FILE = "pig-1060.txt";
 
@@ -495,9 +496,9 @@ public class TestMultiQuery {
             "orange\t3",
             "orange\t23",
             "strawberry\t10",
-            "strawberry\t34"  
+            "strawberry\t34"
         };
-                    
+
         Util.createLocalInputFile(INPUT_FILE, inputData);
 
         myPig.setBatchOn();
@@ -530,7 +531,7 @@ public class TestMultiQuery {
         for (ExecJob job : jobs) {
             assertTrue(job.getStatus() == ExecJob.JOB_STATUS.COMPLETED);
         }
-    } 
+    }
 
     @Test
     public void testMultiQueryJiraPig920_2() throws Exception {
@@ -551,27 +552,27 @@ public class TestMultiQuery {
         myPig.registerQuery("g = cogroup d by $0, e by $0;");
         myPig.registerQuery("g1 = foreach g generate group, COUNT(d), COUNT(e);");
         myPig.registerQuery("store g1 into 'output2';");
-         
+
         List<ExecJob> jobs = myPig.executeBatch();
         for (ExecJob job : jobs) {
             assertTrue(job.getStatus() == ExecJob.JOB_STATUS.COMPLETED);
         }
-    }            
-    
+    }
+
     @Test
     public void testMultiQueryJiraPig920_3() throws Exception {
 
         // test case: execution of a simple diamond query
-        
+
         String INPUT_FILE = "pig-920.txt";
-        
+
         String[] inputData = {
             "apple\tapple\t100\t10",
             "apple\tapple\t200\t20",
             "orange\torange\t100\t10",
-            "orange\torange\t300\t20"  
+            "orange\torange\t300\t20"
         };
-                        
+
         Util.createLocalInputFile(INPUT_FILE, inputData);
 
         myPig.setBatchOn();
@@ -582,22 +583,22 @@ public class TestMultiQuery {
         myPig.registerQuery("c = filter a by gid > 10;");
         myPig.registerQuery("d = cogroup c by $0, b by $0;");
         myPig.registerQuery("e = foreach d generate group, COUNT(c), COUNT(b);");
-                               
+
         Iterator<Tuple> iter = myPig.openIterator("e");
 
         List<Tuple> expectedResults = Util.getTuplesFromConstantTupleStrings(
-                new String[] { 
+                new String[] {
                         "('apple',1L,2L)",
                         "('orange',1L,1L)"
                 });
-        
+
         int counter = 0;
         while (iter.hasNext()) {
             assertEquals(expectedResults.get(counter++).toString(), iter.next().toString());
         }
 
         assertEquals(expectedResults.size(), counter);
-    }        
+    }
 
     @Test
     public void testMultiQueryJiraPig976() throws Exception {
@@ -625,7 +626,7 @@ public class TestMultiQuery {
     @Test
     public void testMultiQueryJiraPig976_2() throws Exception {
 
-        // test case: key ('group') isn't part of foreach output 
+        // test case: key ('group') isn't part of foreach output
         // and keys have different types
 
         myPig.setBatchOn();
@@ -671,7 +672,7 @@ public class TestMultiQuery {
     public void testMultiQueryJiraPig976_4() throws Exception {
 
         // test case: group by multi-cols and key ('group') isn't part of output
-     
+
         myPig.setBatchOn();
 
         myPig.registerQuery("a = load 'passwd' " +
@@ -688,7 +689,7 @@ public class TestMultiQuery {
             assertTrue(job.getStatus() == ExecJob.JOB_STATUS.COMPLETED);
         }
     }
-   
+
     @Test
     public void testMultiQueryJiraPig976_5() throws Exception {
 
@@ -718,7 +719,7 @@ public class TestMultiQuery {
         // test case: key ('group') has null values.
 
         String INPUT_FILE = "pig-976.txt";
-        
+
         String[] inputData = {
             "apple\tapple\t100\t10",
             "apple\tapple\t\t20",
@@ -726,9 +727,9 @@ public class TestMultiQuery {
             "orange\torange\t\t20",
             "strawberry\tstrawberry\t300\t10"
         };
-                            
+
         Util.createLocalInputFile(INPUT_FILE, inputData);
-    
+
         myPig.setBatchOn();
 
         myPig.registerQuery("a = load '" + INPUT_FILE +
@@ -742,11 +743,11 @@ public class TestMultiQuery {
 
         List<ExecJob> jobs = myPig.executeBatch();
         assertTrue(jobs.size() == 2);
-        
+
         for (ExecJob job : jobs) {
             assertTrue(job.getStatus() == ExecJob.JOB_STATUS.COMPLETED);
         }
-    }    
+    }
 
     @Test
     public void testMultiQueryJiraPig983_2() throws Exception {
@@ -766,15 +767,50 @@ public class TestMultiQuery {
         myPig.registerQuery("f = group d by c::gid;");
         myPig.registerQuery("f1 = foreach f generate group, SUM(d.c::uid);");
         myPig.registerQuery("store f1 into 'output2';");
-         
+
         List<ExecJob> jobs = myPig.executeBatch();
 
         assertTrue(jobs.size() == 2);
-        
+
         for (ExecJob job : jobs) {
             assertTrue(job.getStatus() == ExecJob.JOB_STATUS.COMPLETED);
         }
-    }     
+    }
+
+    /**
+       This test fails nondeterministically without the PIG-3902 patch. Sometimes
+       the loads get attached as dependencies of the appropriate stores (during
+       the postProcess() method on PigServer.Graph) and the DAG runs successfully;
+       other times the loads get attached as dependencies of the wrong stores and
+       the run fails.
+     */
+    @Test
+    public void testMultiQueryJiraPig3902() throws Exception {
+
+        // test case: PigServer creates an implicit cycle when
+        // loading from and storing to the same location with an
+        // intermediate store.
+
+        Storage.Data data = Storage.resetData(myPig);
+        data.set("inputLocation", Storage.tuple(1,2,3));
+
+        myPig.setBatchOn();
+        myPig.registerQuery("A = load 'inputLocation' using mock.Storage() as (a:int, b, c);");
+        myPig.registerQuery("A1 = order A by a desc parallel 3;");
+        myPig.registerQuery("A2 = limit A1 2;");
+        myPig.registerQuery("store A1 into 'output1' using mock.Storage();");
+        myPig.registerQuery("A3 = load 'output1' using mock.Storage()as (a:int, b, c);");
+        myPig.registerQuery("A4 = filter A3 by a > 2;");
+        myPig.registerQuery("store A4 into 'inputLocation' using mock.Storage();");
+
+        List<ExecJob> jobs = myPig.executeBatch();
+
+        assertTrue(jobs.size() == 2);
+
+        for (ExecJob job : jobs) {
+            assertTrue(job.getStatus() == ExecJob.JOB_STATUS.COMPLETED);
+        }
+    }
 
     // --------------------------------------------------------------------------
     // Helper methods

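For reference, the load/store pattern exercised by the new PIG-3902 test looks
like this as a standalone Pig Latin sketch (location names and aliases are
illustrative, not taken from the patch):

    A = LOAD 'loc' USING mock.Storage() AS (a:int);
    STORE A INTO 'mid' USING mock.Storage();
    B = LOAD 'mid' USING mock.Storage() AS (a:int);
    STORE B INTO 'loc' USING mock.Storage();  -- writes back to the original load location

As the test's comment notes, without the patch the second LOAD could get
attached as a dependency of the wrong STORE during postProcess(), turning the
logical plan into a cycle instead of a DAG.
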
Modified: pig/trunk/test/org/apache/pig/test/TestNativeMapReduce.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestNativeMapReduce.java?rev=1593741&r1=1593740&r2=1593741&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestNativeMapReduce.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestNativeMapReduce.java Sat May 10 22:44:36 2014
@@ -162,10 +162,6 @@ public class TestNativeMapReduce  {
 
             assertFalse(iter.hasNext());
 
-            // We have to manually delete intermediate mapreduce files
-            Util.deleteFile(cluster,"table_testNativeMRJobSimple_input");
-            Util.deleteFile(cluster,"table_testNativeMRJobSimple_output");
-
             // check in interactive mode
             iter = pigServer.openIterator("B");
 
@@ -259,9 +255,6 @@ public class TestNativeMapReduce  {
 
             assertFalse(iter.hasNext());
 
-            Util.deleteFile(cluster,"table_testNativeMRJobMultiStoreOnPred_input");
-            Util.deleteFile(cluster,"table_testNativeMRJobMultiStoreOnPred_output");
-
             // check in interactive mode
             iter = pigServer.openIterator("B");