You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by rd...@apache.org on 2010/08/05 00:29:47 UTC
svn commit: r982423 - in /hadoop/pig/trunk: ./ src/org/apache/pig/
src/org/apache/pig/backend/hadoop/executionengine/
src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/
src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/pla...
Author: rding
Date: Wed Aug 4 22:29:46 2010
New Revision: 982423
URL: http://svn.apache.org/viewvc?rev=982423&view=rev
Log:
PIG-1434: Allow casting relations to scalars
Added:
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/ScalarPhyFinder.java
hadoop/pig/trunk/src/org/apache/pig/impl/builtin/ReadScalars.java
hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/ScalarFinder.java
hadoop/pig/trunk/test/org/apache/pig/test/TestScalarAliases.java
Modified:
hadoop/pig/trunk/CHANGES.txt
hadoop/pig/trunk/src/org/apache/pig/PigServer.java
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java
hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOStore.java
hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOUserFunc.java
hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt
hadoop/pig/trunk/test/findbugsExcludeFile.xml
Modified: hadoop/pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=982423&r1=982422&r2=982423&view=diff
==============================================================================
--- hadoop/pig/trunk/CHANGES.txt (original)
+++ hadoop/pig/trunk/CHANGES.txt Wed Aug 4 22:29:46 2010
@@ -26,6 +26,8 @@ PIG-1249: Safe-guards against misconfigu
IMPROVEMENTS
+PIG-1434: Allow casting relations to scalars (aniket486 via rding)
+
PIG-1461: support union operation that merges based on column names (thejas)
PIG-1517: Pig needs to support keywords in the package name (aniket486 via olgan)
@@ -112,6 +114,8 @@ PIG-1309: Map-side Cogroup (ashutoshc)
BUG FIXES
+PIG-1527: No need to deserialize UDFContext on the client side (rding)
+
PIG-1516: finalize in bag implementations causes pig to run out of memory in reduce (thejas)
PIG-1521: explain plan does not show correct Physical operator in MR plan when POSortedDistinct, POPackageLite are used (thejas)
@@ -377,8 +381,6 @@ OPTIMIZATIONS
BUG FIXES
-PIG-1527: No need to deserialize UDFContext on the client side (rding)
-
PIG-1507: Full outer join fails while doing a filter on joined data (daijy)
PIG-1493: Column Pruner throw exception "inconsistent pruning" (daijy)
Modified: hadoop/pig/trunk/src/org/apache/pig/PigServer.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/PigServer.java?rev=982423&r1=982422&r2=982423&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/PigServer.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/PigServer.java Wed Aug 4 22:29:46 2010
@@ -58,9 +58,11 @@ import org.apache.pig.builtin.PigStorage
import org.apache.pig.classification.InterfaceAudience;
import org.apache.pig.classification.InterfaceStability;
import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataType;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.io.FileLocalizer;
+import org.apache.pig.impl.io.FileSpec;
import org.apache.pig.impl.io.InterStorage;
import org.apache.pig.impl.logicalLayer.FrontendException;
import org.apache.pig.impl.logicalLayer.LOConst;
@@ -72,11 +74,13 @@ import org.apache.pig.impl.logicalLayer.
import org.apache.pig.impl.logicalLayer.LOSplit;
import org.apache.pig.impl.logicalLayer.LOSplitOutput;
import org.apache.pig.impl.logicalLayer.LOStore;
+import org.apache.pig.impl.logicalLayer.LOUserFunc;
import org.apache.pig.impl.logicalLayer.LOVisitor;
import org.apache.pig.impl.logicalLayer.LogicalOperator;
import org.apache.pig.impl.logicalLayer.LogicalPlan;
import org.apache.pig.impl.logicalLayer.LogicalPlanBuilder;
import org.apache.pig.impl.logicalLayer.PlanSetter;
+import org.apache.pig.impl.logicalLayer.ScalarFinder;
import org.apache.pig.impl.logicalLayer.optimizer.LogicalOptimizer;
import org.apache.pig.impl.logicalLayer.parser.ParseException;
import org.apache.pig.impl.logicalLayer.parser.QueryParser;
@@ -84,6 +88,7 @@ import org.apache.pig.impl.logicalLayer.
import org.apache.pig.impl.logicalLayer.validators.LogicalPlanValidationExecutor;
import org.apache.pig.impl.plan.CompilationMessageCollector;
import org.apache.pig.impl.plan.DependencyOrderWalker;
+import org.apache.pig.impl.plan.NodeIdGenerator;
import org.apache.pig.impl.plan.OperatorKey;
import org.apache.pig.impl.plan.PlanException;
import org.apache.pig.impl.plan.VisitorException;
@@ -506,7 +511,7 @@ public class PigServer {
currDAG.registerQuery(query, startLine);
}
- public LogicalPlan clonePlan(String alias) throws IOException {
+ public Graph getClonedGraph() throws IOException {
Graph graph = currDAG.clone();
if (graph == null) {
@@ -514,8 +519,7 @@ public class PigServer {
String msg = "Cloning of plan failed.";
throw new FrontendException(msg, errCode, PigException.BUG);
}
-
- return graph.getPlan(alias);
+ return graph;
}
/**
@@ -801,7 +805,8 @@ public class PigServer {
}
try {
- LogicalPlan lp = clonePlan(id);
+ Graph g = getClonedGraph();
+ LogicalPlan lp = g.getPlan(id);
// MRCompiler needs a store to be the leaf - hence
// add a store to the plan to explain
@@ -822,7 +827,8 @@ public class PigServer {
LogicalPlan unCompiledstorePlan = QueryParser.generateStorePlan(
scope, lp, filename, func, leaf, leaf.getAlias(),
pigContext);
- LogicalPlan storePlan = compileLp(unCompiledstorePlan, true);
+ LogicalPlan storePlan = compileLp(unCompiledstorePlan, g, true);
+
return executeCompiledLogicalPlan(storePlan);
} catch (Exception e) {
int errCode = 1002;
@@ -1073,7 +1079,7 @@ public class PigServer {
currDAG.execute();
}
- plan = clonePlan(alias);
+ plan = getClonedGraph().getPlan(alias);
} catch (IOException e) {
//Since the original script is parsed anyway, there should not be an
//error in this parsing. The only reason there can be an error is when
@@ -1085,7 +1091,8 @@ public class PigServer {
}
private LogicalPlan getStorePlan(String alias) throws IOException {
- LogicalPlan lp = compileLp(alias);
+ Graph g = getClonedGraph();
+ LogicalPlan lp = g.getPlan(alias);
if (!isBatchOn() || alias != null) {
// MRCompiler needs a store to be the leaf - hence
@@ -1107,7 +1114,9 @@ public class PigServer {
lp = QueryParser.generateStorePlan(scope, lp, "fakefile",
PigStorage.class.getName(), leaf, "fake", pigContext);
}
-
+
+ compileLp(lp, g, true);
+
return lp;
}
@@ -1165,19 +1174,78 @@ public class PigServer {
// create a clone of the logical plan and give it
// to the operations below
LogicalPlan lpClone;
+ Graph g;
try {
- lpClone = clonePlan(alias);
+ g = getClonedGraph();
+ lpClone = g.getPlan(alias);
} catch (IOException e) {
int errCode = 2001;
String msg = "Unable to clone plan before compiling";
throw new FrontendException(msg, errCode, PigException.BUG, e);
}
- return compileLp(lpClone, optimize);
+ return compileLp(lpClone, g, optimize);
+ }
+
+ private void mergeScalars(LogicalPlan lp, Graph g) throws FrontendException {
+ // When we start processing a store we look for scalars to add stores
+ // to respective logical plans and temporary files to the attributes
+ // Here we need to find if there are duplicates so that we do not add
+ // two stores for one plan
+ ScalarFinder scalarFinder = new ScalarFinder(lp);
+ scalarFinder.visit();
+
+ Map<LOUserFunc, LogicalPlan> scalarMap = scalarFinder.getScalarMap();
+
+ try {
+ for(Map.Entry<LOUserFunc, LogicalPlan> scalarEntry: scalarMap.entrySet()) {
+ FileSpec fileSpec;
+ String alias = scalarEntry.getKey().getImplicitReferencedOperator().getAlias();
+ LogicalOperator store;
+
+ LogicalPlan referredPlan = g.getAliases().get(g.getAliasOp().get(alias));
+
+ // If referredPlan already has a store,
+ // we just use it instead of adding one from our pocket
+ store = referredPlan.getLeaves().get(0);
+ if(store instanceof LOStore) {
+ // use this store
+ fileSpec = ((LOStore)store).getOutputFile();
+ }
+ else {
+ // add new store
+ FuncSpec funcSpec = new FuncSpec(PigStorage.class.getName() + "()");
+ fileSpec = new FileSpec(FileLocalizer.getTemporaryPath(pigContext).toString(), funcSpec);
+ store = new LOStore(referredPlan, new OperatorKey(scope, NodeIdGenerator.getGenerator().getNextNodeId(scope)),
+ fileSpec, alias);
+ referredPlan.addAsLeaf(store);
+ ((LOStore)store).setTmpStore(true);
+ }
+ lp.mergeSharedPlan(referredPlan);
+
+ // Attach a constant operator to the ReadScalar func
+ LogicalPlan innerPlan = scalarEntry.getValue();
+ LOConst rconst = new LOConst(innerPlan, new OperatorKey(scope, NodeIdGenerator.getGenerator().getNextNodeId(scope)), fileSpec.getFileName());
+ rconst.setType(DataType.CHARARRAY);
+
+ innerPlan.add(rconst);
+ innerPlan.connect(rconst, scalarEntry.getKey());
+ }
+ } catch (IOException ioe) {
+ int errCode = 2219;
+ String msg = "Unable to process scalar in the plan";
+ throw new FrontendException(msg, errCode, PigException.BUG, ioe);
+ }
+ }
+
+ private LogicalPlan compileLp(LogicalPlan lp, Graph g, boolean optimize) throws FrontendException {
+ mergeScalars(lp, g);
+
+ return compileLp(lp, optimize);
}
@SuppressWarnings("unchecked")
- private LogicalPlan compileLp(LogicalPlan lp, boolean optimize) throws
+ private LogicalPlan compileLp(LogicalPlan lp, boolean optimize) throws
FrontendException {
// Set the logical plan values correctly in all the operators
PlanSetter ps = new PlanSetter(lp);
@@ -1188,7 +1256,6 @@ public class PigServer {
boolean isBeforeOptimizer = true;
validate(lp, collector, isBeforeOptimizer);
-
// optimize
if (optimize && pigContext.getProperties().getProperty("pig.usenewlogicalplan", "false").equals("false")) {
HashSet<String> optimizerRules = null;
@@ -1526,7 +1593,7 @@ public class PigServer {
// Set the logical plan values correctly in all the operators
PlanSetter ps = new PlanSetter(lp);
ps.visit();
-
+
// The following code deals with store/load combination of
// intermediate files. In this case we will replace the load operator
// with a (implicit) split operator, iff the load/store
Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java?rev=982423&r1=982422&r2=982423&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java Wed Aug 4 22:29:46 2010
@@ -258,6 +258,7 @@ public class HExecutionEngine {
new LogToPhyTranslationVisitor(plan);
translator.setPigContext(pigContext);
translator.visit();
+ translator.finish();
return translator.getPhysicalPlan();
}
} catch (Exception ve) {
Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java?rev=982423&r1=982422&r2=982423&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java Wed Aug 4 22:29:46 2010
@@ -42,6 +42,7 @@ import org.apache.pig.backend.executione
import org.apache.pig.backend.hadoop.executionengine.HExecutionEngine;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROpPlanVisitor;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.ScalarPhyFinder;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.UDFFinder;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.ConstantExpression;
@@ -172,7 +173,7 @@ public class MRCompiler extends PhyPlanV
private Map<PhysicalOperator,MapReduceOper> phyToMROpMap;
public static final String USER_COMPARATOR_MARKER = "user.comparator.func:";
-
+
public MRCompiler(PhysicalPlan plan) throws MRCompilerException {
this(plan,null);
}
@@ -200,6 +201,18 @@ public class MRCompiler extends PhyPlanV
phyToMROpMap = new HashMap<PhysicalOperator, MapReduceOper>();
}
+ public void connectScalars() throws PlanException {
+ List<MapReduceOper> mrOpList = new ArrayList<MapReduceOper>();
+ for(MapReduceOper mrOp: MRPlan) {
+ mrOpList.add(mrOp);
+ }
+ for(MapReduceOper mrOp: mrOpList) {
+ for(PhysicalOperator scalar: mrOp.scalars) {
+ MRPlan.connect(phyToMROpMap.get(scalar), mrOp);
+ }
+ }
+ }
+
public void randomizeFileLocalizer(){
FileLocalizer.setR(new Random());
}
@@ -250,7 +263,7 @@ public class MRCompiler extends PhyPlanV
for (POStore store: stores) {
compile(store);
}
-
+
// I'm quite certain this is not the best way to do this. The issue
// is that for jobs that take multiple map reduce passes, for
// non-sort jobs, the POLocalRearrange is being put into the reduce
@@ -656,6 +669,12 @@ public class MRCompiler extends PhyPlanV
if (!mergedMap.UDFs.contains(udf))
mergedMap.UDFs.add(udf);
}
+ // We also need to change scalar marking
+ for(PhysicalOperator physOp: rmro.scalars) {
+ if(!mergedMap.scalars.contains(physOp)) {
+ mergedMap.scalars.add(physOp);
+ }
+ }
MRPlan.remove(rmro);
}
return ret;
@@ -676,8 +695,14 @@ public class MRCompiler extends PhyPlanV
}
}
- private void addUDFs(PhysicalPlan plan) throws VisitorException{
+ private void processUDFs(PhysicalPlan plan) throws VisitorException{
if(plan!=null){
+ //Process Scalars (UDF with referencedOperators)
+ ScalarPhyFinder scalarPhyFinder = new ScalarPhyFinder(plan);
+ scalarPhyFinder.visit();
+ curMROp.scalars.addAll(scalarPhyFinder.getScalars());
+
+ //Process UDFs
udfFinder.setPlan(plan);
udfFinder.visit();
curMROp.UDFs.addAll(udfFinder.getUDFs());
@@ -745,7 +770,7 @@ public class MRCompiler extends PhyPlanV
public void visitFilter(POFilter op) throws VisitorException{
try{
nonBlocking(op);
- addUDFs(op.getPlan());
+ processUDFs(op.getPlan());
phyToMROpMap.put(op, curMROp);
}catch(Exception e){
int errCode = 2034;
@@ -896,7 +921,7 @@ public class MRCompiler extends PhyPlanV
List<PhysicalPlan> plans = op.getPlans();
if(plans!=null)
for(PhysicalPlan ep : plans)
- addUDFs(ep);
+ processUDFs(ep);
phyToMROpMap.put(op, curMROp);
}catch(Exception e){
int errCode = 2034;
@@ -944,7 +969,7 @@ public class MRCompiler extends PhyPlanV
List<PhysicalPlan> plans = op.getPlans();
if(plans!=null)
for(PhysicalPlan ep : plans)
- addUDFs(ep);
+ processUDFs(ep);
phyToMROpMap.put(op, curMROp);
}catch(Exception e){
int errCode = 2034;
@@ -971,7 +996,7 @@ public class MRCompiler extends PhyPlanV
List<PhysicalPlan> plans = op.getInputPlans();
if(plans!=null)
for (PhysicalPlan plan : plans) {
- addUDFs(plan);
+ processUDFs(plan);
}
phyToMROpMap.put(op, curMROp);
}catch(Exception e){
@@ -1080,7 +1105,7 @@ public class MRCompiler extends PhyPlanV
for (List<PhysicalPlan> joinPlan : joinPlans) {
if(joinPlan!=null)
for (PhysicalPlan plan : joinPlan) {
- addUDFs(plan);
+ processUDFs(plan);
}
}
phyToMROpMap.put(op, curMROp);
Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java?rev=982423&r1=982422&r2=982423&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java Wed Aug 4 22:29:46 2010
@@ -63,7 +63,6 @@ import org.apache.pig.impl.plan.VisitorE
import org.apache.pig.impl.plan.CompilationMessageCollector.MessageType;
import org.apache.pig.impl.util.ConfigurationValidator;
import org.apache.pig.impl.util.LogUtils;
-import org.apache.pig.tools.pigstats.PigProgressNotificationListener;
import org.apache.pig.tools.pigstats.PigStats;
import org.apache.pig.tools.pigstats.PigStatsUtil;
import org.apache.pig.tools.pigstats.ScriptState;
@@ -398,6 +397,7 @@ public class MapReduceLauncher extends L
MRCompiler comp = new MRCompiler(php, pc);
comp.randomizeFileLocalizer();
comp.compile();
+ comp.connectScalars();
MROperPlan plan = comp.getMRPlan();
//display the warning message(s) from the MRCompiler
Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java?rev=982423&r1=982422&r2=982423&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java Wed Aug 4 22:29:46 2010
@@ -24,6 +24,7 @@ import java.util.Set;
import org.apache.pig.impl.plan.OperatorKey;
import org.apache.pig.impl.plan.NodeIdGenerator;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROpPlanVisitor;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POUnion;
import org.apache.pig.impl.plan.Operator;
@@ -105,6 +106,8 @@ public class MapReduceOper extends Opera
public Set<String> UDFs;
+ public Set<PhysicalOperator> scalars;
+
// Indicates if a UDF comparator is used
boolean isUDFComparatorUsed = false;
@@ -158,6 +161,7 @@ public class MapReduceOper extends Opera
combinePlan = new PhysicalPlan();
reducePlan = new PhysicalPlan();
UDFs = new HashSet<String>();
+ scalars = new HashSet<PhysicalOperator>();
nig = NodeIdGenerator.getGenerator();
scope = k.getScope();
}
Added: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/ScalarPhyFinder.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/ScalarPhyFinder.java?rev=982423&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/ScalarPhyFinder.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/ScalarPhyFinder.java Wed Aug 4 22:29:46 2010
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POUserFunc;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.impl.plan.DepthFirstWalker;
+import org.apache.pig.impl.plan.VisitorException;
+
+public class ScalarPhyFinder extends PhyPlanVisitor {
+
+ List<PhysicalOperator> scalars = new ArrayList<PhysicalOperator>();
+
+ public ScalarPhyFinder(PhysicalPlan plan) {
+ super(plan, new DepthFirstWalker<PhysicalOperator, PhysicalPlan>(plan));
+ }
+
+ public List<PhysicalOperator> getScalars() {
+ return scalars;
+ }
+
+ @Override
+ public void visitUserFunc(POUserFunc userFunc) throws VisitorException {
+ if(userFunc.getReferencedOperator() != null) {
+ scalars.add(userFunc.getReferencedOperator());
+ }
+ }
+}
Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java?rev=982423&r1=982422&r2=982423&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java Wed Aug 4 22:29:46 2010
@@ -72,6 +72,8 @@ public class LogToPhyTranslationVisitor
protected NodeIdGenerator nodeGen = NodeIdGenerator.getGenerator();
protected PigContext pc;
+
+ protected Map<PhysicalOperator, LogicalOperator> scalarAliasMap = new HashMap<PhysicalOperator, LogicalOperator>();
public LogToPhyTranslationVisitor(LogicalPlan plan) {
super(plan, new DependencyOrderWalker<LogicalOperator, LogicalPlan>(
@@ -81,7 +83,12 @@ public class LogToPhyTranslationVisitor
currentPlan = new PhysicalPlan();
logToPhyMap = new HashMap<LogicalOperator, PhysicalOperator>();
}
-
+
+ public void finish() {
+ for(PhysicalOperator physOp: scalarAliasMap.keySet()) {
+ ((POUserFunc)physOp).setReferencedOperator(logToPhyMap.get(scalarAliasMap.get(physOp)));
+ }
+ }
public void setPigContext(PigContext pc) {
this.pc = pc;
}
@@ -1606,6 +1613,11 @@ public class LogToPhyTranslationVisitor
}
}
logToPhyMap.put(func, p);
+
+ // We need to track all the scalars
+ if(func.getImplicitReferencedOperator() != null) {
+ scalarAliasMap.put(p, func.getImplicitReferencedOperator());
+ }
}
@@ -1651,6 +1663,7 @@ public class LogToPhyTranslationVisitor
store.setInputSpec(loStore.getInputSpec());
store.setSignature(loStore.getSignature());
store.setSortInfo(loStore.getSortInfo());
+ store.setIsTmpStore(loStore.isTmpStore());
try {
// create a new schema for ourselves so that when
// we serialize we are not serializing objects that
Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java?rev=982423&r1=982422&r2=982423&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java Wed Aug 4 22:29:46 2010
@@ -44,6 +44,7 @@ import org.apache.pig.data.DataType;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.logicalLayer.LogicalOperator;
import org.apache.pig.impl.logicalLayer.schema.Schema;
import org.apache.pig.impl.plan.NodeIdGenerator;
import org.apache.pig.impl.plan.OperatorKey;
@@ -66,7 +67,15 @@ public class POUserFunc extends Expressi
private boolean initialized = false;
private MonitoredUDFExecutor executor = null;
+ private PhysicalOperator referencedOperator = null;
+ public PhysicalOperator getReferencedOperator() {
+ return referencedOperator;
+ }
+
+ public void setReferencedOperator(PhysicalOperator referencedOperator) {
+ this.referencedOperator = referencedOperator;
+ }
public POUserFunc(OperatorKey k, int rp, List<PhysicalOperator> inp) {
super(k, rp);
Added: hadoop/pig/trunk/src/org/apache/pig/impl/builtin/ReadScalars.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/builtin/ReadScalars.java?rev=982423&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/builtin/ReadScalars.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/builtin/ReadScalars.java Wed Aug 4 22:29:46 2010
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.impl.builtin;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+
+import org.apache.pig.EvalFunc;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.io.FileLocalizer;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+
+/**
+ * ReadScalars reads a line from a file and returns it as its value. The
+ * file is only read once, and the same line is returned over and over again.
+ * This is useful for incorporating a result from an aggregation into another
+ * evaluation.
+ */
+public class ReadScalars extends EvalFunc<String> {
+ private String scalarfilename = null;
+ private String charset = "UTF-8";
+ private String value = null;
+
+ /**
+ * Java level API
+ *
+ * @param input
+ * expects two fields: the zero-based index of the tab-separated
+ * field to return, followed by the name of the file to be read
+ */
+ @Override
+ public String exec(Tuple input) throws IOException {
+ if (value == null) {
+ if (input == null || input.size() == 0)
+ return null;
+
+ InputStream is;
+ BufferedReader reader;
+ int pos;
+ try {
+ pos = DataType.toInteger(input.get(0));
+ scalarfilename = DataType.toString(input.get(1));
+
+ is = FileLocalizer.openDFSFile(scalarfilename);
+ reader = new BufferedReader(new InputStreamReader(is, charset));
+ } catch (Exception e) {
+ throw new ExecException("Failed to open file '" + scalarfilename
+ + "'; error = " + e.getMessage());
+ }
+ try {
+ String line = reader.readLine();
+ if(line == null) {
+ log.warn("No scalar field to read, returning null");
+ return null;
+ }
+ String[] lineTok = line.split("\t");
+ if(pos > lineTok.length) {
+ log.warn("No scalar field to read, returning null");
+ return null;
+ }
+ value = lineTok[pos];
+ if(reader.readLine() != null) {
+ throw new ExecException("Scalar has more than one row in the output");
+ }
+ } catch (Exception e) {
+ throw new ExecException(e.getMessage());
+ } finally {
+ reader.close();
+ is.close();
+ }
+ }
+ return value;
+ }
+
+ @Override
+ public Schema outputSchema(Schema input) {
+ return new Schema(new Schema.FieldSchema(getSchemaName("ReadScalars", input),
+ DataType.CHARARRAY));
+ }
+}
Modified: hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOStore.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOStore.java?rev=982423&r1=982422&r2=982423&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOStore.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOStore.java Wed Aug 4 22:29:46 2010
@@ -48,7 +48,16 @@ public class LOStore extends RelationalO
transient private StoreFuncInterface mStoreFunc;
private static Log log = LogFactory.getLog(LOStore.class);
+ private boolean isTmpStore;
+ public boolean isTmpStore() {
+ return isTmpStore;
+ }
+
+ public void setTmpStore(boolean isTmpStore) {
+ this.isTmpStore = isTmpStore;
+ }
+
private SortInfo sortInfo;
public SortInfo getSortInfo() {
@@ -72,6 +81,7 @@ public class LOStore extends RelationalO
super(plan, key);
mOutputFile = outputFileSpec;
+ isTmpStore = false;
// TODO
// The code below is commented out as PigContext pulls in
Modified: hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOUserFunc.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOUserFunc.java?rev=982423&r1=982422&r2=982423&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOUserFunc.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOUserFunc.java Wed Aug 4 22:29:46 2010
@@ -25,16 +25,15 @@ import org.apache.pig.FuncSpec;
import org.apache.pig.PigException;
import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.plan.VisitorException;
-import org.apache.pig.impl.logicalLayer.parser.ParseException;
import org.apache.pig.impl.logicalLayer.schema.Schema;
import org.apache.pig.data.DataType;
import org.apache.pig.impl.plan.OperatorKey;
-import org.apache.pig.impl.plan.PlanVisitor;
public class LOUserFunc extends ExpressionOperator {
private static final long serialVersionUID = 2L;
private FuncSpec mFuncSpec;
+ private LogicalOperator implicitReferencedOperator = null;
/**
* @param plan
@@ -56,6 +55,15 @@ public class LOUserFunc extends Expressi
public FuncSpec getFuncSpec() {
return mFuncSpec;
}
+
+ public LogicalOperator getImplicitReferencedOperator() {
+ return implicitReferencedOperator;
+ }
+
+ public void setImplicitReferencedOperator(
+ LogicalOperator implicitReferencedOperator) {
+ this.implicitReferencedOperator = implicitReferencedOperator;
+ }
public List<ExpressionOperator> getArguments() {
List<LogicalOperator> preds = getPlan().getPredecessors(this);
Added: hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/ScalarFinder.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/ScalarFinder.java?rev=982423&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/ScalarFinder.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/ScalarFinder.java Wed Aug 4 22:29:46 2010
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.impl.logicalLayer;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.pig.impl.plan.DepthFirstWalker;
+import org.apache.pig.impl.plan.VisitorException;
+
+/** Visitor that records every LOUserFunc carrying an implicitly referenced operator, keyed to the plan being walked. */
+public class ScalarFinder extends LOVisitor {
+
+ Map<LOUserFunc, LogicalPlan> mScalarMap = new HashMap<LOUserFunc, LogicalPlan>();
+
+ /**
+ * Sets up a depth-first walk over the given plan.
+ * @param plan logical plan to search for scalar-reading user functions
+ */
+ public ScalarFinder(LogicalPlan plan) {
+ super(plan, new DepthFirstWalker<LogicalOperator, LogicalPlan>(plan));
+ }
+
+ /** Records the function with the current walker's plan; functions without an implicit reference are ignored. */
+ @Override
+ protected void visit(LOUserFunc scalar) throws VisitorException {
+ if (scalar.getImplicitReferencedOperator() == null) {
+ return;
+ }
+ mScalarMap.put(scalar, mCurrentWalker.getPlan());
+ }
+
+ /** @return the backing map (not a copy) from each recorded function to the plan current at its visit */
+ public Map<LOUserFunc, LogicalPlan> getScalarMap() {
+ return mScalarMap;
+ }
+}
Modified: hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt?rev=982423&r1=982422&r2=982423&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt (original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt Wed Aug 4 22:29:46 2010
@@ -76,6 +76,7 @@ import org.apache.pig.backend.executione
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.pig.impl.util.LinkedMultiMap;
+import org.apache.pig.impl.builtin.ReadScalars;
public class QueryParser {
private PigContext pigContext;
@@ -87,6 +88,7 @@ public class QueryParser {
private Map<String, LogicalOperator> mapAliasOp;
private static Log log = LogFactory.getLog(QueryParser.class);
private boolean bracketed = false;
+ private boolean scalarFound = false;
private Map<String, String> fileNameMap;
private long getNextId() {
@@ -3333,6 +3335,8 @@ ExpressionOperator BaseEvalSpec(Schema o
Schema subSchema = null;
Token t;
String key;
+ String nextTok;
+ scalarFound = false;
log.trace("Entering BaseEvalSpec");
}
{
@@ -3352,12 +3356,20 @@ ExpressionOperator BaseEvalSpec(Schema o
if(null != fs) {
subSchema = fs.schema;
}
+ if(item instanceof LOUserFunc && ((LOUserFunc)item).getImplicitReferencedOperator() != null) {
+ subSchema = ((LOUserFunc)item).getImplicitReferencedOperator().getSchema();
+ }
log.debug("subSchema: " + subSchema);
}
(
"." projection = BracketedSimpleProj(subSchema,lp,item)
{
- assertAtomic(item,false);
+ if(item instanceof LOUserFunc && ((LOUserFunc)item).getImplicitReferencedOperator() != null) {
+ assertAtomic(item,true);
+ }
+ else {
+ assertAtomic(item,false);
+ }
item = projection;
}
)
@@ -3374,7 +3386,15 @@ ExpressionOperator BaseEvalSpec(Schema o
)*
)
)
- {log.trace("Exiting BaseEvalSpec"); return item;}
+ {
+ log.trace("Exiting BaseEvalSpec");
+ // Validate Scalar here - scalarFound needs to be false here
+ // We set scalarFound when we find it and then we reset it when we resolve projection
+ if(scalarFound) {
+ throw new ParseException("Scalars can be only used with projections");
+ }
+ return item;
+ }
}
@@ -4165,6 +4185,36 @@ ExpressionOperator DollarVar(Schema over
throw new ParseException("Out of bound access. Trying to access non-existent column: " + colNum + ". Schema " + over + " has " + over.size() + " column(s).");
}
}
+ // Scalar Projections
+ if(bracketed && eOp instanceof LOUserFunc && ((LOUserFunc)eOp).getImplicitReferencedOperator() != null) {
+ try {
+ scalarFound = false;
+ // The projection decides the type of the scalar, so we add a Cast operator to track that
+ LOCast loCast = null;
+ if(over != null) {
+ if(over.getField(colNum).type != DataType.BYTEARRAY) {
+ loCast = new LOCast(lp, new OperatorKey(scope, getNextId()), over.getField(colNum).type);
+ }
+ }
+ if(loCast == null){
+ // Default type is chararray not bytearray for ReadScalar, as it reads string from the file
+ loCast = new LOCast(lp, new OperatorKey(scope, getNextId()), DataType.CHARARRAY);
+ }
+ lp.add(loCast);
+ lp.connect(eOp, loCast);
+
+ // We also need to attach LOConst to the userfunc
+ // so that it can read that projection number in ReadScalars UDF
+ LOConst rconst = new LOConst(lp, new OperatorKey(scope, getNextId()), colNum);
+ rconst.setType(DataType.INTEGER);
+ lp.add(rconst);
+ lp.connect(rconst, eOp);
+
+ return loCast;
+ } catch(Exception e) {
+ throw new ParseException("Invalid field in scalar" + e);
+ }
+ }
ExpressionOperator project = new LOProject(lp, new OperatorKey(scope, getNextId()), eOp, undollar(t1.image));
try {
log.debug("eOp: " + eOp.getClass().getName() + " " + eOp);
@@ -4247,14 +4297,39 @@ ExpressionOperator AliasFieldOrSpec(Sche
log.debug("item == null");
if (null == over) log.debug("over is null");
try {
- if ( over == null || (i = over.getPosition(t1.image)) == -1) {
- log.debug("Invalid alias: " + t1.image + " in " + over);
- if(null != over) {
- log.debug("Printing out the aliases in the schema");
- over.printAliases();
- }
- throw new ParseException("Invalid alias: " + t1.image + " in " + over);
- }
+ if ( over == null || (i = over.getPosition(t1.image)) == -1) {
+ // We also support Scalar aliases, so we make a check
+ // whether the alias is a scalar alias and construct
+ // the plan to use this scalar alias
+
+ // We achieve this by storing the scalar alias into a temp directory
+ // and then we retrieve it using ReadScalars UDF. But, it should be noted
+ // that this adds an implicit dependency on the scalar alias, as we need
+ // it to be stored before UDF tries to read it.
+
+ LogicalOperator aliasOp = getOp(t1.image);
+ // for bracketed expression we do not resolve scalar
+ if(!bracketed && aliasOp != null) {
+ scalarFound = true;
+ Schema scalarSchema = aliasOp.getSchema();
+ log.debug("Scalar alias: " + t1.image + " found");
+
+ // Create the ReadScalars user function and remember the scalar alias it implicitly depends on
+ item = new LOUserFunc(lp, new OperatorKey(scope, getNextId()),
+ new FuncSpec(ReadScalars.class.getName()), DataType.CHARARRAY);
+ ((LOUserFunc)item).setImplicitReferencedOperator(aliasOp);
+
+ lp.add(item);
+ log.trace("Exiting AliasFieldOrSpec");
+ return item;
+ }
+ log.debug("Invalid alias: " + t1.image + " in " + over);
+ if(null != over) {
+ log.debug("Printing out the aliases in the schema");
+ over.printAliases();
+ }
+ throw new ParseException("Invalid alias: " + t1.image + " in " + over);
+ }
} catch (FrontendException fee) {
ParseException pe = new ParseException(fee.getMessage());
pe.initCause(fee);
@@ -4264,7 +4339,34 @@ ExpressionOperator AliasFieldOrSpec(Sche
if(null != over) {
log.debug("Printing out the aliases in the schema");
over.printAliases();
- }
+ }
+ // Scalar Projections
+ if(bracketed && eOp instanceof LOUserFunc && ((LOUserFunc)eOp).getImplicitReferencedOperator() != null) {
+ try {
+ scalarFound = false;
+ // The projection decides the type of the scalar, so we add a Cast operator to track that
+ LOCast loCast;
+ if(over.getField(i).type != DataType.BYTEARRAY) {
+ loCast = new LOCast(lp, new OperatorKey(scope, getNextId()), over.getField(i).type);
+ } else {
+ // Default type is chararray not bytearray for ReadScalar, as it reads string from the file
+ loCast = new LOCast(lp, new OperatorKey(scope, getNextId()), DataType.CHARARRAY);
+ }
+ lp.add(loCast);
+ lp.connect(eOp, loCast);
+
+ // We also need to attach LOConst to the userfunc
+ // so that it can read that projection number in ReadScalars UDF
+ LOConst rconst = new LOConst(lp, new OperatorKey(scope, getNextId()), i);
+ rconst.setType(DataType.INTEGER);
+ lp.add(rconst);
+ lp.connect(rconst, eOp);
+
+ return loCast;
+ } catch(Exception e) {
+ throw new ParseException("Invalid field in scalar" + e);
+ }
+ }
item = new LOProject(lp, new OperatorKey(scope, getNextId()), eOp, i);
item.setAlias(t1.image);
try {
Modified: hadoop/pig/trunk/test/findbugsExcludeFile.xml
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/findbugsExcludeFile.xml?rev=982423&r1=982422&r2=982423&view=diff
==============================================================================
--- hadoop/pig/trunk/test/findbugsExcludeFile.xml (original)
+++ hadoop/pig/trunk/test/findbugsExcludeFile.xml Wed Aug 4 22:29:46 2010
@@ -412,4 +412,9 @@
<Method name = "init" />
<Bug pattern="OBL_UNSATISFIED_OBLIGATION" />
</Match>
+ <Match>
+ <Class name = "org.apache.pig.impl.builtin.ReadScalars" />
+ <Method name = "exec" />
+ <Bug pattern= "RV_DONT_JUST_NULL_CHECK_READLINE" />
+ </Match>
</FindBugsFilter>
Added: hadoop/pig/trunk/test/org/apache/pig/test/TestScalarAliases.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestScalarAliases.java?rev=982423&view=auto
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestScalarAliases.java (added)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestScalarAliases.java Wed Aug 4 22:29:46 2010
@@ -0,0 +1,515 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.test;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Random;
+
+import junit.framework.TestCase;
+
+import org.apache.pig.ExecType;
+import org.apache.pig.PigServer;
+import org.apache.pig.data.BagFactory;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import org.apache.pig.impl.io.FileLocalizer;
+import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.impl.logicalLayer.validators.TypeCheckerException;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.Test;
+
+/** Tests for PIG-1434: relations referenced as scalars (alias.field or alias.$n) inside expressions, run against a MiniCluster. */
+public class TestScalarAliases extends TestCase {
+ static MiniCluster cluster = MiniCluster.buildCluster();
+ private PigServer pigServer;
+ TupleFactory mTf = TupleFactory.getInstance();
+ BagFactory mBf = BagFactory.getInstance();
+
+ @Before
+ @Override
+ public void setUp() throws Exception{
+ FileLocalizer.setR(new Random());
+ pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+ }
+
+ @AfterClass // NOTE(review): this class extends junit.framework.TestCase; confirm the runner in use honors JUnit 4 annotations
+ public static void oneTimeTearDown() throws Exception {
+ cluster.shutDown();
+ }
+
+ // See PIG-1434: scalar aliases in arithmetic, checked through a batch store and through openIterator
+ @Test
+ public void testScalarAliasesBatchNobatch() throws Exception{
+ String[] input = {
+ "1\t5",
+ "2\t10",
+ "3\t20"
+ };
+
+ // Test the use of scalars in expressions
+ Util.createInputFile(cluster, "table_testScalarAliasesBatch", input);
+ // Test in script mode
+ pigServer.setBatchOn();
+ pigServer.registerQuery("A = LOAD 'table_testScalarAliasesBatch' as (a0: long, a1: double);");
+ pigServer.registerQuery("B = group A all;");
+ pigServer.registerQuery("C = foreach B generate COUNT(A) as count, MAX(A.$1) as max;");
+ pigServer.registerQuery("Y = foreach A generate (a0 * C.count), (a1 / C.max);");
+ pigServer.registerQuery("Store Y into 'table_testScalarAliasesDir';");
+ pigServer.executeBatch();
+ // Check output
+ pigServer.registerQuery("Z = LOAD 'table_testScalarAliasesDir' as (a0: int, a1: double);");
+
+ Iterator<Tuple> iter;
+ Tuple t;
+ iter = pigServer.openIterator("Z");
+
+ t = iter.next();
+ assertEquals("(3,0.25)", t.toString());
+
+ t = iter.next();
+ assertEquals("(6,0.5)", t.toString());
+
+ t = iter.next();
+ assertEquals("(9,1.0)", t.toString());
+
+ assertFalse(iter.hasNext());
+
+ iter = pigServer.openIterator("Y");
+
+ t = iter.next();
+ assertEquals("(3,0.25)", t.toString());
+
+ t = iter.next();
+ assertEquals("(6,0.5)", t.toString());
+
+ t = iter.next();
+ assertEquals("(9,1.0)", t.toString());
+
+ assertFalse(iter.hasNext());
+ }
+
+ // See PIG-1434: the same scalar alias referenced by two stored relations in one batch
+ @Test
+ public void testUseScalarMultipleTimes() throws Exception{
+ String[] input = {
+ "1\t5",
+ "2\t10",
+ "3\t20"
+ };
+
+ // Test the use of scalars in expressions
+ Util.createInputFile(cluster, "table_testUseScalarMultipleTimes", input);
+ pigServer.setBatchOn();
+ pigServer.registerQuery("A = LOAD 'table_testUseScalarMultipleTimes' as (a0: long, a1: double);");
+ pigServer.registerQuery("B = group A all;");
+ pigServer.registerQuery("C = foreach B generate COUNT(A) as count, MAX(A.$1) as max;");
+ pigServer.registerQuery("Y = foreach A generate (a0 * C.count), (a1 / C.max);");
+ pigServer.registerQuery("Store Y into 'table_testUseScalarMultipleTimesOutY';");
+ pigServer.registerQuery("Z = foreach A generate (a1 + C.count), (a0 * C.max);");
+ pigServer.registerQuery("Store Z into 'table_testUseScalarMultipleTimesOutZ';");
+ // Test Multiquery store
+ pigServer.executeBatch();
+
+ // Check output
+ pigServer.registerQuery("M = LOAD 'table_testUseScalarMultipleTimesOutY' as (a0: int, a1: double);");
+
+ Iterator<Tuple> iter;
+ Tuple t;
+ iter = pigServer.openIterator("M");
+
+ t = iter.next();
+ assertEquals("(3,0.25)", t.toString());
+
+ t = iter.next();
+ assertEquals("(6,0.5)", t.toString());
+
+ t = iter.next();
+ assertEquals("(9,1.0)", t.toString());
+
+ assertFalse(iter.hasNext());
+
+ // Check output
+ pigServer.registerQuery("N = LOAD 'table_testUseScalarMultipleTimesOutZ' as (a0: double, a1: double);");
+
+ iter = pigServer.openIterator("N");
+
+ t = iter.next();
+ assertEquals("(8.0,20.0)", t.toString());
+
+ t = iter.next();
+ assertEquals("(13.0,40.0)", t.toString());
+
+ t = iter.next();
+ assertEquals("(23.0,60.0)", t.toString());
+
+ assertFalse(iter.hasNext());
+
+ // Non batch mode
+ iter = pigServer.openIterator("Y");
+
+ t = iter.next();
+ assertEquals("(3,0.25)", t.toString());
+
+ t = iter.next();
+ assertEquals("(6,0.5)", t.toString());
+
+ t = iter.next();
+ assertEquals("(9,1.0)", t.toString());
+
+ assertFalse(iter.hasNext());
+
+ // Check in non-batch mode
+ iter = pigServer.openIterator("Z");
+
+ t = iter.next();
+ assertEquals("(8.0,20.0)", t.toString());
+
+ t = iter.next();
+ assertEquals("(13.0,40.0)", t.toString());
+
+ t = iter.next();
+ assertEquals("(23.0,60.0)", t.toString());
+
+ assertFalse(iter.hasNext());
+ }
+
+ // See PIG-1434: an untyped scalar field must be cast before it is used in arithmetic
+ @Test
+ public void testScalarWithNoSchema() throws Exception{
+ String[] scalarInput = {
+ "1\t5"
+ };
+ String[] input = {
+ "1\t5",
+ "2\t10",
+ "3\t20"
+ };
+ Util.createInputFile(cluster, "table_testScalarWithNoSchema", input);
+ Util.createInputFile(cluster, "table_testScalarWithNoSchemaScalar", scalarInput);
+ // A is the iterated relation; 'scalar' is the single-row relation used as a scalar
+ pigServer.registerQuery("A = LOAD 'table_testScalarWithNoSchema';");
+ pigServer.registerQuery("scalar = LOAD 'table_testScalarWithNoSchemaScalar' as (count, total);");
+ pigServer.registerQuery("B = foreach A generate 5 / scalar.total;");
+
+ try {
+ pigServer.openIterator("B");
+ fail("We do not support no schema scalar without a cast");
+ } catch (FrontendException te) {
+ // In alias B, incompatible types in Division Operator left hand side:int right hand side:chararray
+ assertEquals(1039, ((TypeCheckerException)te.getCause().getCause().getCause()).getErrorCode());
+ }
+
+ pigServer.registerQuery("C = foreach A generate 5 / (int)scalar.total;");
+
+ Iterator<Tuple> iter = pigServer.openIterator("C");
+
+ Tuple t = iter.next();
+ assertEquals("1", t.get(0).toString());
+
+ t = iter.next();
+ assertEquals("1", t.get(0).toString());
+
+ t = iter.next();
+ assertEquals("1", t.get(0).toString());
+
+ assertFalse(iter.hasNext());
+
+ }
+
+ // See PIG-1434: a scalar computed from one input is used while iterating over another input
+ @Test
+ public void testScalarWithTwoBranches() throws Exception{
+ String[] inputA = {
+ "1\t5",
+ "2\t10",
+ "3\t20"
+ };
+
+ String[] inputX = {
+ "pig",
+ "hadoop",
+ "rocks"
+ };
+
+ // Test the use of scalars in expressions
+ Util.createInputFile(cluster, "testScalarWithTwoBranchesA", inputA);
+ Util.createInputFile(cluster, "testScalarWithTwoBranchesX", inputX);
+ // Test in script mode
+ pigServer.setBatchOn();
+ pigServer.registerQuery("A = LOAD 'testScalarWithTwoBranchesA' as (a0: long, a1: double);");
+ pigServer.registerQuery("B = group A all;");
+ pigServer.registerQuery("C = foreach B generate COUNT(A) as count, MAX(A.$1) as max;");
+ pigServer.registerQuery("X = LOAD 'testScalarWithTwoBranchesX' as (names: chararray);");
+ pigServer.registerQuery("Y = foreach X generate names, C.max;");
+ pigServer.registerQuery("Store Y into 'testScalarWithTwoBranchesDir';");
+ pigServer.executeBatch();
+ // Check output
+ pigServer.registerQuery("Z = LOAD 'testScalarWithTwoBranchesDir' as (a0: chararray, a1: double);");
+
+ Iterator<Tuple> iter = pigServer.openIterator("Z");
+
+ Tuple t = iter.next();
+ assertEquals("(pig,20.0)", t.toString());
+
+ t = iter.next();
+ assertEquals("(hadoop,20.0)", t.toString());
+
+ t = iter.next();
+ assertEquals("(rocks,20.0)", t.toString());
+
+ assertFalse(iter.hasNext());
+
+ // Check in non-batch mode
+ iter = pigServer.openIterator("Y");
+
+ t = iter.next();
+ assertEquals("(pig,20.0)", t.toString());
+
+ t = iter.next();
+ assertEquals("(hadoop,20.0)", t.toString());
+
+ t = iter.next();
+ assertEquals("(rocks,20.0)", t.toString());
+
+ assertFalse(iter.hasNext());
+ }
+
+ // See PIG-1434: positional ($n) projection of a scalar produced by a filter
+ @Test
+ public void testFilteredScalarDollarProj() throws Exception{
+ String[] input = {
+ "1\t5",
+ "2\t10",
+ "3\t20"
+ };
+
+ // Test the use of scalars in expressions
+ Util.createInputFile(cluster, "table_testFilteredScalarDollarProj", input);
+ // Test in script mode
+ pigServer.setBatchOn();
+ pigServer.registerQuery("A = LOAD 'table_testFilteredScalarDollarProj' as (a0: long, a1: double);");
+ pigServer.registerQuery("B = filter A by $1 < 8;");
+ pigServer.registerQuery("Y = foreach A generate (a0 * B.$0), (a1 / B.$1);");
+ pigServer.registerQuery("Store Y into 'table_testFilteredScalarDollarProjDir';");
+ pigServer.executeBatch();
+ // Check output
+ pigServer.registerQuery("Z = LOAD 'table_testFilteredScalarDollarProjDir' as (a0: int, a1: double);");
+
+ Iterator<Tuple> iter = pigServer.openIterator("Z");
+
+ Tuple t = iter.next();
+ assertEquals("(1,1.0)", t.toString());
+
+ t = iter.next();
+ assertEquals("(2,2.0)", t.toString());
+
+ t = iter.next();
+ assertEquals("(3,4.0)", t.toString());
+
+ assertFalse(iter.hasNext());
+
+ // Check in non-batch mode
+ iter = pigServer.openIterator("Y");
+
+ t = iter.next();
+ assertEquals("(1,1.0)", t.toString());
+
+ t = iter.next();
+ assertEquals("(2,2.0)", t.toString());
+
+ t = iter.next();
+ assertEquals("(3,4.0)", t.toString());
+
+ assertFalse(iter.hasNext());
+
+ }
+
+ // See PIG-1434: positional projection of a schema-less scalar must be cast before arithmetic
+ @Test
+ public void testScalarWithNoSchemaDollarProj() throws Exception{
+ String[] scalarInput = {
+ "1\t5"
+ };
+ String[] input = {
+ "1\t5",
+ "2\t10",
+ "3\t20"
+ };
+ Util.createInputFile(cluster, "table_testScalarWithNoSchemaDollarProj", input);
+ Util.createInputFile(cluster, "table_testScalarWithNoSchemaDollarProjScalar", scalarInput);
+ // A is the iterated relation; 'scalar' is the single-row relation used as a scalar
+ pigServer.registerQuery("A = LOAD 'table_testScalarWithNoSchemaDollarProj';");
+ pigServer.registerQuery("scalar = LOAD 'table_testScalarWithNoSchemaDollarProjScalar';");
+ pigServer.registerQuery("B = foreach A generate 5 / scalar.$1;");
+
+ try {
+ pigServer.openIterator("B");
+ fail("We do not support no schema scalar without a cast");
+ } catch (FrontendException te) {
+ // In alias B, incompatible types in Division Operator left hand side:int right hand side:chararray
+ assertEquals(1039, ((TypeCheckerException)te.getCause().getCause().getCause()).getErrorCode());
+ }
+
+ pigServer.registerQuery("C = foreach A generate 5 / (int)scalar.$1;");
+
+ Iterator<Tuple> iter = pigServer.openIterator("C");
+
+ Tuple t = iter.next();
+ assertEquals("1", t.get(0).toString());
+
+ t = iter.next();
+ assertEquals("1", t.get(0).toString());
+
+ t = iter.next();
+ assertEquals("1", t.get(0).toString());
+
+ assertFalse(iter.hasNext());
+
+ }
+
+ // See PIG-1434: scalar alias inside a join key expression
+ @Test
+ public void testScalarAliasesJoinClause() throws Exception{
+ String[] inputA = {
+ "1\t5",
+ "2\t10",
+ "3\t20"
+ };
+ String[] inputB = {
+ "Total3\tthree",
+ "Total2\ttwo",
+ "Total1\tone"
+ };
+
+ // Test the use of scalars in expressions
+ Util.createInputFile(cluster, "table_testScalarAliasesJoinClauseA", inputA);
+ Util.createInputFile(cluster, "table_testScalarAliasesJoinClauseB", inputB);
+ // Batch mode is not enabled here; the result is read through openIterator
+ pigServer.registerQuery("A = LOAD 'table_testScalarAliasesJoinClauseA' as (a0, a1);");
+ pigServer.registerQuery("G = group A all;");
+ pigServer.registerQuery("C = foreach G generate COUNT(A) as count;");
+
+ pigServer.registerQuery("B = LOAD 'table_testScalarAliasesJoinClauseB' as (b0:chararray, b1:chararray);");
+ pigServer.registerQuery("Y = join A by CONCAT('Total', (chararray)C.count), B by $0;");
+
+ Iterator<Tuple> iter = pigServer.openIterator("Y");
+
+ Tuple t = iter.next();
+ assertEquals("(1,5,Total3,three)", t.toString());
+
+ t = iter.next();
+ assertEquals("(2,10,Total3,three)", t.toString());
+
+ t = iter.next();
+ assertEquals("(3,20,Total3,three)", t.toString());
+
+ assertFalse(iter.hasNext());
+ }
+
+ // See PIG-1434: scalar alias inside a filter condition
+ @Test
+ public void testScalarAliasesFilterClause() throws Exception{
+ String[] input = {
+ "1\t5",
+ "2\t10",
+ "3\t20",
+ "4\t12",
+ "5\t8"
+ };
+
+ // Test the use of scalars in expressions
+ Util.createInputFile(cluster, "table_testScalarAliasesFilterClause", input);
+ // Batch mode is not enabled here; the result is read through openIterator
+ pigServer.registerQuery("A = LOAD 'table_testScalarAliasesFilterClause' as (a0, a1);");
+ pigServer.registerQuery("G = group A all;");
+ pigServer.registerQuery("C = foreach G generate AVG(A.$1) as average;");
+
+ pigServer.registerQuery("Y = filter A by a1 > C.average;");
+
+ Iterator<Tuple> iter = pigServer.openIterator("Y");
+
+ // Average is 11
+ Tuple t = iter.next();
+ assertEquals("(3,20)", t.toString());
+
+ t = iter.next();
+ assertEquals("(4,12)", t.toString());
+
+ assertFalse(iter.hasNext());
+ }
+
+ // See PIG-1434: scalar alias inside a split condition
+ @Test
+ public void testScalarAliasesSplitClause() throws Exception{
+ String[] input = {
+ "1\t5",
+ "2\t10",
+ "3\t20"
+ };
+
+ // Test the use of scalars in expressions
+ Util.createInputFile(cluster, "table_testScalarAliasesSplitClause", input);
+ // Test in script mode
+ pigServer.setBatchOn();
+ pigServer.registerQuery("A = LOAD 'table_testScalarAliasesSplitClause' as (a0: long, a1: double);");
+ pigServer.registerQuery("B = group A all;");
+ pigServer.registerQuery("C = foreach B generate COUNT(A) as count;");
+ pigServer.registerQuery("split A into Y if (2 * C.count) < a1, X if a1 == 5;");
+ pigServer.registerQuery("Store Y into 'table_testScalarAliasesSplitClauseDir';");
+ pigServer.executeBatch();
+ // Check output
+ pigServer.registerQuery("Z = LOAD 'table_testScalarAliasesSplitClauseDir' as (a0: int, a1: double);");
+
+ Iterator<Tuple> iter = pigServer.openIterator("Z");
+
+ // Y gets only last 2 elements
+ Tuple t = iter.next();
+ assertEquals("(2,10.0)", t.toString());
+
+ t = iter.next();
+ assertEquals("(3,20.0)", t.toString());
+
+ assertFalse(iter.hasNext());
+ }
+
+ // See PIG-1434: using a relation as a scalar without a projection is a parse error
+ @Test
+ public void testScalarAliasesGrammarNegative() throws Exception{
+ String[] input = {
+ "1\t5",
+ "2\t10",
+ "3\t20"
+ };
+
+ Util.createInputFile(cluster, "table_testScalarAliasesGrammar", input);
+ pigServer.registerQuery("A = LOAD 'table_testScalarAliasesGrammar' as (a0: long, a1: double);");
+ pigServer.registerQuery("B = group A all;");
+ pigServer.registerQuery("C = foreach B generate COUNT(A);");
+
+ try {
+ // Only projections of C are supported
+ pigServer.registerQuery("Y = foreach A generate C;");
+ //Control should not reach here
+ fail("Scalar projections are only supported");
+ } catch (IOException pe){
+ assertTrue(pe.getCause().getMessage().equalsIgnoreCase("Scalars can be only used with projections"));
+ }
+ }
+}