Posted to commits@pig.apache.org by rd...@apache.org on 2010/08/05 00:29:47 UTC

svn commit: r982423 - in /hadoop/pig/trunk: ./ src/org/apache/pig/ src/org/apache/pig/backend/hadoop/executionengine/ src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/ src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/pla...

Author: rding
Date: Wed Aug  4 22:29:46 2010
New Revision: 982423

URL: http://svn.apache.org/viewvc?rev=982423&view=rev
Log:
PIG-1434: Allow casting relations to scalars
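
For context, this lets a relation that reduces to a single row be referenced as a scalar inside expressions by projecting one of its columns (e.g. C.count). Below is a minimal local-mode sketch of the usage the patch enables, modeled on the new TestScalarAliases tests further down; the input file 'data' and its schema are placeholders, not part of this commit:

    import java.util.Iterator;

    import org.apache.pig.ExecType;
    import org.apache.pig.PigServer;
    import org.apache.pig.data.Tuple;

    public class ScalarAliasSketch {
        public static void main(String[] args) throws Exception {
            PigServer pig = new PigServer(ExecType.LOCAL);
            pig.registerQuery("A = LOAD 'data' AS (a0: long, a1: double);");
            pig.registerQuery("B = GROUP A ALL;");
            pig.registerQuery("C = FOREACH B GENERATE COUNT(A) AS count, MAX(A.$1) AS max;");
            // C has a single row, so C.count and C.max act as scalars inside Y
            pig.registerQuery("Y = FOREACH A GENERATE (a0 * C.count), (a1 / C.max);");
            Iterator<Tuple> it = pig.openIterator("Y");
            while (it.hasNext()) {
                System.out.println(it.next());
            }
        }
    }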

Added:
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/ScalarPhyFinder.java
    hadoop/pig/trunk/src/org/apache/pig/impl/builtin/ReadScalars.java
    hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/ScalarFinder.java
    hadoop/pig/trunk/test/org/apache/pig/test/TestScalarAliases.java
Modified:
    hadoop/pig/trunk/CHANGES.txt
    hadoop/pig/trunk/src/org/apache/pig/PigServer.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java
    hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOStore.java
    hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOUserFunc.java
    hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt
    hadoop/pig/trunk/test/findbugsExcludeFile.xml

Modified: hadoop/pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=982423&r1=982422&r2=982423&view=diff
==============================================================================
--- hadoop/pig/trunk/CHANGES.txt (original)
+++ hadoop/pig/trunk/CHANGES.txt Wed Aug  4 22:29:46 2010
@@ -26,6 +26,8 @@ PIG-1249: Safe-guards against misconfigu
 
 IMPROVEMENTS
 
+PIG-1434: Allow casting relations to scalars (aniket486 via rding)
+
 PIG-1461: support union operation that merges based on column names (thejas)
 
 PIG-1517: Pig needs to support keywords in the package name (aniket486 via olgan)
@@ -112,6 +114,8 @@ PIG-1309: Map-side Cogroup (ashutoshc)
 
 BUG FIXES
 
+PIG-1527: No need to deserialize UDFContext on the client side (rding)
+
 PIG-1516: finalize in bag implementations causes pig to run out of memory in reduce (thejas)
 
 PIG-1521: explain plan does not show correct Physical operator in MR plan when POSortedDistinct, POPackageLite are used (thejas)
@@ -377,8 +381,6 @@ OPTIMIZATIONS
 
 BUG FIXES
 
-PIG-1527: No need to deserialize UDFContext on the client side (rding)
-
 PIG-1507: Full outer join fails while doing a filter on joined data (daijy)
 
 PIG-1493: Column Pruner throw exception "inconsistent pruning" (daijy)

Modified: hadoop/pig/trunk/src/org/apache/pig/PigServer.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/PigServer.java?rev=982423&r1=982422&r2=982423&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/PigServer.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/PigServer.java Wed Aug  4 22:29:46 2010
@@ -58,9 +58,11 @@ import org.apache.pig.builtin.PigStorage
 import org.apache.pig.classification.InterfaceAudience;
 import org.apache.pig.classification.InterfaceStability;
 import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataType;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.io.FileLocalizer;
+import org.apache.pig.impl.io.FileSpec;
 import org.apache.pig.impl.io.InterStorage;
 import org.apache.pig.impl.logicalLayer.FrontendException;
 import org.apache.pig.impl.logicalLayer.LOConst;
@@ -72,11 +74,13 @@ import org.apache.pig.impl.logicalLayer.
 import org.apache.pig.impl.logicalLayer.LOSplit;
 import org.apache.pig.impl.logicalLayer.LOSplitOutput;
 import org.apache.pig.impl.logicalLayer.LOStore;
+import org.apache.pig.impl.logicalLayer.LOUserFunc;
 import org.apache.pig.impl.logicalLayer.LOVisitor;
 import org.apache.pig.impl.logicalLayer.LogicalOperator;
 import org.apache.pig.impl.logicalLayer.LogicalPlan;
 import org.apache.pig.impl.logicalLayer.LogicalPlanBuilder;
 import org.apache.pig.impl.logicalLayer.PlanSetter;
+import org.apache.pig.impl.logicalLayer.ScalarFinder;
 import org.apache.pig.impl.logicalLayer.optimizer.LogicalOptimizer;
 import org.apache.pig.impl.logicalLayer.parser.ParseException;
 import org.apache.pig.impl.logicalLayer.parser.QueryParser;
@@ -84,6 +88,7 @@ import org.apache.pig.impl.logicalLayer.
 import org.apache.pig.impl.logicalLayer.validators.LogicalPlanValidationExecutor;
 import org.apache.pig.impl.plan.CompilationMessageCollector;
 import org.apache.pig.impl.plan.DependencyOrderWalker;
+import org.apache.pig.impl.plan.NodeIdGenerator;
 import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.impl.plan.PlanException;
 import org.apache.pig.impl.plan.VisitorException;
@@ -506,7 +511,7 @@ public class PigServer {
         currDAG.registerQuery(query, startLine);
     }
  
-    public LogicalPlan clonePlan(String alias) throws IOException {
+    public Graph getClonedGraph() throws IOException {
         Graph graph = currDAG.clone();
 
         if (graph == null) {
@@ -514,8 +519,7 @@ public class PigServer {
             String msg = "Cloning of plan failed.";
             throw new FrontendException(msg, errCode, PigException.BUG);
         }
-
-        return graph.getPlan(alias);
+        return graph;
     }
     
     /**
@@ -801,7 +805,8 @@ public class PigServer {
         }
 
         try {
-            LogicalPlan lp = clonePlan(id);
+            Graph g = getClonedGraph();
+            LogicalPlan lp = g.getPlan(id);
 
             // MRCompiler needs a store to be the leaf - hence
             // add a store to the plan to explain
@@ -822,7 +827,8 @@ public class PigServer {
             LogicalPlan unCompiledstorePlan = QueryParser.generateStorePlan(
                     scope, lp, filename, func, leaf, leaf.getAlias(),
                     pigContext);
-            LogicalPlan storePlan = compileLp(unCompiledstorePlan, true);
+            LogicalPlan storePlan = compileLp(unCompiledstorePlan, g, true);
+            
             return executeCompiledLogicalPlan(storePlan);
         } catch (Exception e) {
             int errCode = 1002;
@@ -1073,7 +1079,7 @@ public class PigServer {
                 currDAG.execute();
             }
             
-            plan = clonePlan(alias);
+            plan = getClonedGraph().getPlan(alias);
         } catch (IOException e) {
             //Since the original script is parsed anyway, there should not be an
             //error in this parsing. The only reason there can be an error is when
@@ -1085,7 +1091,8 @@ public class PigServer {
     }
 
     private LogicalPlan getStorePlan(String alias) throws IOException {
-        LogicalPlan lp = compileLp(alias);
+        Graph g = getClonedGraph();
+        LogicalPlan lp = g.getPlan(alias);
         
         if (!isBatchOn() || alias != null) {
             // MRCompiler needs a store to be the leaf - hence
@@ -1107,7 +1114,9 @@ public class PigServer {
             lp = QueryParser.generateStorePlan(scope, lp, "fakefile", 
                                                PigStorage.class.getName(), leaf, "fake", pigContext);
         }
-
+        
+        compileLp(lp, g, true);
+        
         return lp;
     }
     
@@ -1165,19 +1174,78 @@ public class PigServer {
         // create a clone of the logical plan and give it
         // to the operations below
         LogicalPlan lpClone;
+        Graph g;
  
         try {
-            lpClone = clonePlan(alias);
+            g = getClonedGraph();
+            lpClone = g.getPlan(alias);
         } catch (IOException e) {
             int errCode = 2001;
             String msg = "Unable to clone plan before compiling";
             throw new FrontendException(msg, errCode, PigException.BUG, e);
         }
-        return compileLp(lpClone, optimize);
+        return compileLp(lpClone, g, optimize);
+    }
+    
+    private void mergeScalars(LogicalPlan lp, Graph g) throws FrontendException {
+        // When we start processing a store we look for scalars to add stores
+        // to respective logical plans and temporary files to the attributes
+        // Here we need to find if there are duplicates so that we do not add
+        // two stores for one plan
+        ScalarFinder scalarFinder = new ScalarFinder(lp);
+        scalarFinder.visit();
+
+        Map<LOUserFunc, LogicalPlan> scalarMap = scalarFinder.getScalarMap();
+
+        try {
+            for(Map.Entry<LOUserFunc, LogicalPlan> scalarEntry: scalarMap.entrySet()) {
+                FileSpec fileSpec;
+                String alias = scalarEntry.getKey().getImplicitReferencedOperator().getAlias();
+                LogicalOperator store;
+
+                LogicalPlan referredPlan = g.getAliases().get(g.getAliasOp().get(alias));
+
+                // If referredPlan already has a store, 
+                // we just use it instead of adding one from our pocket
+                store = referredPlan.getLeaves().get(0);
+                if(store instanceof LOStore) {
+                    // use this store
+                    fileSpec = ((LOStore)store).getOutputFile();
+                }
+                else {
+                    // add new store
+                    FuncSpec funcSpec = new FuncSpec(PigStorage.class.getName() + "()");
+                    fileSpec = new FileSpec(FileLocalizer.getTemporaryPath(pigContext).toString(), funcSpec);
+                    store = new LOStore(referredPlan, new OperatorKey(scope, NodeIdGenerator.getGenerator().getNextNodeId(scope)),
+                            fileSpec, alias);
+                    referredPlan.addAsLeaf(store);
+                    ((LOStore)store).setTmpStore(true);
+                }
+                lp.mergeSharedPlan(referredPlan);
+
+                // Attach a constant operator to the ReadScalar func
+                LogicalPlan innerPlan = scalarEntry.getValue();
+                LOConst rconst = new LOConst(innerPlan, new OperatorKey(scope, NodeIdGenerator.getGenerator().getNextNodeId(scope)), fileSpec.getFileName());
+                rconst.setType(DataType.CHARARRAY);
+
+                innerPlan.add(rconst);
+                innerPlan.connect(rconst, scalarEntry.getKey());
+            }
+        } catch (IOException ioe) {
+            int errCode = 2219;
+            String msg = "Unable to process scalar in the plan";
+            throw new FrontendException(msg, errCode, PigException.BUG, ioe);
+        }
+    }
+    
+    private LogicalPlan compileLp(LogicalPlan lp, Graph g, boolean optimize) throws FrontendException {
+        mergeScalars(lp, g);
+        
+        return compileLp(lp, optimize);
     }
     
     @SuppressWarnings("unchecked")
-    private  LogicalPlan compileLp(LogicalPlan lp, boolean optimize) throws
+    private LogicalPlan compileLp(LogicalPlan lp, boolean optimize) throws
     FrontendException {
         // Set the logical plan values correctly in all the operators
         PlanSetter ps = new PlanSetter(lp);
@@ -1188,7 +1256,6 @@ public class PigServer {
         boolean isBeforeOptimizer = true;
         validate(lp, collector, isBeforeOptimizer);
         
-
         // optimize
         if (optimize && pigContext.getProperties().getProperty("pig.usenewlogicalplan", "false").equals("false")) {
             HashSet<String> optimizerRules = null;
@@ -1526,7 +1593,7 @@ public class PigServer {
             // Set the logical plan values correctly in all the operators
             PlanSetter ps = new PlanSetter(lp);
             ps.visit();
-
+            
             // The following code deals with store/load combination of 
             // intermediate files. In this case we will replace the load operator
             // with a (implicit) split operator, iff the load/store
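
Condensed, the compile path above now clones the whole Graph instead of a single plan, runs mergeScalars to pull each referenced alias's plan into the target plan (reusing an existing LOStore or adding a temporary PigStorage store, and wiring the store's file name into the ReadScalars call as a chararray LOConst), and then falls through to the pre-existing compileLp. A rough sketch of the flow inside PigServer, using only names from the diff; alias is assumed to be the alias being compiled and error handling is omitted:

    Graph g = getClonedGraph();                    // clone the whole script graph
    LogicalPlan lp = g.getPlan(alias);             // plan for the requested alias
    LogicalPlan compiled = compileLp(lp, g, true); // mergeScalars(lp, g), then the old compileLp(lp, optimize)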

Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java?rev=982423&r1=982422&r2=982423&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java Wed Aug  4 22:29:46 2010
@@ -258,6 +258,7 @@ public class HExecutionEngine {
                     new LogToPhyTranslationVisitor(plan);
                 translator.setPigContext(pigContext);
                 translator.visit();
+                translator.finish();
                 return translator.getPhysicalPlan();
             }
         } catch (Exception ve) {

Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java?rev=982423&r1=982422&r2=982423&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java Wed Aug  4 22:29:46 2010
@@ -42,6 +42,7 @@ import org.apache.pig.backend.executione
 import org.apache.pig.backend.hadoop.executionengine.HExecutionEngine;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROpPlanVisitor;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.ScalarPhyFinder;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.UDFFinder;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.ConstantExpression;
@@ -172,7 +173,7 @@ public class MRCompiler extends PhyPlanV
     private Map<PhysicalOperator,MapReduceOper> phyToMROpMap;
     
     public static final String USER_COMPARATOR_MARKER = "user.comparator.func:";
-    
+   
     public MRCompiler(PhysicalPlan plan) throws MRCompilerException {
         this(plan,null);
     }
@@ -200,6 +201,18 @@ public class MRCompiler extends PhyPlanV
         phyToMROpMap = new HashMap<PhysicalOperator, MapReduceOper>();
     }
     
+    public void connectScalars() throws PlanException {
+        List<MapReduceOper> mrOpList = new ArrayList<MapReduceOper>();
+        for(MapReduceOper mrOp: MRPlan) {
+            mrOpList.add(mrOp);
+        }
+        for(MapReduceOper mrOp: mrOpList) {
+            for(PhysicalOperator scalar: mrOp.scalars) {
+                MRPlan.connect(phyToMROpMap.get(scalar), mrOp);
+            }
+        }
+    }
+    
     public void randomizeFileLocalizer(){
         FileLocalizer.setR(new Random());
     }
@@ -250,7 +263,7 @@ public class MRCompiler extends PhyPlanV
         for (POStore store: stores) {
             compile(store);
         }
-
+        
         // I'm quite certain this is not the best way to do this.  The issue
         // is that for jobs that take multiple map reduce passes, for
         // non-sort jobs, the POLocalRearrange is being put into the reduce
@@ -656,6 +669,12 @@ public class MRCompiler extends PhyPlanV
                 if (!mergedMap.UDFs.contains(udf))
                     mergedMap.UDFs.add(udf);
             }
+            // We also need to change scalar marking
+            for(PhysicalOperator physOp: rmro.scalars) {
+                if(!mergedMap.scalars.contains(physOp)) {
+                    mergedMap.scalars.add(physOp);
+                }
+            }
             MRPlan.remove(rmro);
         }
         return ret;
@@ -676,8 +695,14 @@ public class MRCompiler extends PhyPlanV
         }
     }
 
-    private void addUDFs(PhysicalPlan plan) throws VisitorException{
+    private void processUDFs(PhysicalPlan plan) throws VisitorException{
         if(plan!=null){
+            //Process Scalars (UDF with referencedOperators)
+            ScalarPhyFinder scalarPhyFinder = new ScalarPhyFinder(plan);
+            scalarPhyFinder.visit();
+            curMROp.scalars.addAll(scalarPhyFinder.getScalars());
+            
+            //Process UDFs
             udfFinder.setPlan(plan);
             udfFinder.visit();
             curMROp.UDFs.addAll(udfFinder.getUDFs());
@@ -745,7 +770,7 @@ public class MRCompiler extends PhyPlanV
     public void visitFilter(POFilter op) throws VisitorException{
         try{
             nonBlocking(op);
-            addUDFs(op.getPlan());
+            processUDFs(op.getPlan());
             phyToMROpMap.put(op, curMROp);
         }catch(Exception e){
             int errCode = 2034;
@@ -896,7 +921,7 @@ public class MRCompiler extends PhyPlanV
             List<PhysicalPlan> plans = op.getPlans();
             if(plans!=null)
                 for(PhysicalPlan ep : plans)
-                    addUDFs(ep);
+                    processUDFs(ep);
             phyToMROpMap.put(op, curMROp);
         }catch(Exception e){
             int errCode = 2034;
@@ -944,7 +969,7 @@ public class MRCompiler extends PhyPlanV
                 List<PhysicalPlan> plans = op.getPlans();
                 if(plans!=null)
                     for(PhysicalPlan ep : plans)
-                        addUDFs(ep);
+                        processUDFs(ep);
                 phyToMROpMap.put(op, curMROp);
             }catch(Exception e){
                 int errCode = 2034;
@@ -971,7 +996,7 @@ public class MRCompiler extends PhyPlanV
             List<PhysicalPlan> plans = op.getInputPlans();
             if(plans!=null)
                 for (PhysicalPlan plan : plans) {
-                    addUDFs(plan);
+                    processUDFs(plan);
                 }
             phyToMROpMap.put(op, curMROp);
         }catch(Exception e){
@@ -1080,7 +1105,7 @@ public class MRCompiler extends PhyPlanV
                 for (List<PhysicalPlan> joinPlan : joinPlans) {
                     if(joinPlan!=null)
                         for (PhysicalPlan plan : joinPlan) {
-                            addUDFs(plan);
+                            processUDFs(plan);
                         }
                 }
             phyToMROpMap.put(op, curMROp);

Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java?rev=982423&r1=982422&r2=982423&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java Wed Aug  4 22:29:46 2010
@@ -63,7 +63,6 @@ import org.apache.pig.impl.plan.VisitorE
 import org.apache.pig.impl.plan.CompilationMessageCollector.MessageType;
 import org.apache.pig.impl.util.ConfigurationValidator;
 import org.apache.pig.impl.util.LogUtils;
-import org.apache.pig.tools.pigstats.PigProgressNotificationListener;
 import org.apache.pig.tools.pigstats.PigStats;
 import org.apache.pig.tools.pigstats.PigStatsUtil;
 import org.apache.pig.tools.pigstats.ScriptState;
@@ -398,6 +397,7 @@ public class MapReduceLauncher extends L
         MRCompiler comp = new MRCompiler(php, pc);
         comp.randomizeFileLocalizer();
         comp.compile();
+        comp.connectScalars();
         MROperPlan plan = comp.getMRPlan();
         
         //display the warning message(s) from the MRCompiler

Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java?rev=982423&r1=982422&r2=982423&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java Wed Aug  4 22:29:46 2010
@@ -24,6 +24,7 @@ import java.util.Set;
 import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.impl.plan.NodeIdGenerator;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROpPlanVisitor;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POUnion;
 import org.apache.pig.impl.plan.Operator;
@@ -105,6 +106,8 @@ public class MapReduceOper extends Opera
 
     public Set<String> UDFs;
     
+    public Set<PhysicalOperator> scalars;
+    
     // Indicates if a UDF comparator is used
     boolean isUDFComparatorUsed = false;
     
@@ -158,6 +161,7 @@ public class MapReduceOper extends Opera
         combinePlan = new PhysicalPlan();
         reducePlan = new PhysicalPlan();
         UDFs = new HashSet<String>();
+        scalars = new HashSet<PhysicalOperator>();
         nig = NodeIdGenerator.getGenerator();
         scope = k.getScope();
     }

Added: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/ScalarPhyFinder.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/ScalarPhyFinder.java?rev=982423&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/ScalarPhyFinder.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/ScalarPhyFinder.java Wed Aug  4 22:29:46 2010
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POUserFunc;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.impl.plan.DepthFirstWalker;
+import org.apache.pig.impl.plan.VisitorException;
+
+public class ScalarPhyFinder extends PhyPlanVisitor {
+
+    List<PhysicalOperator> scalars = new ArrayList<PhysicalOperator>();
+    
+    public ScalarPhyFinder(PhysicalPlan plan) {
+        super(plan, new DepthFirstWalker<PhysicalOperator, PhysicalPlan>(plan));
+    }
+
+    public List<PhysicalOperator> getScalars() {
+        return scalars;
+    }
+
+    @Override
+    public void visitUserFunc(POUserFunc userFunc) throws VisitorException {
+        if(userFunc.getReferencedOperator() != null) {
+            scalars.add(userFunc.getReferencedOperator());
+        }
+    }
+}
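
Usage mirrors processUDFs in the MRCompiler change above: walk an inner physical plan and record, on the current MR operator, every operator referenced as a scalar. A small sketch; innerPlan and curMROp are assumed from the surrounding compiler context:

    ScalarPhyFinder scalarPhyFinder = new ScalarPhyFinder(innerPlan); // innerPlan: a PhysicalPlan
    scalarPhyFinder.visit();                                          // visits each POUserFunc in the plan
    curMROp.scalars.addAll(scalarPhyFinder.getScalars());             // referenced operators, later wired by connectScalars()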

Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java?rev=982423&r1=982422&r2=982423&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java Wed Aug  4 22:29:46 2010
@@ -72,6 +72,8 @@ public class LogToPhyTranslationVisitor 
     protected NodeIdGenerator nodeGen = NodeIdGenerator.getGenerator();
 
     protected PigContext pc;
+    
+    protected Map<PhysicalOperator, LogicalOperator> scalarAliasMap = new HashMap<PhysicalOperator, LogicalOperator>();
 
     public LogToPhyTranslationVisitor(LogicalPlan plan) {
         super(plan, new DependencyOrderWalker<LogicalOperator, LogicalPlan>(
@@ -81,7 +83,12 @@ public class LogToPhyTranslationVisitor 
         currentPlan = new PhysicalPlan();
         logToPhyMap = new HashMap<LogicalOperator, PhysicalOperator>();
     }
-
+    
+    public void finish() {
+        for(PhysicalOperator physOp: scalarAliasMap.keySet()) {
+            ((POUserFunc)physOp).setReferencedOperator(logToPhyMap.get(scalarAliasMap.get(physOp)));
+        }
+    }
     public void setPigContext(PigContext pc) {
         this.pc = pc;
     }
@@ -1606,6 +1613,11 @@ public class LogToPhyTranslationVisitor 
             }
         }
         logToPhyMap.put(func, p);
+        
+        // We need to track all the scalars
+        if(func.getImplicitReferencedOperator() != null) {
+            scalarAliasMap.put(p, func.getImplicitReferencedOperator());
+        }
 
     }
 
@@ -1651,6 +1663,7 @@ public class LogToPhyTranslationVisitor 
         store.setInputSpec(loStore.getInputSpec());
         store.setSignature(loStore.getSignature());
         store.setSortInfo(loStore.getSortInfo());
+        store.setIsTmpStore(loStore.isTmpStore());
         try {
             // create a new schema for ourselves so that when
             // we serialize we are not serializing objects that

Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java?rev=982423&r1=982422&r2=982423&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java Wed Aug  4 22:29:46 2010
@@ -44,6 +44,7 @@ import org.apache.pig.data.DataType;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.data.TupleFactory;
 import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.logicalLayer.LogicalOperator;
 import org.apache.pig.impl.logicalLayer.schema.Schema;
 import org.apache.pig.impl.plan.NodeIdGenerator;
 import org.apache.pig.impl.plan.OperatorKey;
@@ -66,7 +67,15 @@ public class POUserFunc extends Expressi
     private boolean initialized = false;
     private MonitoredUDFExecutor executor = null;
     
+    private PhysicalOperator referencedOperator = null;
     
+    public PhysicalOperator getReferencedOperator() {
+        return referencedOperator;
+    }
+
+    public void setReferencedOperator(PhysicalOperator referencedOperator) {
+        this.referencedOperator = referencedOperator;
+    }
 
     public POUserFunc(OperatorKey k, int rp, List<PhysicalOperator> inp) {
         super(k, rp);

Added: hadoop/pig/trunk/src/org/apache/pig/impl/builtin/ReadScalars.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/builtin/ReadScalars.java?rev=982423&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/builtin/ReadScalars.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/builtin/ReadScalars.java Wed Aug  4 22:29:46 2010
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.impl.builtin;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+
+import org.apache.pig.EvalFunc;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.io.FileLocalizer;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+
+/**
+ * ReadScalars reads a line from a file and returns it as its value. The
+ * file is only read once, and the same line is returned over and over again.
+ * This is useful for incorporating a result from an aggregation into another
+ * evaluation.
+ */
+public class ReadScalars extends EvalFunc<String> {
+    private String scalarfilename = null;
+    private String charset = "UTF-8";
+    private String value = null;
+
+    /**
+     * Java level API
+     * 
+     * @param input
+     *            expects a single constant that is the name of the file to be
+     *            read
+     */
+    @Override
+    public String exec(Tuple input) throws IOException {
+        if (value == null) {
+            if (input == null || input.size() == 0)
+                return null;
+
+            InputStream is;
+            BufferedReader reader;
+            int pos;
+            try {
+                pos = DataType.toInteger(input.get(0));
+                scalarfilename = DataType.toString(input.get(1));
+
+                is = FileLocalizer.openDFSFile(scalarfilename);
+                reader = new BufferedReader(new InputStreamReader(is, charset));
+            } catch (Exception e) {
+                throw new ExecException("Failed to open file '" + scalarfilename
+                        + "'; error = " + e.getMessage());
+            }
+            try {
+                String line = reader.readLine();
+                if(line == null) {
+                    log.warn("No scalar field to read, returning null");
+                    return null;
+                }
+                String[] lineTok = line.split("\t");
+                if(pos > lineTok.length) {
+                    log.warn("No scalar field to read, returning null");
+                    return null;
+                }
+                value = lineTok[pos];
+                if(reader.readLine() != null) {
+                    throw new ExecException("Scalar has more than one row in the output");
+                }
+            } catch (Exception e) {
+                throw new ExecException(e.getMessage());
+            } finally {
+                reader.close();
+                is.close();
+            }
+        }
+        return value;
+    }
+
+    @Override
+    public Schema outputSchema(Schema input) {
+        return new Schema(new Schema.FieldSchema(getSchemaName("ReadScalars", input),
+                DataType.CHARARRAY));
+    }
+}
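
The UDF can also be exercised directly: it expects the column position and the name of the single-row, tab-delimited file written by the implicit store, and it caches the value after the first read. A hypothetical sketch; the file path is a placeholder and would normally be supplied by the parser/mergeScalars plumbing:

    import org.apache.pig.data.Tuple;
    import org.apache.pig.data.TupleFactory;
    import org.apache.pig.impl.builtin.ReadScalars;

    public class ReadScalarsSketch {
        public static void main(String[] args) throws Exception {
            Tuple t = TupleFactory.getInstance().newTuple(2);
            t.set(0, 1);                              // column position within the stored row
            t.set(1, "/tmp/scalar-store-output");     // placeholder path to the scalar's store output
            String value = new ReadScalars().exec(t); // cached; the same value is returned on later calls
            System.out.println(value);
        }
    }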

Modified: hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOStore.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOStore.java?rev=982423&r1=982422&r2=982423&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOStore.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOStore.java Wed Aug  4 22:29:46 2010
@@ -48,7 +48,16 @@ public class LOStore extends RelationalO
 
     transient private StoreFuncInterface mStoreFunc;
     private static Log log = LogFactory.getLog(LOStore.class);
+    private boolean isTmpStore;
     
+    public boolean isTmpStore() {
+        return isTmpStore;
+    }
+
+    public void setTmpStore(boolean isTmpStore) {
+        this.isTmpStore = isTmpStore;
+    }
+
     private SortInfo sortInfo;
 
     public SortInfo getSortInfo() {
@@ -72,6 +81,7 @@ public class LOStore extends RelationalO
         super(plan, key);
 
         mOutputFile = outputFileSpec;
+        isTmpStore = false;
 
         // TODO
         // The code below is commented out as PigContext pulls in

Modified: hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOUserFunc.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOUserFunc.java?rev=982423&r1=982422&r2=982423&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOUserFunc.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOUserFunc.java Wed Aug  4 22:29:46 2010
@@ -25,16 +25,15 @@ import org.apache.pig.FuncSpec;
 import org.apache.pig.PigException;
 import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.plan.VisitorException;
-import org.apache.pig.impl.logicalLayer.parser.ParseException;
 import org.apache.pig.impl.logicalLayer.schema.Schema;
 import org.apache.pig.data.DataType;
 import org.apache.pig.impl.plan.OperatorKey;
-import org.apache.pig.impl.plan.PlanVisitor;
 
 public class LOUserFunc extends ExpressionOperator {
     private static final long serialVersionUID = 2L;
 
     private FuncSpec mFuncSpec;
+    private LogicalOperator implicitReferencedOperator = null; 
     
     /**
      * @param plan
@@ -56,6 +55,15 @@ public class LOUserFunc extends Expressi
     public FuncSpec getFuncSpec() {
         return mFuncSpec;
     }
+    
+    public LogicalOperator getImplicitReferencedOperator() {
+        return implicitReferencedOperator;
+    }
+
+    public void setImplicitReferencedOperator(
+            LogicalOperator implicitReferencedOperator) {
+        this.implicitReferencedOperator = implicitReferencedOperator;
+    }
 
     public List<ExpressionOperator> getArguments() {
         List<LogicalOperator> preds = getPlan().getPredecessors(this);

Added: hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/ScalarFinder.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/ScalarFinder.java?rev=982423&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/ScalarFinder.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/ScalarFinder.java Wed Aug  4 22:29:46 2010
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.impl.logicalLayer;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.pig.impl.plan.DepthFirstWalker;
+import org.apache.pig.impl.plan.VisitorException;
+
+public class ScalarFinder extends LOVisitor {
+
+    Map<LOUserFunc, LogicalPlan> mScalarMap = new HashMap<LOUserFunc, LogicalPlan>();
+
+    /**
+     * @param plan
+     *            logical plan to query the presence of Scalars
+     */
+    public ScalarFinder(LogicalPlan plan) {
+        super(plan, new DepthFirstWalker<LogicalOperator, LogicalPlan>(plan));
+    }
+
+    @Override
+    protected void visit(LOUserFunc scalar) throws VisitorException {
+        if(scalar.getImplicitReferencedOperator() != null) {
+            mScalarMap.put(scalar, mCurrentWalker.getPlan());
+        }
+    }
+
+    /**
+     * @return Map of scalar operators found in the plan
+     */
+    public Map<LOUserFunc, LogicalPlan> getScalarMap() {
+        return mScalarMap;
+    }
+   
+}

Modified: hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt?rev=982423&r1=982422&r2=982423&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt (original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt Wed Aug  4 22:29:46 2010
@@ -76,6 +76,7 @@ import org.apache.pig.backend.executione
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.Partitioner;
 import org.apache.pig.impl.util.LinkedMultiMap;
+import org.apache.pig.impl.builtin.ReadScalars;
 
 public class QueryParser {
 	private PigContext pigContext;
@@ -87,6 +88,7 @@ public class QueryParser {
 	private Map<String, LogicalOperator> mapAliasOp;
 	private static Log log = LogFactory.getLog(QueryParser.class);
 	private boolean bracketed = false;
+	private boolean scalarFound = false;
     private Map<String, String> fileNameMap;
 	
 	private long getNextId() {
@@ -3333,6 +3335,8 @@ ExpressionOperator BaseEvalSpec(Schema o
 	Schema subSchema = null; 
 	Token t; 
     String key;
+    String nextTok;
+    scalarFound = false;
 	log.trace("Entering BaseEvalSpec");
 }
 {
@@ -3352,12 +3356,20 @@ ExpressionOperator BaseEvalSpec(Schema o
             if(null != fs) {
 			    subSchema = fs.schema; 
             }
+            if(item instanceof LOUserFunc && ((LOUserFunc)item).getImplicitReferencedOperator() != null) {
+            	subSchema = ((LOUserFunc)item).getImplicitReferencedOperator().getSchema();
+			}		
 			log.debug("subSchema: " + subSchema);
 		}
 		( 
 			"." projection = BracketedSimpleProj(subSchema,lp,item) 
 			{
-				assertAtomic(item,false); 
+				if(item instanceof LOUserFunc && ((LOUserFunc)item).getImplicitReferencedOperator() != null) {
+					assertAtomic(item,true);
+				}
+				else {
+					assertAtomic(item,false);
+				}
 				item = projection;
 			}
 		)
@@ -3374,7 +3386,15 @@ ExpressionOperator BaseEvalSpec(Schema o
 	)*	
 	)
 	)
-	{log.trace("Exiting BaseEvalSpec"); return item;}
+	{
+		log.trace("Exiting BaseEvalSpec"); 
+		// Validate Scalar here - scalarFound needs to be false here
+		// We set scalarFound when we find it and then we reset it when we resolve projection
+		if(scalarFound) {
+			throw new ParseException("Scalars can be only used with projections");
+		}
+		return item;
+	}
 }
 
 
@@ -4165,6 +4185,36 @@ ExpressionOperator DollarVar(Schema over
                 throw new ParseException("Out of bound access. Trying to access non-existent column: " + colNum + ". Schema " + over + " has " + over.size() + " column(s).");
             }
         }
+        // Scalar Projections
+		if(bracketed && eOp instanceof LOUserFunc && ((LOUserFunc)eOp).getImplicitReferencedOperator() != null) {
+			try {
+				scalarFound = false;
+				// Projections decides type of scalar, we need to add a Cast operator to track that
+				LOCast loCast = null;
+				if(over != null) {
+					if(over.getField(colNum).type != DataType.BYTEARRAY) {
+						loCast = new LOCast(lp, new OperatorKey(scope, getNextId()), over.getField(colNum).type);
+					} 
+				} 
+				if(loCast == null){
+					// Default type is chararray not bytearray for ReadScalar, as it reads string from the file
+					loCast = new LOCast(lp, new OperatorKey(scope, getNextId()), DataType.CHARARRAY);
+				}
+				lp.add(loCast);
+				lp.connect(eOp, loCast);
+				
+				// We also need to attach LOConst to the userfunc 
+				// so that it can read that projection number in ReadScalars UDF
+				LOConst rconst = new LOConst(lp, new OperatorKey(scope, getNextId()), colNum);
+                rconst.setType(DataType.INTEGER);
+                lp.add(rconst);
+                lp.connect(rconst, eOp);
+                
+                return loCast;
+			} catch(Exception e) {
+				throw new ParseException("Invalid field in scalar" + e);
+			}
+		}
 		ExpressionOperator project = new LOProject(lp, new OperatorKey(scope, getNextId()), eOp, undollar(t1.image));
 		try {
 			log.debug("eOp: " + eOp.getClass().getName() + " " + eOp);
@@ -4247,14 +4297,39 @@ ExpressionOperator AliasFieldOrSpec(Sche
 			log.debug("item == null");
 			if (null == over) log.debug("over is null");
             try {
-			    if ( over == null ||  (i = over.getPosition(t1.image)) == -1) {
-				    log.debug("Invalid alias: " + t1.image + " in " + over);
-				    if(null != over) {
-					    log.debug("Printing out the aliases in the schema");
-					    over.printAliases();
-				    }
-				    throw new ParseException("Invalid alias: " + t1.image + " in " + over);
-			    }
+            	if ( over == null ||  (i = over.getPosition(t1.image)) == -1) {
+            		// We also support Scalar aliases, so we make a check
+            		// whether the alias is a scalar alias and construct 
+            		// the plan to use this scalar alias
+
+            		// We achieve this by storing the scalar alias into a temp directory
+            		// and then we retrieve it using ReadScalars UDF. But, it should be noted 
+            		// that this adds an implicit dependency on the scalar alias, as we need
+            		// it to be stored before UDF tries to read it.
+
+            		LogicalOperator aliasOp = getOp(t1.image);
+            		// for bracketed expression we do not resolve scalar
+            		if(!bracketed && aliasOp != null) {
+            			scalarFound = true;
+            			Schema scalarSchema = aliasOp.getSchema();
+            			log.debug("Scalar alias: " + t1.image + " found");
+            			
+            			// We check whether we already have a scalar logical operator in the plan already
+            			item = new LOUserFunc(lp, new OperatorKey(scope, getNextId()), 
+            					new FuncSpec(ReadScalars.class.getName()), DataType.CHARARRAY);
+            			((LOUserFunc)item).setImplicitReferencedOperator(aliasOp);
+            			
+            			lp.add(item);
+            			log.trace("Exiting AliasFieldOrSpec");
+            			return item;
+            		}
+            		log.debug("Invalid alias: " + t1.image + " in " + over);
+            		if(null != over) {
+            			log.debug("Printing out the aliases in the schema");
+            			over.printAliases();
+            		}
+            		throw new ParseException("Invalid alias: " + t1.image + " in " + over);
+            	}
             } catch (FrontendException fee) {
             	ParseException pe = new ParseException(fee.getMessage());
             	pe.initCause(fee);
@@ -4264,7 +4339,34 @@ ExpressionOperator AliasFieldOrSpec(Sche
 			if(null != over) {
 				log.debug("Printing out the aliases in the schema");
 				over.printAliases();
-			}	
+			}
+			// Scalar Projections
+			if(bracketed && eOp instanceof LOUserFunc && ((LOUserFunc)eOp).getImplicitReferencedOperator() != null) {
+				try {
+					scalarFound = false;
+					// Projections decides type of scalar, we need to add a Cast operator to track that
+					LOCast loCast;
+					if(over.getField(i).type != DataType.BYTEARRAY) {
+						loCast = new LOCast(lp, new OperatorKey(scope, getNextId()), over.getField(i).type);
+					} else {
+						// Default type is chararray not bytearray for ReadScalar, as it reads string from the file
+						loCast = new LOCast(lp, new OperatorKey(scope, getNextId()), DataType.CHARARRAY);
+					}
+					lp.add(loCast);
+					lp.connect(eOp, loCast);
+					
+					// We also need to attach LOConst to the userfunc 
+					// so that it can read that projection number in ReadScalars UDF
+					LOConst rconst = new LOConst(lp, new OperatorKey(scope, getNextId()), i);
+	                rconst.setType(DataType.INTEGER);
+	                lp.add(rconst);
+	                lp.connect(rconst, eOp);
+	                
+	                return loCast;
+				} catch(Exception e) {
+					throw new ParseException("Invalid field in scalar" + e);
+				}
+			}
 			item = new LOProject(lp, new OperatorKey(scope, getNextId()), eOp, i);
 			item.setAlias(t1.image);
 			try {
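
The net effect of the grammar changes: a relation alias used in an expression resolves to a ReadScalars LOUserFunc; a projection on it (by field name or by $n) attaches the column number as an integer LOConst and adds a cast to the field's declared type (chararray when no type is known); and a scalar alias used without any projection is rejected at parse time. A rough illustration of statements the new parser should accept or refuse; pig is a PigServer and A, B, C are assumed to be defined as in the tests below:

    // A = LOAD ... AS (a0: long, a1: double);  B = GROUP A ALL;
    // C = FOREACH B GENERATE COUNT(A) AS count, MAX(A.$1) AS max;
    pig.registerQuery("Y1 = FOREACH A GENERATE a0 * C.count;"); // accepted: named projection on the scalar alias
    pig.registerQuery("Y2 = FOREACH A GENERATE a1 / C.$1;");    // accepted: positional projection on the scalar alias
    try {
        pig.registerQuery("Y3 = FOREACH A GENERATE a0 * C;");   // no projection on the scalar alias
    } catch (IOException e) {
        // expected to fail with "Scalars can be only used with projections"
    }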

Modified: hadoop/pig/trunk/test/findbugsExcludeFile.xml
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/findbugsExcludeFile.xml?rev=982423&r1=982422&r2=982423&view=diff
==============================================================================
--- hadoop/pig/trunk/test/findbugsExcludeFile.xml (original)
+++ hadoop/pig/trunk/test/findbugsExcludeFile.xml Wed Aug  4 22:29:46 2010
@@ -412,4 +412,9 @@
         <Method name = "init" />
         <Bug pattern="OBL_UNSATISFIED_OBLIGATION" />
     </Match>
+	<Match>
+		<Class name = "org.apache.pig.impl.builtin.ReadScalars" />
+		<Method name = "exec" />
+		<Bug pattern= "RV_DONT_JUST_NULL_CHECK_READLINE" />
+	</Match>
 </FindBugsFilter>

Added: hadoop/pig/trunk/test/org/apache/pig/test/TestScalarAliases.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestScalarAliases.java?rev=982423&view=auto
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestScalarAliases.java (added)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestScalarAliases.java Wed Aug  4 22:29:46 2010
@@ -0,0 +1,515 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.test;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Random;
+
+import junit.framework.TestCase;
+
+import org.apache.pig.ExecType;
+import org.apache.pig.PigServer;
+import org.apache.pig.data.BagFactory;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import org.apache.pig.impl.io.FileLocalizer;
+import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.impl.logicalLayer.validators.TypeCheckerException;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestScalarAliases extends TestCase {
+    static MiniCluster cluster = MiniCluster.buildCluster();
+    private PigServer pigServer;
+
+    TupleFactory mTf = TupleFactory.getInstance();
+    BagFactory mBf = BagFactory.getInstance();
+
+    @Before
+    @Override
+    public void setUp() throws Exception{
+        FileLocalizer.setR(new Random());
+        pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+    }
+
+    @AfterClass
+    public static void oneTimeTearDown() throws Exception {
+        cluster.shutDown();
+    }
+
+    // See PIG-1434
+    @Test
+    public void testScalarAliasesBatchNobatch() throws Exception{
+        String[] input = {
+                "1\t5",
+                "2\t10",
+                "3\t20"
+        };
+
+        // Test the use of scalars in expressions
+        Util.createInputFile(cluster, "table_testScalarAliasesBatch", input);
+        // Test in script mode
+        pigServer.setBatchOn();
+        pigServer.registerQuery("A = LOAD 'table_testScalarAliasesBatch' as (a0: long, a1: double);");
+        pigServer.registerQuery("B = group A all;");
+        pigServer.registerQuery("C = foreach B generate COUNT(A) as count, MAX(A.$1) as max;");
+        pigServer.registerQuery("Y = foreach A generate (a0 * C.count), (a1 / C.max);");
+        pigServer.registerQuery("Store Y into 'table_testScalarAliasesDir';");
+        pigServer.executeBatch();
+        // Check output
+        pigServer.registerQuery("Z = LOAD 'table_testScalarAliasesDir' as (a0: int, a1: double);");
+
+        Iterator<Tuple> iter;
+        Tuple t;
+        iter = pigServer.openIterator("Z");
+
+        t = iter.next();
+        assertTrue(t.toString().equals("(3,0.25)"));
+
+        t = iter.next();
+        assertTrue(t.toString().equals("(6,0.5)"));
+
+        t = iter.next();
+        assertTrue(t.toString().equals("(9,1.0)"));
+
+        assertFalse(iter.hasNext());
+
+        iter = pigServer.openIterator("Y");
+
+        t = iter.next();
+        assertTrue(t.toString().equals("(3,0.25)"));
+
+        t = iter.next();
+        assertTrue(t.toString().equals("(6,0.5)"));
+
+        t = iter.next();
+        assertTrue(t.toString().equals("(9,1.0)"));
+
+        assertFalse(iter.hasNext());
+    }
+
+    // See PIG-1434
+    @Test
+    public void testUseScalarMultipleTimes() throws Exception{
+        String[] input = {
+                "1\t5",
+                "2\t10",
+                "3\t20"
+        };
+
+        // Test the use of scalars in expressions
+        Util.createInputFile(cluster, "table_testUseScalarMultipleTimes", input);
+        pigServer.setBatchOn();
+        pigServer.registerQuery("A = LOAD 'table_testUseScalarMultipleTimes' as (a0: long, a1: double);");
+        pigServer.registerQuery("B = group A all;");
+        pigServer.registerQuery("C = foreach B generate COUNT(A) as count, MAX(A.$1) as max;");
+        pigServer.registerQuery("Y = foreach A generate (a0 * C.count), (a1 / C.max);");
+        pigServer.registerQuery("Store Y into 'table_testUseScalarMultipleTimesOutY';");
+        pigServer.registerQuery("Z = foreach A generate (a1 + C.count), (a0 * C.max);");
+        pigServer.registerQuery("Store Z into 'table_testUseScalarMultipleTimesOutZ';");
+        // Test Multiquery store
+        pigServer.executeBatch();
+        
+        // Check output
+        pigServer.registerQuery("M = LOAD 'table_testUseScalarMultipleTimesOutY' as (a0: int, a1: double);");
+
+        Iterator<Tuple> iter;
+        Tuple t;
+        iter = pigServer.openIterator("M");
+
+        t = iter.next();
+        assertTrue(t.toString().equals("(3,0.25)"));
+
+        t = iter.next();
+        assertTrue(t.toString().equals("(6,0.5)"));
+
+        t = iter.next();
+        assertTrue(t.toString().equals("(9,1.0)"));
+
+        assertFalse(iter.hasNext());
+        
+        // Check output
+        pigServer.registerQuery("N = LOAD 'table_testUseScalarMultipleTimesOutZ' as (a0: double, a1: double);");
+        
+        iter = pigServer.openIterator("N");
+
+        t = iter.next();
+        assertTrue(t.toString().equals("(8.0,20.0)"));
+
+        t = iter.next();
+        assertTrue(t.toString().equals("(13.0,40.0)"));
+
+        t = iter.next();
+        assertTrue(t.toString().equals("(23.0,60.0)"));
+
+        assertFalse(iter.hasNext());
+        
+        // Non batch mode
+        iter = pigServer.openIterator("Y");
+
+        t = iter.next();
+        assertTrue(t.toString().equals("(3,0.25)"));
+
+        t = iter.next();
+        assertTrue(t.toString().equals("(6,0.5)"));
+
+        t = iter.next();
+        assertTrue(t.toString().equals("(9,1.0)"));
+
+        assertFalse(iter.hasNext());
+
+        // Check in non-batch mode        
+        iter = pigServer.openIterator("Z");
+
+        t = iter.next();
+        assertTrue(t.toString().equals("(8.0,20.0)"));
+
+        t = iter.next();
+        assertTrue(t.toString().equals("(13.0,40.0)"));
+
+        t = iter.next();
+        assertTrue(t.toString().equals("(23.0,60.0)"));
+
+        assertFalse(iter.hasNext());
+    }
+
+    // See PIG-1434
+    @Test
+    public void testScalarWithNoSchema() throws Exception{
+        String[] scalarInput = {
+                "1\t5"
+        };
+        String[] input = {
+                "1\t5",
+                "2\t10",
+                "3\t20"
+        };
+        Util.createInputFile(cluster, "table_testScalarWithNoSchema", input);
+        Util.createInputFile(cluster, "table_testScalarWithNoSchemaScalar", scalarInput);
+        // Load A as a scalar
+        pigServer.registerQuery("A = LOAD 'table_testScalarWithNoSchema';");
+        pigServer.registerQuery("scalar = LOAD 'table_testScalarWithNoSchemaScalar' as (count, total);");
+        pigServer.registerQuery("B = foreach A generate 5 / scalar.total;");
+
+        try {
+            pigServer.openIterator("B");
+            fail("We do not support no schema scalar without a cast");
+        } catch (FrontendException te) {
+            // In alias B, incompatible types in Division Operator left hand side:int right hand side:chararray
+            assertTrue(((TypeCheckerException)te.getCause().getCause().getCause()).getErrorCode() == 1039);
+        }
+
+        pigServer.registerQuery("C = foreach A generate 5 / (int)scalar.total;");
+
+        Iterator<Tuple> iter = pigServer.openIterator("C");
+
+        Tuple t = iter.next();
+        assertTrue(t.get(0).toString().equals("1"));
+
+        t = iter.next();
+        assertTrue(t.get(0).toString().equals("1"));
+
+        t = iter.next();
+        assertTrue(t.get(0).toString().equals("1"));
+
+        assertFalse(iter.hasNext());
+
+    }
+
+    // See PIG-1434
+    @Test
+    public void testScalarWithTwoBranches() throws Exception{
+        String[] inputA = {
+                "1\t5",
+                "2\t10",
+                "3\t20"
+        };
+
+        String[] inputX = {
+                "pig",
+                "hadoop",
+                "rocks"
+        };
+
+        // Test the use of scalars in expressions
+        Util.createInputFile(cluster, "testScalarWithTwoBranchesA", inputA);
+        Util.createInputFile(cluster, "testScalarWithTwoBranchesX", inputX);
+        // Test in script mode
+        pigServer.setBatchOn();
+        pigServer.registerQuery("A = LOAD 'testScalarWithTwoBranchesA' as (a0: long, a1: double);");
+        pigServer.registerQuery("B = group A all;");
+        pigServer.registerQuery("C = foreach B generate COUNT(A) as count, MAX(A.$1) as max;");
+        pigServer.registerQuery("X = LOAD 'testScalarWithTwoBranchesX' as (names: chararray);");
+        pigServer.registerQuery("Y = foreach X generate names, C.max;");
+        pigServer.registerQuery("Store Y into 'testScalarWithTwoBranchesDir';");
+        pigServer.executeBatch();
+        // Check output
+        pigServer.registerQuery("Z = LOAD 'testScalarWithTwoBranchesDir' as (a0: chararray, a1: double);");
+
+        Iterator<Tuple> iter = pigServer.openIterator("Z");
+
+        Tuple t = iter.next();
+        assertTrue(t.toString().equals("(pig,20.0)"));
+
+        t = iter.next();
+        assertTrue(t.toString().equals("(hadoop,20.0)"));
+
+        t = iter.next();
+        assertTrue(t.toString().equals("(rocks,20.0)"));
+
+        assertFalse(iter.hasNext());
+
+        // Check in non-batch mode        
+        iter = pigServer.openIterator("Y");
+
+        t = iter.next();
+        assertTrue(t.toString().equals("(pig,20.0)"));
+
+        t = iter.next();
+        assertTrue(t.toString().equals("(hadoop,20.0)"));
+
+        t = iter.next();
+        assertTrue(t.toString().equals("(rocks,20.0)"));
+
+        assertFalse(iter.hasNext());
+    }
+
+    // See PIG-1434
+    @Test
+    public void testFilteredScalarDollarProj() throws Exception{
+        String[] input = {
+                "1\t5",
+                "2\t10",
+                "3\t20"
+        };
+
+        // Test the use of scalars in expressions
+        Util.createInputFile(cluster, "table_testFilteredScalarDollarProj", input);
+        // Test in script mode
+        pigServer.setBatchOn();
+        pigServer.registerQuery("A = LOAD 'table_testFilteredScalarDollarProj' as (a0: long, a1: double);");
+        pigServer.registerQuery("B = filter A by $1 < 8;");
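+        // B contains the single row (1,5.0), so B.$0 and B.$1 act as the scalars 1 and 5.0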
+        pigServer.registerQuery("Y = foreach A generate (a0 * B.$0), (a1 / B.$1);");
+        pigServer.registerQuery("Store Y into 'table_testFilteredScalarDollarProjDir';");
+        pigServer.executeBatch();
+        // Check output
+        pigServer.registerQuery("Z = LOAD 'table_testFilteredScalarDollarProjDir' as (a0: int, a1: double);");
+
+        Iterator<Tuple> iter = pigServer.openIterator("Z");
+
+        Tuple t = iter.next();
+        assertTrue(t.toString().equals("(1,1.0)"));
+
+        t = iter.next();
+        assertTrue(t.toString().equals("(2,2.0)"));
+
+        t = iter.next();
+        assertTrue(t.toString().equals("(3,4.0)"));
+
+        assertFalse(iter.hasNext());
+
+        // Check in non-batch mode        
+        iter = pigServer.openIterator("Y");
+
+        t = iter.next();
+        assertTrue(t.toString().equals("(1,1.0)"));
+
+        t = iter.next();
+        assertTrue(t.toString().equals("(2,2.0)"));
+
+        t = iter.next();
+        assertTrue(t.toString().equals("(3,4.0)"));
+
+        assertFalse(iter.hasNext());
+
+    }
+
+    // See PIG-1434
+    @Test
+    public void testScalarWithNoSchemaDollarProj() throws Exception{
+        String[] scalarInput = {
+                "1\t5"
+        };
+        String[] input = {
+                "1\t5",
+                "2\t10",
+                "3\t20"
+        };
+        Util.createInputFile(cluster, "table_testScalarWithNoSchemaDollarProj", input);
+        Util.createInputFile(cluster, "table_testScalarWithNoSchemaDollarProjScalar", scalarInput);
+        // Load the input and a single-row relation that will be used as a scalar
+        pigServer.registerQuery("A = LOAD 'table_testScalarWithNoSchemaDollarProj';");
+        pigServer.registerQuery("scalar = LOAD 'table_testScalarWithNoSchemaDollarProjScalar';");
+        pigServer.registerQuery("B = foreach A generate 5 / scalar.$1;");
+
+        try {
+            pigServer.openIterator("B");
+            fail("A scalar with no schema should not be usable without a cast");
+        } catch (FrontendException te) {
+            // In alias B, incompatible types in Division Operator left hand side:int right hand side:chararray
+            assertTrue(((TypeCheckerException)te.getCause().getCause().getCause()).getErrorCode() == 1039);
+        }
+
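+        // With an explicit cast the positional projection works: scalar.$1 is 5, so each row yields 5 / 5 = 1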
+        pigServer.registerQuery("C = foreach A generate 5 / (int)scalar.$1;");
+
+        Iterator<Tuple> iter = pigServer.openIterator("C");
+
+        Tuple t = iter.next();
+        assertTrue(t.get(0).toString().equals("1"));
+
+        t = iter.next();
+        assertTrue(t.get(0).toString().equals("1"));
+
+        t = iter.next();
+        assertTrue(t.get(0).toString().equals("1"));
+
+        assertFalse(iter.hasNext());
+
+    }
+
+    // See PIG-1434
+    @Test
+    public void testScalarAliasesJoinClause() throws Exception{
+        String[] inputA = {
+                "1\t5",
+                "2\t10",
+                "3\t20"
+        };
+        String[] inputB = {
+                "Total3\tthree",
+                "Total2\ttwo",
+                "Total1\tone"
+        };
+
+        // Test the use of scalars in expressions
+        Util.createInputFile(cluster, "table_testScalarAliasesJoinClauseA", inputA);
+        Util.createInputFile(cluster, "table_testScalarAliasesJoinClauseB", inputB);
+        // Test in non-batch mode
+        pigServer.registerQuery("A = LOAD 'table_testScalarAliasesJoinClauseA' as (a0, a1);");
+        pigServer.registerQuery("G = group A all;");
+        pigServer.registerQuery("C = foreach G generate COUNT(A) as count;");
+
+        pigServer.registerQuery("B = LOAD 'table_testScalarAliasesJoinClauseB' as (b0:chararray, b1:chararray);");
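+        // COUNT(A) is 3, so the join key for every row of A is CONCAT('Total', '3') = 'Total3'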
+        pigServer.registerQuery("Y = join A by CONCAT('Total', (chararray)C.count), B by $0;");
+
+        Iterator<Tuple> iter = pigServer.openIterator("Y");
+
+        Tuple t = iter.next();
+        assertTrue(t.toString().equals("(1,5,Total3,three)"));
+
+        t = iter.next();
+        assertTrue(t.toString().equals("(2,10,Total3,three)"));
+
+        t = iter.next();
+        assertTrue(t.toString().equals("(3,20,Total3,three)"));
+
+        assertFalse(iter.hasNext());
+    }
+
+    // See PIG-1434
+    @Test
+    public void testScalarAliasesFilterClause() throws Exception{
+        String[] input = {
+                "1\t5",
+                "2\t10",
+                "3\t20",
+                "4\t12",
+                "5\t8"
+        };
+
+        // Test the use of scalars in expressions
+        Util.createInputFile(cluster, "table_testScalarAliasesFilterClause", input);
+        // Test in non-batch mode
+        pigServer.registerQuery("A = LOAD 'table_testScalarAliasesFilterClause' as (a0, a1);");
+        pigServer.registerQuery("G = group A all;");
+        pigServer.registerQuery("C = foreach G generate AVG(A.$1) as average;");
+
+        pigServer.registerQuery("Y = filter A by a1 > C.average;");
+
+        Iterator<Tuple> iter = pigServer.openIterator("Y");
+
+        // The average of a1 is 11.0, so only (3,20) and (4,12) pass the filter
+        Tuple t = iter.next();
+        assertTrue(t.toString().equals("(3,20)"));
+
+        t = iter.next();
+        assertTrue(t.toString().equals("(4,12)"));
+
+        assertFalse(iter.hasNext());
+    }
+
+    // See PIG-1434
+    @Test
+    public void testScalarAliasesSplitClause() throws Exception{
+        String[] input = {
+                "1\t5",
+                "2\t10",
+                "3\t20"
+        };
+
+        // Test the use of scalars in expressions
+        Util.createInputFile(cluster, "table_testScalarAliasesSplitClause", input);
+        // Test in script mode
+        pigServer.setBatchOn();
+        pigServer.registerQuery("A = LOAD 'table_testScalarAliasesSplitClause' as (a0: long, a1: double);");
+        pigServer.registerQuery("B = group A all;");
+        pigServer.registerQuery("C = foreach B generate COUNT(A) as count;");
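+        // C.count is 3, so the Y branch keeps rows with a1 > 6, i.e. (2,10.0) and (3,20.0)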
+        pigServer.registerQuery("split A into Y if (2 * C.count) < a1, X if a1 == 5;");
+        pigServer.registerQuery("Store Y into 'table_testScalarAliasesSplitClauseDir';");
+        pigServer.executeBatch();
+        // Check output
+        pigServer.registerQuery("Z = LOAD 'table_testScalarAliasesSplitClauseDir' as (a0: int, a1: double);");
+
+        Iterator<Tuple> iter = pigServer.openIterator("Z");
+
+        // Y gets only last 2 elements
+        Tuple t = iter.next();
+        assertTrue(t.toString().equals("(2,10.0)"));
+
+        t = iter.next();
+        assertTrue(t.toString().equals("(3,20.0)"));
+
+        assertFalse(iter.hasNext());
+    }
+
+    // See PIG-1434
+    @Test
+    public void testScalarAliasesGrammarNegative() throws Exception{
+        String[] input = {
+                "1\t5",
+                "2\t10",
+                "3\t20"
+        };
+
+        Util.createInputFile(cluster, "table_testScalarAliasesGrammar", input);
+        pigServer.registerQuery("A = LOAD 'table_testScalarAliasesGrammar' as (a0: long, a1: double);");
+        pigServer.registerQuery("B = group A all;");
+        pigServer.registerQuery("C = foreach B generate COUNT(A);");
+
+        try {
+            // Only projections of C are supported 
+            pigServer.registerQuery("Y = foreach A generate C;");
+            // Control should not reach here
+            fail("Only projections of scalars should be supported");
+        } catch (IOException pe){
+            assertTrue(pe.getCause().getMessage().equalsIgnoreCase("Scalars can be only used with projections"));
+        }
+    }
+}