You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ga...@apache.org on 2008/05/08 23:25:54 UTC
svn commit: r654629 [3/4] - in /incubator/pig/branches/types: ./
src/org/apache/pig/impl/eval/ src/org/apache/pig/impl/logicalLayer/
src/org/apache/pig/impl/logicalLayer/parser/
src/org/apache/pig/impl/logicalLayer/schema/ src/org/apache/pig/impl/logic...
Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt?rev=654629&r1=654628&r2=654629&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt Thu May 8 14:25:22 2008
@@ -33,17 +33,18 @@
import java.util.*;
import org.apache.pig.impl.logicalLayer.*;
import org.apache.pig.impl.logicalLayer.schema.*;
-import org.apache.pig.impl.eval.*;
-import org.apache.pig.impl.eval.window.*;
-import org.apache.pig.impl.eval.cond.*;
-import org.apache.pig.*;
-import org.apache.pig.data.*;
+import org.apache.pig.data.DataType;
import org.apache.pig.impl.PigContext;
-import org.apache.pig.PigServer.ExecType;
+import org.apache.pig.ExecType;
import org.apache.pig.impl.io.*;
-import org.apache.pig.builtin.*;
-import org.apache.pig.impl.builtin.*;
+import org.apache.pig.builtin.PigStorage;
+import org.apache.pig.impl.builtin.GFAny;
import org.apache.pig.impl.logicalLayer.LogicalPlan;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.pig.impl.plan.MultiMap;
+import org.apache.pig.impl.plan.PlanException;
+
public class QueryParser {
private PigContext pigContext;
@@ -51,32 +52,33 @@
private Map<OperatorKey, LogicalOperator> opTable;
private String scope;
private NodeIdGenerator nodeIdGen;
-
+ //a map of alias to logical operator for a quick lookup
+ private Map<String, LogicalOperator> mapAliasOp;
+ private static Log log = LogFactory.getLog(QueryParser.class);
+
private long getNextId() {
return nodeIdGen.getNextNodeId(scope);
}
- public QueryParser(InputStream in,
+ public QueryParser(InputStream in,
PigContext pigContext,
String scope,
Map<String, LogicalPlan> aliases,
- Map<OperatorKey, LogicalOperator> opTable) {
+ Map<OperatorKey, LogicalOperator> opTable,
+ Map<String, LogicalOperator> aliasOp) {
this(in);
this.pigContext = pigContext;
this.aliases = aliases;
this.opTable = opTable;
this.scope = scope;
this.nodeIdGen = NodeIdGenerator.getGenerator();
+ this.mapAliasOp = aliasOp;
}
-
- public class EvalSpecAndSchema{
- EvalSpec spec;
- TupleSchema schema;
- }
-
+
public class CogroupInput {
- public OperatorKey op;
- public EvalSpec spec;
+ public LogicalOperator op;
+ public ArrayList<LogicalPlan> plans;
+ public boolean isInner;
}
private static String removeQuotes(String str) {
@@ -100,16 +102,14 @@
long storeNodeId = NodeIdGenerator.getGenerator().getNextNodeId(scope);
- LogicalOperator root = new LOStore(opTable,
- scope,
- storeNodeId,
- readFrom.getRoot(),
- new FileSpec(fileName, func),
- false);
+ LogicalOperator root = new LOStore(readFrom,
+ new OperatorKey(scope, storeNodeId),
+ new FileSpec(fileName, func));
+
- LogicalPlan storePlan = new LogicalPlan(root.getOperatorKey(), opTable, pigContext);
-
- return storePlan;
+ readFrom.add(root);
+
+ return readFrom;
}
static String unquote(String s) {
@@ -120,108 +120,136 @@
return Integer.parseInt(s.substring(1, s.length()));
}
-
-
+
String massageFilename(String filename, PigContext pigContext) throws IOException, ParseException {
if (pigContext.getExecType() != ExecType.LOCAL) {
if (filename.startsWith(FileLocalizer.LOCAL_PREFIX)) {
filename = FileLocalizer.hadoopify(filename, pigContext);
}
- else
- {
- // make sure that dfs file exists
- if (!FileLocalizer.fileExists(filename, pigContext))
- {
- throw new ParseException(FileLocalizer.fullPath(filename, pigContext) + " does not exist");
- }
- }
+ // Removed check for file existence, that will be handled later by InputOutputFileValidator
}
return filename;
}
- LogicalOperator parseCogroup(ArrayList<CogroupInput> gis) throws ParseException{
+ LogicalOperator parseCogroup(ArrayList<CogroupInput> gis, LogicalPlan lp) throws ParseException, PlanException{
+
+ log.trace("Entering parseCogroup");
+ log.debug("LogicalPlan: " + lp);
+
int n = gis.size();
+ log.debug("Number of cogroup inputs = " + n);
+
+ ArrayList<LogicalOperator> los = new ArrayList<LogicalOperator>();
+ ArrayList<ArrayList<LogicalPlan>> plans = new ArrayList<ArrayList<LogicalPlan>>();
+ MultiMap<LogicalOperator, LogicalPlan> groupByPlans = new MultiMap<LogicalOperator, LogicalPlan>();
+ //Map<LogicalOperator, LogicalPlan> groupByPlans = new HashMap<LogicalOperator, LogicalPlan>();
+ boolean[] isInner = new boolean[n];
- List<OperatorKey> los = new ArrayList<OperatorKey>();
- ArrayList<EvalSpec> specs = new ArrayList<EvalSpec>();
+ int arity = gis.get(0).plans.size();
for (int i = 0; i < n ; i++){
CogroupInput gi = gis.get(i);
los.add(gi.op);
- specs.add(gi.spec);
+ ArrayList<LogicalPlan> planList = gi.plans;
+ plans.add(gi.plans);
+ int numGrpByOps = planList.size();
+ log.debug("Number of group by operators = " + numGrpByOps);
+
+ if(arity != numGrpByOps) {
+ throw new ParseException("The arity of the group by columns does not match.");
+ }
+ for(int j = 0; j < numGrpByOps; ++j) {
+ groupByPlans.put(gi.op, planList.get(j));
+ for(LogicalOperator root: planList.get(j).getRoots()) {
+ log.debug("Cogroup input plan root: " + root);
+ }
+ }
+ isInner[i] = gi.isInner;
}
- return new LOCogroup(opTable, scope, getNextId(), los, specs);
+ LogicalOperator cogroup = new LOCogroup(lp, new OperatorKey(scope, getNextId()), los, groupByPlans, isInner);
+ lp.add(cogroup);
+ log.debug("Added operator " + cogroup.getClass().getName() + " object " + cogroup + " to the logical plan " + lp);
+
+ for(LogicalOperator op: los) {
+ lp.connect(op, cogroup);
+ log.debug("Connected operator " + op.getClass().getName() + " to " + cogroup.getClass().getName() + " in the logical plan");
+ }
+
+ log.trace("Exiting parseCogroup");
+ return cogroup;
}
-
- LogicalOperator rewriteCross(ArrayList<OperatorKey> inputs) throws IOException, ParseException{
- ArrayList<CogroupInput> gis = new ArrayList<CogroupInput>();
- int n = inputs.size();
+ /**
+ * The join operator is translated to foreach
+ */
+ LogicalOperator rewriteJoin(ArrayList<CogroupInput> gis, LogicalPlan lp) throws ParseException, PlanException{
- for (int i=0; i< n; i++){
- CogroupInput gi = new CogroupInput();
- gis.add(gi);
-
- gi.op = inputs.get(i);
-
- ArrayList<EvalSpec> argsColumns = new ArrayList<EvalSpec>();
- argsColumns.add(new ConstSpec(n+""));
- argsColumns.add(new ConstSpec(i+""));
- GenerateSpec args = new GenerateSpec(argsColumns);
- FuncEvalSpec fes = new FuncEvalSpec(pigContext, GFCross.class.getName(), args);
- fes.setFlatten(true);
- gi.spec = new GenerateSpec(fes).getGroupBySpec();
+ log.trace("Entering rewriteJoin");
+ log.debug("LogicalPlan: " + lp);
+ int n = gis.size();
+ ArrayList<ExpressionOperator> flattenedColumns = new ArrayList<ExpressionOperator>();
+ ArrayList<LogicalPlan> generatePlans = new ArrayList<LogicalPlan>();
+ ArrayList<Boolean> flattenList = new ArrayList<Boolean>();
+
+ /*
+ * Construct the projection operators required for the generate
+ * Make sure that the operators are flattened
+ */
+
+
+ LogicalPlan foreachPlan = new LogicalPlan();
+ for (int i = 0; i < n; i++) {
+ LogicalPlan projectPlan = new LogicalPlan();
+ LogicalOperator projectInput = gis.get(i).op;
+ ExpressionOperator column = new LOProject(projectPlan, new OperatorKey(scope, getNextId()), projectInput, i+1);
+ ((LOProject)column).setStar(true);
+ flattenList.add(true);
+ flattenedColumns.add(column);
+ (gis.get(i)).isInner = true;
+ projectPlan.add(column);
+ log.debug("parseCogroup: Added operator " + column.getClass().getName() + " " + column + " to logical plan " + projectPlan);
+ generatePlans.add(projectPlan);
}
-
- return rewriteJoin(gis);
- }
- LogicalOperator rewriteDistinct(OperatorKey input){
- //First group the input on *
- ArrayList<OperatorKey> inputs = new ArrayList<OperatorKey>();
- inputs.add(input);
+ //Construct the cogroup operator and add it to the logical plan
- ArrayList<EvalSpec> groupSpecs = new ArrayList<EvalSpec>();
-
- groupSpecs.add(new GenerateSpec(new StarSpec()).getGroupBySpec());
+ LogicalOperator cogroup = parseCogroup(gis, lp);
+ lp.add(cogroup);
+ log.debug("Added operator " + cogroup.getClass().getName() + " to the logical plan");
- LogicalOperator groupedInput = new LOCogroup(opTable, scope, getNextId(), inputs, groupSpecs);
-
- //then generate the flattened group
- EvalSpec projectSpec = new ProjectSpec(0);
- projectSpec.setFlatten(true);
-
- return new LOEval(opTable, scope, getNextId(), groupedInput.getOperatorKey() , new GenerateSpec(projectSpec));
- }
-
-
-
- LogicalOperator rewriteJoin(ArrayList<CogroupInput> gis) throws IOException, ParseException{
- int n = gis.size();
- ArrayList<EvalSpec> flattenedColumns = new ArrayList<EvalSpec>();
+ //Construct the generate operator from the list of projection plans
+ //Add the generate operator to the foreach logical plan
+ LogicalOperator generate = new LOGenerate(lp, new OperatorKey(scope, getNextId()), generatePlans, flattenList);
+ foreachPlan.add(generate);
+ log.debug("Added operator " + generate.getClass().getName() + " to the logical plan " + lp);
+
- for (int i = 0; i < n; i++) {
- EvalSpec column = new ProjectSpec(i+1);
- column.setFlatten(true);
- flattenedColumns.add(column);
- }
+ /*
+ * Construct the foreach operator from the foreach logical plan
+ * Add the foreach operator to the top level logical plan
+ */
+
+ LogicalOperator foreach = new LOForEach(lp, new OperatorKey(scope, getNextId()), foreachPlan);
+ lp.add(foreach);
+ log.debug("Added operator " + foreach.getClass().getName() + " to the logical plan");
+ lp.connect(cogroup, foreach);
+ log.debug("Connected operator " + cogroup.getClass().getName() + " to operator " + foreach.getClass().getName() + " in the logical plan " + lp);
- return new LOEval(opTable, scope, getNextId(), parseCogroup(gis).getOperatorKey(),new GenerateSpec(flattenedColumns));
+ log.trace("Exiting rewriteJoin");
+ return foreach;
}
-
- void assertAtomic(EvalSpec spec, boolean desiredAtomic) throws ParseException{
+
+ void assertAtomic(LogicalOperator spec, boolean desiredAtomic) throws ParseException{
Boolean isAtomic = null;
- if (spec instanceof CompositeEvalSpec)
- spec = ((CompositeEvalSpec)spec).getSpecs().get(0);
- if ( spec instanceof ConstSpec ||
- (spec instanceof FuncEvalSpec &&
- DataType.isAtomic(DataType.findType(((FuncEvalSpec)spec).getReturnType()))))
+ if ( spec instanceof LOConst ||
+ (spec instanceof LOUserFunc &&
+ DataType.isAtomic(DataType.findType(((LOUserFunc)spec).getType()))))
isAtomic = true;
- else if (spec instanceof FuncEvalSpec)
+ else if (spec instanceof LOUserFunc)
isAtomic = false;
if (isAtomic != null && isAtomic != desiredAtomic){
@@ -232,17 +260,65 @@
}
}
- EvalSpec copyItemAndAddSpec(EvalSpec spec, EvalSpec successor) throws ParseException{
- assertAtomic(spec,false);
- spec = spec.copy(pigContext);
- return spec.addSpec(successor);
- }
-
- void addSplitOutput(LogicalPlan lp, LOSplit splitOp, String alias, Cond cond, int index){
- splitOp.addCond(cond);
- LOSplitOutput splitOut = new LOSplitOutput(opTable, scope, getNextId(), lp.getRoot(), index);
- aliases.put(alias, new LogicalPlan(splitOut.getOperatorKey(), opTable, pigContext));
+ void addSplitOutput(LogicalPlan lp, LOSplit splitOp, String alias, LogicalPlan condPlan, int index) throws PlanException{
+ LogicalOperator splitOutput = new LOSplitOutput(lp, new OperatorKey(scope, getNextId()), index);
+ splitOp.addOutput(splitOutput);
+ splitOp.addOutputAlias(alias, condPlan);
+ addAlias(alias, splitOutput);
+
+ lp.add(splitOutput);
+ log.debug("Added alias: " + splitOutput.getAlias() + " class: "
+ + splitOutput.getClass().getName() + " to the logical plan");
+
+ lp.connect(splitOp, splitOutput);
+ log.debug("Connected " + splitOp.getClass().getName() + " to class: "
+ + splitOutput.getClass().getName() + " in the logical plan");
+
+ }
+
+ void addAlias(String alias, LogicalOperator lOp) {
+ mapAliasOp.put(alias, lOp);
}
+
+ LogicalOperator getOp(String alias) {
+ return mapAliasOp.get(alias);
+ }
+
+ //BEGIN
+ //I am maintaining state about the operators that should
+ //serve as the inputs to generate in the foreach logical
+ //plan. I did not want to pass this structure around for
+ //the entire parse tree
+
+ private boolean insideGenerate = false; //to check if we are parsing inside a generate statement
+ private List<LogicalOperator> generateInputs = new ArrayList<LogicalOperator>();
+
+ boolean insideGenerate() {
+ return insideGenerate;
+ }
+
+ void setInsideGenerate(boolean b) {
+ insideGenerate = b;
+ }
+
+ List<LogicalOperator> getGenerateInputs() {
+ return generateInputs;
+ }
+
+ void resetGenerateInputs() {
+ generateInputs.clear();
+ }
+
+ void addGenerateInput(LogicalOperator op) {
+ generateInputs.add(op);
+ }
+
+ void resetGenerateState() {
+ insideGenerate = false;
+ resetGenerateInputs();
+ }
+
+ //END
}
@@ -290,15 +366,11 @@
TOKEN : { <AND : "and"> }
TOKEN : { <OR : "or"> }
TOKEN : { <NOT : "not"> }
-TOKEN : { <CONTINUOUSLY : "continuously"> }
-TOKEN : { <WINDOW : "window"> }
-TOKEN : { <SECONDS : "seconds"> }
-TOKEN : { <MINUTES : "minutes"> }
-TOKEN : { <HOURS : "hours"> }
-TOKEN : { <TUPLES : "tuples"> }
TOKEN : { <GENERATE : "generate"> }
TOKEN : { <FLATTEN : "flatten"> }
TOKEN : { <EVAL : "eval"> }
+TOKEN : { <ASC : "asc"> }
+TOKEN : { <DESC : "desc"> }
TOKEN:
{
@@ -321,129 +393,195 @@
TOKEN : { <DOLLARVAR : "$" <INTEGER> > }
// Parse is the Starting function.
-LogicalPlan Parse() : {LogicalOperator root; Token t1;}
+LogicalPlan Parse() :
+{
+ LogicalOperator root;
+ Token t1;
+ LogicalPlan lp = new LogicalPlan();
+ log.trace("Entering Parse");
+}
{
(
LOOKAHEAD(2)
- (t1 = <IDENTIFIER> "=" root = Expr() ";" {root.setAlias(t1.image);})
-| (root = Expr() ";")
-| (<SPLIT> root = SplitClause() ";")
+ (t1 = <IDENTIFIER> "=" root = Expr(lp) ";" {root.setAlias(t1.image); addAlias(t1.image, root);})
+| (root = Expr(lp) ";")
+| (<SPLIT> root = SplitClause(lp) ";")
)
- { return new LogicalPlan(root.getOperatorKey(), opTable, pigContext); }
+ {
+ if(null != root.getSchema()) {
+ log.debug("Root: " + root.getClass().getName() + " schema aliases");
+ root.getSchema().printAliases();
+ }
+
+ log.trace("Exiting Parse");
+ return lp;
+ }
}
-LogicalOperator SplitClause():
-{LogicalOperator input; Cond cond; Token alias; LOSplit splitOp; LogicalPlan lp; int i=0;}
+LogicalOperator SplitClause(LogicalPlan lp):
+{
+ LogicalOperator input;
+ ExpressionOperator cond;
+ Token alias;
+ LOSplit splitOp;
+ int index = 0;
+ LogicalPlan condPlan;
+ log.trace("Entering SplitClause");
+}
{
(
- input = NestedExpr() <INTO>
+ input = NestedExpr(lp) <INTO>
{
- splitOp = new LOSplit(opTable, scope, getNextId(), input.getOperatorKey());
- lp = new LogicalPlan(splitOp.getOperatorKey(), opTable, pigContext);
+ splitOp = new LOSplit(lp, input.getOperatorKey(), new ArrayList<LogicalOperator>(), new HashMap<String,LogicalPlan>());
+ lp.add(splitOp);
+ log.debug("Adding operator " + splitOp.getClass().getName() + " to the logical plan");
}
- alias = <IDENTIFIER> <IF> cond = PCond(input.outputSchema(),null)
+ alias = <IDENTIFIER> <IF> cond = PCond(input.getSchema(), null, condPlan = new LogicalPlan(), input)
{
- addSplitOutput(lp, splitOp, alias.image, cond, i);
- i++;
+ addSplitOutput(lp, splitOp, alias.image, condPlan, index);
+ ++index;
+ log.debug("Added splitoutput");
}
(
- "," alias = <IDENTIFIER> <IF> cond = PCond(input.outputSchema(),null)
+ "," alias = <IDENTIFIER> <IF> cond = PCond(input.getSchema(), null, condPlan = new LogicalPlan(), input)
{
- addSplitOutput(lp, splitOp, alias.image, cond, i);
- i++;
+ addSplitOutput(lp, splitOp, alias.image, condPlan, index);
+ ++index;
+ log.debug("Added splitoutput");
}
)+
)
- {return splitOp;}
+ {log.trace("Exiting SplitClause"); return splitOp;}
}
-LogicalOperator Expr() : {LogicalOperator op; TupleSchema schema;}
+LogicalOperator Expr(LogicalPlan lp) :
+{
+ LogicalOperator op;
+ Schema schema;
+ log.trace("Entering Expr");
+}
{
(
- ( op = NestedExpr() [ <AS> schema = SchemaTuple() {op.setSchema(schema);} ] )
-| op = BaseExpr()
+ ( op = NestedExpr(lp) [ <AS> "(" schema = TupleSchema() ")" {op.setSchema(schema);} ] )
+| op = BaseExpr(lp)
)
- {return op;}
+ {log.trace("Exiting Expr"); return op;}
}
-LogicalOperator NestedExpr() : {LogicalOperator op;}
+LogicalOperator NestedExpr(LogicalPlan lp) :
+{
+ LogicalOperator op;
+ ExpressionOperator eOp;
+ Map<String, LogicalOperator> specs = null;
+ log.trace("Entering NestedExpr");
+}
{
(
- (op = Alias())
-| LOOKAHEAD(2) ( "(" op = NestedExpr() ")" )
-| ( "(" op = BaseExpr() ")" )
+ (op = Alias(lp))
+| LOOKAHEAD(2) ( "(" op = NestedExpr(lp) ")" )
+| ( "(" op = BaseExpr(lp) ")" )
)
- {return op;}
+ {log.trace("Exiting NestedExpr"); return op;}
}
// A reference to an alias
-LogicalOperator Alias() : {Token t1; LogicalOperator op;}
+LogicalOperator Alias(LogicalPlan lp) :
+{
+ Token t1;
+ LogicalOperator op;
+ log.trace("Entering Alias");
+}
{
t1 = <IDENTIFIER>
{
LogicalOperator aliasOp;
String alias = t1.image;
- aliasOp = opTable.get(aliases.get(alias).getRoot());
-
+ aliasOp = getOp(alias);
if (aliasOp == null) {
throw new ParseException("Unrecognized alias " + alias);
}
+ addAlias(alias, aliasOp);
+ log.debug("Added " + alias + " to aliasOp");
+ lp.add(aliasOp);
+ log.debug("Added operator: " + aliasOp.getClass().getName() + " to the logical plan " + lp);
+ log.trace("Exiting Alias");
return aliasOp;
}
}
-
-
-
-
-LogicalOperator BaseExpr() : {LogicalOperator op; TupleSchema schema; Token t1, t2;}
+LogicalOperator BaseExpr(LogicalPlan lp) :
+{
+ LogicalOperator op;
+ Schema schema;
+ Token t1, t2;
+ Schema.FieldSchema fs;
+ log.trace("Entering BaseExpr");
+}
{
(
(
- (<LOAD> op = LoadClause() [<AS> schema = SchemaTuple() {op.setSchema(schema);} ])
-| ((<GROUP> | <COGROUP>) op = CogroupClause())
-| (<FILTER> op = FilterClause())
-| (<ORDER> op = OrderClause())
-| (<DISTINCT> op = NestedExpr() {op = rewriteDistinct(op.getOperatorKey());})
-| (<CROSS> op = CrossClause())
-| (<JOIN> op = JoinClause())
-| (<UNION> op = UnionClause())
-| (<FOREACH> op = ForEachClause())
+ (<LOAD> op = LoadClause(lp) [<AS> (LOOKAHEAD(2) "(" schema = TupleSchema() ")" {op.setSchema(schema);log.debug("Load as schema()");schema.printAliases();} | fs = AtomSchema() {schema = new Schema(fs); op.setSchema(schema); log.debug("Load as atomschema()");schema.printAliases();}) ])
+| ((<GROUP> | <COGROUP>) op = CogroupClause(lp))
+| (<FILTER> op = FilterClause(lp))
+| (<ORDER> op = OrderClause(lp))
+| (<DISTINCT> op = NestedExpr(lp)
+ {
+ LogicalOperator distinct = new LODistinct(lp, new OperatorKey(scope, getNextId()), op);
+ lp.add(distinct);
+ log.debug("Added operator: " + distinct.getClass().getName() + " to the logical plan");
+ lp.connect(op, distinct);
+ log.debug("Connected alias: " + op.getAlias() + " operator " + op.getClass().getName() + " to operator " + distinct.getClass().getName());
+ })
+| (<CROSS> op = CrossClause(lp))
+| (<JOIN> op = JoinClause(lp))
+| (<UNION> op = UnionClause(lp))
+| (<FOREACH> op = ForEachClause(lp))
)
[<PARALLEL> t2=<NUMBER> { op.setRequestedParallelism(Integer.parseInt(t2.image));} ]
)
- {return op;}
+ {log.trace("Exiting BaseExpr"); return op;}
}
-LogicalOperator LoadClause() : {Token t1, t2; String filename; String funcName,funcArgs, funcSpec=null;
- LOLoad lo=null; boolean continuous=false;}
+LogicalOperator LoadClause(LogicalPlan lp) :
+{
+ Token t1, t2;
+ String filename;
+ String funcName,funcArgs, funcSpec=null;
+ LOLoad lo=null;
+ log.trace("Entering LoadClause");
+}
{
( filename = FileName()
(
<USING> funcName = QualifiedFunction() "(" funcArgs = StringList() ")"
{
funcSpec = funcName + "(" + funcArgs + ")";
+ log.debug("LoadClause: funcSpec = " + funcSpec);
}
)?
)
- [ <CONTINUOUSLY> {continuous=true;} ]
{
if (funcSpec == null){
- funcSpec = PigStorage.class.getName();
- funcSpec += continuous ? "('\t','\n','0')" : "()";
+ funcSpec = PigStorage.class.getName() + "()";
}
- lo = new LOLoad(opTable, scope, getNextId(), new FileSpec(massageFilename(filename, pigContext), funcSpec));
- if (continuous)
- lo.setOutputType(LogicalOperator.MONOTONE);
+ lo = new LOLoad(lp, new OperatorKey(scope, getNextId()), new FileSpec(massageFilename(filename, pigContext), funcSpec), null);
+ lp.add(lo);
+ log.debug("Added operator " + lo.getClass().getName() + " to the logical plan");
+
+ log.trace("Exiting LoadClause");
return lo;
}
}
-String StringList() : {StringBuilder sb = new StringBuilder(); Token t;}
+String StringList() :
+{
+ StringBuilder sb = new StringBuilder();
+ Token t;
+}
{
(
(
@@ -452,563 +590,1201 @@
)
| {}
)
- {return sb.toString();}
+ {log.debug("StringList: " + sb.toString()); return sb.toString();}
}
-String FileName(): {Token t;}
+String FileName():
+{
+ Token t;
+}
{
t = <QUOTEDSTRING>
- {return unquote(t.image);}
+ {log.debug("FileName: " + unquote(t.image)); return unquote(t.image);}
}
-LogicalOperator FilterClause():
-{Cond cond; LogicalOperator input;}
+LogicalOperator FilterClause(LogicalPlan lp):
+{
+ ExpressionOperator cond; LogicalOperator input;
+ LogicalPlan conditionPlan = new LogicalPlan();
+ log.trace("Entering FilterClause");
+}
{
- input = NestedExpr()
- <BY> cond = PCond(input.outputSchema(),null)
+ (
+ input = NestedExpr(lp) {log.debug("Filter input: " + input);}
+ <BY> cond = PCond(input.getSchema(),null,conditionPlan,input)
+ )
{
- return new LOEval(opTable, scope, getNextId(), input.getOperatorKey(), new FilterSpec(cond));
+ LogicalOperator filter = new LOFilter(lp, new OperatorKey(scope, getNextId()), conditionPlan, input);
+ addAlias(input.getAlias(), input);
+ lp.add(filter);
+ log.debug("Added operator " + filter.getClass().getName() + " to the logical plan");
+
+ lp.connect(input, filter);
+ log.debug("Connected alias " + input.getAlias() + " operator " + input.getClass().getName() + " to operator " + filter.getClass().getName() +" in the logical plan");
+
+ log.trace("Exiting FilterClause");
+ return filter;
}
}
-Cond PCond(Schema over, Map<String, EvalSpec> specs) : {Cond cond = null;}
+ExpressionOperator PCond(Schema over, Map<String, LogicalOperator> specs, LogicalPlan lp, LogicalOperator input) :
+{
+ ExpressionOperator cond = null;
+ log.trace("Entering PCond");
+ log.debug("PCond Input: " + input);
+}
{
- cond = POrCond(over,specs)
- {return cond;}
+ cond = POrCond(over,specs,lp, input)
+ {log.trace("Exiting PCond"); return cond;}
}
-Cond POrCond(Schema over, Map<String, EvalSpec> specs) : {Cond cond; List<Cond> cList = new ArrayList<Cond>();}
+ExpressionOperator POrCond(Schema over, Map<String, LogicalOperator> specs, LogicalPlan lp, LogicalOperator input) :
+{
+ ExpressionOperator lhsCond, rhsCond;
+ log.trace("Entering POrCond");
+ log.debug("POrCond Input: " + input);
+}
{
(
- cond = PAndCond(over,specs) {cList.add(cond);}
- ( <OR> cond = PAndCond(over,specs) {cList.add(cond);})*
+ lhsCond = PAndCond(over,specs,lp,input)
+ (
+ <OR> rhsCond = PAndCond(over,specs,lp,input)
+ {
+ ExpressionOperator exprOp = new LOOr(lp, new OperatorKey(scope, getNextId()), lhsCond, rhsCond);
+ lp.add(exprOp);
+ log.debug("POrCond: Added operator " + exprOp.getClass().getName() + " " + exprOp + " to logical plan " + lp);
+ lp.connect(lhsCond, exprOp);
+ log.debug("POrCond: Connected operator " + lhsCond.getClass().getName() + " " + lhsCond + " to " + exprOp + " logical plan " + lp);
+ lp.connect(rhsCond, exprOp);
+ log.debug("POrCond: Connected operator " + rhsCond.getClass().getName() + " " + rhsCond + " to " + exprOp + " logical plan " + lp);
+ lhsCond = exprOp;
+ }
+ )*
)
{
- if (cList.size()==1)
- return cond;
- else
- return new OrCond(cList);
+ log.trace("Exiting POrCond");
+ return lhsCond;
}
}
-Cond PAndCond(Schema over, Map<String, EvalSpec> specs) : {Cond cond = null; List<Cond> cList = new ArrayList<Cond>();}
+ExpressionOperator PAndCond(Schema over, Map<String, LogicalOperator> specs, LogicalPlan lp, LogicalOperator input) :
+{
+ ExpressionOperator lhsCond, rhsCond;
+ log.trace("Entering PAndCond");
+ log.debug("PAndCond Input: " + input);
+}
{
(
- cond = PUnaryCond(over,specs) {cList.add(cond);}
- ( <AND> cond = PUnaryCond(over,specs) {cList.add(cond);} )*
+ lhsCond = PUnaryCond(over,specs,lp,input)
+ (
+ <AND> rhsCond = PUnaryCond(over,specs,lp,input)
+ {
+ ExpressionOperator exprOp = new LOAnd(lp, new OperatorKey(scope, getNextId()), lhsCond, rhsCond);
+ lp.add(exprOp);
+ log.debug("PAndCond: Added operator " + exprOp.getClass().getName() + " " + exprOp + " to logical plan " + lp);
+ lp.connect(lhsCond, exprOp);
+ log.debug("PAndCond: Connected operator " + lhsCond.getClass().getName() + " " + lhsCond + " to " + exprOp + " logical plan " + lp);
+ lp.connect(rhsCond, exprOp);
+ log.debug("PAndCond: Connected operator " + rhsCond.getClass().getName() + " " + rhsCond + " to " + exprOp + " logical plan " + lp);
+ lhsCond = exprOp;
+ }
+ )*
)
{
- if (cList.size()==1)
- return cond;
- else
- return new AndCond(cList);
+ log.trace("Exiting PAndCond");
+ return lhsCond;
}
}
-Cond PUnaryCond(Schema over, Map<String, EvalSpec> specs) : {Cond cond = null; EvalSpec c1, c2; Token t1; String funcName; GenerateSpec args;}
+ExpressionOperator PUnaryCond(Schema over, Map<String, LogicalOperator> specs, LogicalPlan lp, LogicalOperator input) :
+{
+ ExpressionOperator cond = null;
+ ExpressionOperator lhs, rhs;
+ Token t1;
+ String funcName;
+ List<ExpressionOperator> args;
+ log.trace("Entering PUnaryCond");
+}
{
(
- LOOKAHEAD("(" PCond(over,specs) ")")
- ("(" cond = PCond(over,specs) ")")
-| LOOKAHEAD(InfixExpr(over,specs) <FILTEROP>)
- (c1=InfixExpr(over,specs) t1=<FILTEROP> c2=InfixExpr(over,specs) {cond = new CompCond(c1, t1.image, c2);})
-| LOOKAHEAD(InfixExpr(over,specs) <MATCHES>)
- (c1=InfixExpr(over,specs) <MATCHES> t1=<QUOTEDSTRING> {cond = new RegexpCond(c1, unquote(t1.image));})
-| LOOKAHEAD(FilterFunction() "(") (funcName=FilterFunction() "(" args=EvalArgs(over,specs) ")" {cond = new FuncCond(pigContext, funcName, args);})
-| cond = PNotCond(over,specs)
+ LOOKAHEAD("(" PCond(over,specs,lp,input) ")")
+ ("(" cond = PCond(over,specs,lp,input) ")")
+| LOOKAHEAD(InfixExpr(over,specs,lp,input) <FILTEROP>)
+ (lhs=InfixExpr(over,specs,lp,input) t1=<FILTEROP> rhs=InfixExpr(over,specs,lp,input)
+ {
+ //the long switch case to instantiate the right operator
+ //I have the long switch case from CompCond
+ String op = t1.image;
+ op = op.toLowerCase();
+
+ char op1 = op.charAt(0);
+ char op2 = op.length() >= 2 ? op.charAt(1) : '0';
+ char op3 = op.length() == 3 ? op.charAt(2) : '0';
+
+ switch (op1) {
+ // numeric ops first
+ case '=':
+ if (op2 == '=') {
+ cond = new LOEqual(lp, new OperatorKey(scope, getNextId()), lhs, rhs);
+ } else {
+ throw new ParseException("Internal error: Invalid filter operator: " + op);
+ }
+ break;
+ case '<':
+ if (op2 == '=') {
+ cond = new LOLesserThanEqual(lp, new OperatorKey(scope, getNextId()), lhs, rhs);
+ } else {
+ cond = new LOLesserThan(lp, new OperatorKey(scope, getNextId()), lhs, rhs);
+ }
+ break;
+ case '>':
+ if (op2 == '=') {
+ cond = new LOGreaterThanEqual(lp, new OperatorKey(scope, getNextId()), lhs, rhs);
+ } else {
+ cond = new LOGreaterThan(lp, new OperatorKey(scope, getNextId()), lhs, rhs);
+ }
+ break;
+ case '!':
+ if (op2 == '=') {
+ cond = new LONotEqual(lp, new OperatorKey(scope, getNextId()), lhs, rhs);
+ } else {
+ throw new ParseException("Internal error: Invalid filter operator: " + op);
+ }
+ break;
+ // now string ops
+ case 'e':
+ if (op2 == 'q') {
+ cond = new LOEqual(lp, new OperatorKey(scope, getNextId()), lhs, rhs);
+ } else {
+ throw new ParseException("Internal error: Invalid filter operator: " + op);
+ }
+ break;
+ case 'l':
+ if (op2 == 't' && op3 == 'e') {
+ cond = new LOLesserThanEqual(lp, new OperatorKey(scope, getNextId()), lhs, rhs);
+ } else {
+ cond = new LOLesserThan(lp, new OperatorKey(scope, getNextId()), lhs, rhs);
+ }
+ break;
+ case 'g':
+ if (op2 == 't' && op3 == 'e') {
+ cond = new LOGreaterThanEqual(lp, new OperatorKey(scope, getNextId()), lhs, rhs);
+ } else {
+ cond = new LOGreaterThan(lp, new OperatorKey(scope, getNextId()), lhs, rhs);
+ }
+ break;
+ case 'n':
+ if (op2 == 'e' && op3 == 'q') {
+ cond = new LONotEqual(lp, new OperatorKey(scope, getNextId()), lhs, rhs);
+ } else {
+ throw new ParseException("Internal error: Invalid filter operator: " + op);
+ }
+ break;
+ default:
+ throw new ParseException("Internal error: Invalid filter operator: " + op);
+ }
+
+ lp.add(cond);
+ log.debug("PUnaryCond: Added operator " + cond.getClass().getName() + " " + cond + " to logical plan " + lp);
+ lp.connect(lhs, cond);
+ log.debug("PUnaryCond: Connected operator " + lhs.getClass().getName() + " " + lhs+ " to " + cond + " logical plan " + lp);
+ lp.connect(rhs, cond);
+ log.debug("PUnaryCond: Connected operator " + rhs.getClass().getName() + " " + rhs+ " to " + cond + " logical plan " + lp);
+ }
+ )
+| LOOKAHEAD(InfixExpr(over,specs,lp,input) <MATCHES>)
+ (lhs=InfixExpr(over,specs,lp,input) <MATCHES> t1=<QUOTEDSTRING>
+ {
+ cond = new LORegexp(lp, new OperatorKey(scope, getNextId()), lhs, unquote(t1.image));
+ lp.add(cond);
+ log.debug("PUnaryCond: Added operator " + cond.getClass().getName() + " " + cond + " to logical plan " + lp);
+ lp.connect(lhs, cond);
+ log.debug("PUnaryCond: Connected operator " + cond.getClass().getName() + " " + cond + " to " + lhs + " logical plan " + lp);
+ }
+ )
+| LOOKAHEAD(EvalFunction() "(")
+ (funcName=EvalFunction() "(" args=EvalArgs(over,specs,lp,input) ")"
+ {
+ cond = new LOUserFunc(lp, new OperatorKey(scope, getNextId()), funcName, args, DataType.BOOLEAN);
+ lp.add(cond);
+ log.debug("PUnaryCond: Added operator " + cond.getClass().getName() + " " + cond + " to logical plan " + lp);
+ for(ExpressionOperator exprOp: args) {
+ lp.connect(exprOp, cond);
+ log.debug("PUnaryCond: Added operator " + exprOp.getClass().getName() + " " + cond + " to logical plan " + lp);
+ }
+ }
+ )
+| cond = PNotCond(over,specs,lp,input)
)
- {return cond;}
+ {log.trace("Exiting PUnaryCond"); return cond;}
}
-Cond PNotCond(Schema over, Map<String, EvalSpec> specs) : {Cond c1;}
+ExpressionOperator PNotCond(Schema over, Map<String, LogicalOperator> specs,LogicalPlan lp,LogicalOperator input) :
{
- <NOT> c1=PUnaryCond(over,specs)
- {return new NotCond(c1);}
+ ExpressionOperator c1;
+ log.trace("Entering PNotCond");
+}
+{
+ <NOT> c1=PUnaryCond(over,specs,lp,input)
+ {
+ ExpressionOperator eOp = new LONot(lp, new OperatorKey(scope, getNextId()), c1);
+ lp.add(eOp);
+ log.debug("PNotCond: Added operator " + eOp.getClass().getName() + " " + eOp + " to logical plan " + lp);
+ lp.connect(c1, eOp);
+ log.debug("PNotCond: Connected operator " + eOp.getClass().getName() + " " + eOp + " to " + c1 + " logical plan " + lp);
+ log.trace("Exiting PNotCond");
+ return eOp;
+ }
}
-LogicalOperator CogroupClause() : {CogroupInput gi; ArrayList<CogroupInput> gis = new ArrayList<CogroupInput>();}
+LogicalOperator CogroupClause(LogicalPlan lp) :
{
- (gi = GroupItem() { gis.add(gi); }
- ("," gi = GroupItem() { gis.add(gi); })*)
- {return parseCogroup(gis);}
+ CogroupInput gi;
+ ArrayList<CogroupInput> gis = new ArrayList<CogroupInput>();
+ LogicalOperator cogroup;
+ log.trace("Entering CoGroupClause");
}
+{
+ (gi = GroupItem(lp) { gis.add(gi); }
+ ("," gi = GroupItem(lp) { gis.add(gi); })*)
+ {
+ cogroup = parseCogroup(gis, lp);
+ log.trace("Exiting CoGroupClause");
+ return cogroup;
+ }
-CogroupInput GroupItem() : {LogicalOperator op; GenerateSpec gs; EvalSpec es; LogicalOperator cgOp; EvalSpec cgSpec;}
+}
+
+CogroupInput GroupItem(LogicalPlan lp) :
+{
+ ExpressionOperator es;
+ LogicalOperator cgOp;
+ boolean isInner = false;
+ ArrayList<LogicalPlan> listPlans = new ArrayList<LogicalPlan>();
+ LogicalPlan groupByPlan;
+ ArrayList<Boolean> flattenList = new ArrayList<Boolean>();
+ log.trace("Entering GroupItem");
+ log.debug("LogicalPlan: " + lp);
+}
{
(
- cgOp = NestedExpr()
- (
- ( <BY>
- (
- LOOKAHEAD ( "(" FlattenedGenerateItemList(cgOp.outputSchema()) ")" )
- ( "(" gs = FlattenedGenerateItemList(cgOp.outputSchema(), null) ")" )
-| (es = FlattenedGenerateItem(cgOp.outputSchema(), null) {gs = new GenerateSpec(es);})
- )
- )
-| <ALL> {gs = new GenerateSpec(new ConstSpec("all"));}
-| <ANY> {gs = new GenerateSpec(new FuncEvalSpec(pigContext, GFAny.class.getName(), null));}
- )
- {
- cgSpec = gs.getGroupBySpec();
- }
- [<INNER> {cgSpec.setInner(true);} | <OUTER>]
+ cgOp = NestedExpr(lp)
+ (
+ ( <BY>
+ (
+ LOOKAHEAD ( "(" FlattenedGenerateItemList(cgOp.getSchema(), null, groupByPlan, cgOp) ")" )
+ ( "(" es = FlattenedGenerateItem(cgOp.getSchema(), null, groupByPlan = new LogicalPlan(), cgOp, flattenList)
+ {listPlans.add(groupByPlan);}
+ (
+ "," es = FlattenedGenerateItem(cgOp.getSchema(), null, groupByPlan = new LogicalPlan(), cgOp, flattenList)
+ {listPlans.add(groupByPlan);}
+ )*
+ ")"
+ )
+ | (
+ es = FlattenedGenerateItem(cgOp.getSchema(), null, groupByPlan = new LogicalPlan(), cgOp, flattenList)
+ {listPlans.add(groupByPlan);}
+ )
+ )
+ )
+ | <ALL> {
+ es = new LOConst(groupByPlan = new LogicalPlan(), new OperatorKey(scope, getNextId()), "all");
+ groupByPlan.add(es);
+ log.debug("GroupItem: Added operator " + es.getClass().getName() + " " + es + " to logical plan " + groupByPlan);
+ listPlans.add(groupByPlan);
+ }
+ | <ANY> {
+ es = new LOUserFunc(groupByPlan = new LogicalPlan(), new OperatorKey(scope, getNextId()), GFAny.class.getName(), null, DataType.INTEGER);
+ groupByPlan.add(es);
+ log.debug("GroupItem: Added operator " + es.getClass().getName() + " " + es + " to logical plan " + groupByPlan);
+ listPlans.add(groupByPlan);
+ }
+ )
+ [<INNER> {isInner = true;} | <OUTER>]
)
{
CogroupInput cogroupInput = new CogroupInput();
- cogroupInput.spec = cgSpec;
- cogroupInput.op = cgOp.getOperatorKey();
+ cogroupInput.plans = listPlans;
+ cogroupInput.op = cgOp;
+ cogroupInput.isInner = isInner;
+ log.trace("Exiting GroupItem");
return cogroupInput;
}
}
-
-LogicalOperator OrderClause() : {LogicalOperator op; EvalSpec sortSpec = null; ProjectSpec projSpec; String funcName;}
+LogicalOperator OrderClause(LogicalPlan lp) :
+{
+ LogicalOperator op;
+ ExpressionOperator col;
+ boolean star = false;
+ ArrayList<ExpressionOperator> sortCols = new ArrayList<ExpressionOperator>();
+ ArrayList<LogicalPlan> sortColPlans = new ArrayList<LogicalPlan>();
+ ArrayList<Boolean> ascOrder = new ArrayList<Boolean>();
+ boolean asc = true;
+ String funcName = null;
+ log.trace("Entering OrderClause");
+}
{
(
- op = NestedExpr() <BY>
+ op = NestedExpr(lp) <BY>
(
(
- (projSpec = SimpleProj(op.outputSchema()) | ( "(" projSpec = SimpleProj(op.outputSchema()) ")"))
- {
- projSpec.setWrapInTuple(true);
- projSpec.setFlatten(true);
- sortSpec = new GenerateSpec(projSpec);
- }
+ (
+ col = SortCol(op.getSchema(), lp, op, ascOrder, sortColPlans)
+ ("," col = SortCol(op.getSchema(), lp, op, ascOrder, sortColPlans))*
)
- | (sortSpec = Star() {sortSpec = new GenerateSpec(sortSpec);})
+ )
+ | <STAR> {star = true;} [<ASC> | <DESC> {asc = false;}]
+ {
+ if(asc) {
+ ascOrder.add(true);
+ } else {
+ ascOrder.add(false);
+ }
+ }
)
(
<USING> funcName = QualifiedFunction()
- {
- try {
- sortSpec.setComparatorName(funcName);
- } catch (Exception e){
- throw new ParseException(e.getMessage());
- }
- }
)?
)
{
- return new LOSort(opTable, scope, getNextId(), op.getOperatorKey(), sortSpec);
+ LOSort sort = new LOSort(lp, new OperatorKey(scope, getNextId()), op, sortColPlans, ascOrder, funcName);
+ sort.setStar(star);
+ lp.add(sort);
+ log.debug("Added operator " + sort.getClass().getName() + " to the logical plan");
+
+ lp.connect(op, sort);
+ log.debug("Connecting sort input alias " + op.getAlias() + " operator " + op.getClass().getName() + " to operator " + sort.getClass().getName() + " in the logical plan");
+
+ log.trace("Exiting OrderClause");
+ return sort;
}
}
-
-LogicalOperator CrossClause() : {LogicalOperator op; ArrayList<OperatorKey> inputs = new ArrayList<OperatorKey>();}
+
+ExpressionOperator SortCol(Schema over, LogicalPlan lp, LogicalOperator op, ArrayList<Boolean> ascOrder, ArrayList<LogicalPlan> sortColPlans) :
+{
+ ExpressionOperator col;
+ boolean asc = true;
+ LogicalPlan sortColPlan = new LogicalPlan();
+ log.trace("Entering SortCol");}
+{
+ (
+ col = ColOrSpec(op.getSchema(), null, sortColPlan, op) [<ASC> | <DESC> {asc = false;}]
+ {
+ if(asc) {
+ log.debug("Ascending");
+ ascOrder.add(true);
+ } else {
+ log.debug("Descending");
+ ascOrder.add(false);
+ }
+ sortColPlans.add(sortColPlan);
+ }
+ |
+ (
+ "(" col = ColOrSpec(op.getSchema(), null, sortColPlan, op) ")" [<ASC> | <DESC> {asc = false;}]
+ {
+ if(asc) {
+ log.debug("Ascending");
+ ascOrder.add(true);
+ } else {
+ log.debug("Descending");
+ ascOrder.add(false);
+ }
+ sortColPlans.add(sortColPlan);
+ }
+ )
+ )
+ {
+ log.trace("Exiting SortCol");
+ return col;
+ }
+}
+
+int ColName(Schema over) :
+{
+ Token t;
+ log.trace("Entering ColName");
+}
{
(
- op = NestedExpr() { inputs.add(op.getOperatorKey()); }
- ("," op = NestedExpr() { inputs.add(op.getOperatorKey()); })*
+ t = <DOLLARVAR> {return undollar(t.image);}
+ |
+ t = <IDENTIFIER>
+ { int i;
+
+ if ( over == null || (i = over.getPosition(t.image)) == -1) {
+ throw new ParseException("Invalid alias: " + t.image + " in " + over);
+ }
+
+ log.trace("Exiting ColName");
+ return i;
+ }
)
- {return rewriteCross(inputs);}
+}
+
+LogicalOperator CrossClause(LogicalPlan lp) :
+{
+ LogicalOperator op;
+ ArrayList<LogicalOperator> inputs = new ArrayList<LogicalOperator>();
+ log.trace("Entering CrossClause");
+}
+{
+ (
+ op = NestedExpr(lp) { inputs.add(op); }
+ ("," op = NestedExpr(lp) { inputs.add(op); })*
+ )
+ {
+ LogicalOperator cross = new LOCross(lp, new OperatorKey(scope, getNextId()), inputs);
+ lp.add(cross);
+ log.debug("Added operator " + cross.getClass().getName() + " to the logical plan");
+
+ for (LogicalOperator lop: inputs) {
+ lp.connect(lop, cross);
+ log.debug("Connected operator " + lop.getClass().getName() + " " + lop + " to " + cross + " logical plan " + lp);
+ }
+ log.debug("Connected cross inputs to the cross operator");
+
+ log.trace("Exiting CrossClause");
+ return cross;
+ }
}
-LogicalOperator JoinClause() : {CogroupInput gi; ArrayList<CogroupInput> gis = new ArrayList<CogroupInput>();}
+LogicalOperator JoinClause(LogicalPlan lp) :
{
- (gi = GroupItem() { gis.add(gi); }
- ("," gi = GroupItem() { gis.add(gi); })*)
- {return rewriteJoin(gis);}
+ CogroupInput gi;
+ ArrayList<CogroupInput> gis = new ArrayList<CogroupInput>();
+ log.trace("Entering JoinClause");
+ log.debug("LogicalPlan: " + lp);
+}
+{
+ (gi = GroupItem(lp) { gis.add(gi); }
+ ("," gi = GroupItem(lp) { gis.add(gi); })*)
+ {log.trace("Exiting JoinClause"); return rewriteJoin(gis, lp);}
+
}
-LogicalOperator UnionClause() : {LogicalOperator op; ArrayList<OperatorKey> inputs = new ArrayList<OperatorKey>();}
+LogicalOperator UnionClause(LogicalPlan lp) :
{
- (op = NestedExpr() { inputs.add(op.getOperatorKey()); }
- ("," op = NestedExpr() { inputs.add(op.getOperatorKey()); })*)
- {return new LOUnion(opTable, scope, getNextId(), inputs);}
+ LogicalOperator op;
+ ArrayList<LogicalOperator> inputs = new ArrayList<LogicalOperator>();
+ log.trace("Entering UnionClause");
+}
+{
+ (op = NestedExpr(lp){inputs.add(op);}
+ ("," op = NestedExpr(lp) {inputs.add(op);})*)
+ {
+ LogicalOperator union = new LOUnion(lp, new OperatorKey(scope, getNextId()), inputs);
+ lp.add(union);
+ log.debug("Added operator " + union.getClass().getName() + " to the logical plan");
+
+ for (LogicalOperator lop: inputs) {
+ lp.connect(lop, union);
+ log.debug("Connected union input operator " + lop.getClass().getName() + " to operator " + union.getClass().getName() + " in the logical plan");
+ }
+
+ log.trace("Exiting UnionClause");
+ return union;
+ }
}
-LogicalOperator ForEachClause() : {EvalSpec spec = null; LogicalOperator input, op; }
+LogicalOperator ForEachClause(LogicalPlan lp) :
+{
+ ArrayList<LogicalOperator> specList = new ArrayList<LogicalOperator>();
+ LogicalOperator input, foreach;
+ LogicalPlan foreachPlan = new LogicalPlan();
+ log.trace("Entering ForEachClause");
+}
{
(
- input = NestedExpr()
- spec = NestedBlock(input.outputSchema())
+ input = NestedExpr(lp)
+ specList = NestedBlock(input.getSchema(), specList, foreachPlan, input)
)
{
- op = new LOEval(opTable, scope, getNextId(), input.getOperatorKey(), spec);
- return op;
+ foreach = new LOForEach(lp, new OperatorKey(scope, getNextId()), foreachPlan);
+ try {
+ lp.add(foreach);
+ log.debug("Added operator " + foreach.getClass().getName() + " to the logical plan");
+
+ lp.connect(input, foreach);
+ log.debug("Connected alias " + input.getAlias() + " operator " + input.getClass().getName() + " object " + input + " to operator " + foreach.getClass().getName() + " in the logical plan");
+ } catch (PlanException planException) {
+ ParseException parseException = new ParseException(planException.getMessage());
+ throw parseException;
+ }
+
+ log.trace("Exiting ForEachClause");
+ return foreach;
}
}
-EvalSpec NestedBlock(Schema over):
-{EvalSpec spec; Map<String, EvalSpec> specs = new HashMap<String, EvalSpec>();}
+ArrayList<LogicalOperator> NestedBlock(Schema over, ArrayList<LogicalOperator> specList, LogicalPlan lp, LogicalOperator input):
+{
+ LogicalOperator spec;
+ Map<String, LogicalOperator> specs = new HashMap<String, LogicalOperator>();
+ log.trace("Entering NestedBlock");
+}
{
(
- spec = GenerateStatement(over,specs)
-| ("{" (NestedCommand(over,specs) ";")* spec = GenerateStatement(over,specs) ";" "}")
+ spec = GenerateStatement(over,specs, lp, input) {specList.add(spec);}
+| ("{" (NestedCommand(over,specs,specList, lp, input) ";")* spec = GenerateStatement(over,specs,lp,input) ";" "}")
+ {specList.add(spec);}
)
- {return spec;}
+ {log.trace("Exiting NestedBlock"); return specList;}
}
-void NestedCommand(Schema over, Map<String, EvalSpec> specs):
-{Token t; EvalSpec item;}
+void NestedCommand(Schema over, Map<String, LogicalOperator> specs, List<LogicalOperator> specList, LogicalPlan lp, LogicalOperator input):
+{
+ Token t;
+ LogicalOperator item;
+ ExpressionOperator eOp = null;
+ log.trace("Entering NestedCommand");
+}
{
(
t = <IDENTIFIER> "="
(
- item = InfixExpr(over,specs)
-| item = NestedFilter(over,specs)
-| item = NestedSortOrArrange(over,specs)
-| item = NestedDistinct(over,specs)
+ eOp = InfixExpr(over,specs,lp,input)
+ {
+ item = eOp;
+ lp.add(eOp);
+ log.debug("Added operator " + eOp.getClass().getName() + " " + eOp + " to the logical plan " + lp);
+ }
+| item = NestedFilter(over,specs,lp, input)
+| item = NestedSortOrArrange(over,specs,lp, input)
+| item = NestedDistinct(over,specs,lp, input)
)
)
- {specs.put(t.image,item);}
+ {
+ String alias = t.image;
+ item.setAlias(alias);
+ specs.put(alias,item);
+ log.debug("Added " + alias + " to the specs map");
+ specList.add(item);
+ log.debug("Added " + alias + " to the specList");
+
+ log.trace("Exiting NestedCommand");
+ }
}
-EvalSpec NestedFilter(Schema over, Map<String, EvalSpec> specs):
-{Cond cond; EvalSpec item; Schema subSchema = null;}
+LogicalOperator NestedFilter(Schema over, Map<String, LogicalOperator> specs, LogicalPlan lp, LogicalOperator input):
{
- <FILTER> item = BaseEvalSpec(over,specs) { subSchema = item.getOutputSchemaForPipe(over); }
- <BY> cond = PCond(subSchema,null)
- { return copyItemAndAddSpec(item,new FilterSpec(cond)); }
+ ExpressionOperator cond;
+ Schema subSchema = null;
+ ExpressionOperator eOp;
+ LogicalPlan conditionPlan = new LogicalPlan();
+ log.trace("Entering NestedFilter");
+}
+{
+ (
+ <FILTER> eOp = BaseEvalSpec(over, specs, lp, input)
+ {subSchema = eOp.getSchema();}
+ <BY> cond = PCond(subSchema,null,conditionPlan,input)
+ )
+ {
+ lp.add(eOp);
+ log.debug("Added " + eOp.getAlias() + " to the logical plan");
+ LogicalOperator filter = new LOFilter(lp, new OperatorKey(scope, getNextId()), conditionPlan, eOp);
+ lp.add(filter);
+ log.debug("Added nested filter operator " + filter.getClass().getName() + " to the logical plan");
+
+ lp.connect(eOp, filter);
+ log.debug("Connected the filter input to the filter");
+
+ log.trace("Exiting NestedFilter");
+ return filter;
+ }
+
}
-EvalSpec NestedSortOrArrange(Schema over, Map<String, EvalSpec> specs):
-{EvalSpec sortSpec; ProjectSpec projSpec; EvalSpec item; Schema subSchema = null; Token t; String funcName;}
+LogicalOperator NestedSortOrArrange(Schema over, Map<String, LogicalOperator> specs, LogicalPlan lp, LogicalOperator input):
+{
+ ExpressionOperator col;
+ boolean star = false;
+ ArrayList<LogicalPlan> sortColPlans = new ArrayList<LogicalPlan>();
+ ArrayList<Boolean> ascOrder = new ArrayList<Boolean>();
+ String funcName = null;
+ ExpressionOperator eOp;
+ Token t;
+ boolean asc = true;
+ log.trace("Entering NestedSortOrArrange");}
{
(
( t = <ORDER> | t = <ARRANGE> )
- item = BaseEvalSpec(over,specs) { subSchema = item.getOutputSchemaForPipe(over); }
- <BY> ( (projSpec = SimpleProj(subSchema) {sortSpec = projSpec;})
- | sortSpec = Star() )
+ (
+ eOp = BaseEvalSpec(over, specs, lp, input)
+ {
+ try{
+ eOp.getSchema();
+ }catch(FrontendException fe) {
+ ParseException pe = new ParseException(fe.getMessage());
+ throw pe;
+ }
+ log.debug("Before BY");
+ }
+ )
+ <BY>
+ (
+ (
+ col = SortCol(eOp.getSchema(), lp, eOp, ascOrder, sortColPlans)
+ ("," col = SortCol(eOp.getSchema(), lp, eOp, ascOrder, sortColPlans) )*
+
+ )
+ | <STAR> {star = true;} [<ASC> | <DESC> {asc = false;}]
+ {
+ if(asc) {
+ ascOrder.add(true);
+ } else {
+ ascOrder.add(false);
+ }
+ }
+ )
(
<USING> funcName = QualifiedFunction()
- {
- try {
- sortSpec.setComparatorName(funcName);
- } catch (Exception e){
- throw new ParseException(e.getMessage());
- }
- }
)?
)
- { return copyItemAndAddSpec(item,new SortDistinctSpec(false, sortSpec)); }
+ {
+ log.debug("Before creating LOSort");
+ LOSort sort = new LOSort(lp, new OperatorKey(scope, getNextId()), eOp, sortColPlans, ascOrder, funcName);
+ sort.setStar(star);
+ log.debug("After creating LOSort");
+ try {
+ lp.add(eOp);
+ log.debug("Added " + eOp + " " + eOp.getClass().getName() + " to the logical plan");
+ lp.add(sort);
+ log.debug("Added operator " + sort.getClass().getName() + " to the logical plan");
+
+ lp.connect(eOp, sort);
+ log.debug("Connected alias " + eOp.getAlias() + " operator " + eOp.getClass().getName() + " to operator " + sort.getClass().getName() + " the logical plan");
+ } catch (PlanException planException) {
+ ParseException parseException = new ParseException(planException.getMessage());
+ throw parseException;
+ }
+
+ log.trace("Exiting NestedSortOrArrange");
+ return sort;
+ }
}
-EvalSpec NestedDistinct(Schema over, Map<String, EvalSpec> specs):
-{EvalSpec item; LogicalOperator subOp = null; Token t;}
+LogicalOperator NestedDistinct(Schema over, Map<String, LogicalOperator> specs, LogicalPlan lp, LogicalOperator input):
+{
+ Token t;
+ ExpressionOperator eOp;
+ log.trace("Entering NestedDistinct");
+}
{
(
- <DISTINCT>
- item = BaseEvalSpec(over,specs)
+ <DISTINCT> eOp = BaseEvalSpec(over, specs, lp, input)
)
{
- return copyItemAndAddSpec(item,new SortDistinctSpec(true, null));
+ lp.add(eOp);
+ log.debug("Added " + eOp.getAlias() + " to the logical plan");
+ LogicalOperator distinct = new LODistinct(lp, new OperatorKey(scope, getNextId()), eOp);
+ lp.add(distinct);
+ log.debug("Added operator " + distinct.getClass().getName() + " to the logical plan");
+
+ lp.connect(eOp, distinct);
+ log.debug("Connected alias " + input.getAlias() + " operator " + input.getClass().getName() + " to operator " + distinct.getClass().getName() + " in the logical plan");
+
+ log.trace("Exiting NestedDistinct");
+ return distinct;
}
}
-GenerateSpec GenerateStatement(Schema over, Map<String, EvalSpec> specs):
-{GenerateSpec spec = null; TupleSchema schema;}
+LogicalOperator GenerateStatement(Schema over, Map<String, LogicalOperator> specs, LogicalPlan lp, LogicalOperator input):
+{
+ LogicalOperator spec = null;
+ Schema schema;
+ setInsideGenerate(true);
+ log.trace("Entering GenerateStatement");
+}
{
(
<GENERATE>
- spec = FlattenedGenerateItemList(over,specs)
+ spec = FlattenedGenerateItemList(over,specs,lp,input)
)
{
+ log.debug("Connecting generate inputs");
+ for(LogicalOperator op: getGenerateInputs()) {
+ lp.connect(op, spec);
+ log.debug("Connected operator: " + op.getClass().getName() + " to " + op + " " + spec + " in logical plan " + lp);
+ }
+ resetGenerateState();
+ log.trace("Exiting GenerateStatement");
return spec;
}
}
-GenerateSpec FlattenedGenerateItemList(Schema over, Map<String, EvalSpec> specs):
-{ArrayList<EvalSpec> specList = new ArrayList<EvalSpec>(); EvalSpec item;}
+LogicalOperator FlattenedGenerateItemList(Schema over, Map<String, LogicalOperator> specs, LogicalPlan lp, LogicalOperator input):
+{
+ ArrayList<LogicalPlan> generatePlans = new ArrayList<LogicalPlan>();
+ ArrayList<Boolean> flattenList = new ArrayList<Boolean>();
+ ExpressionOperator item;
+ LogicalPlan generatePlan;
+ log.trace("Entering FlattenedGenerateItemList");
+}
{
(
- item = FlattenedGenerateItem(over,specs) {specList.add(item);}
- ("," item = FlattenedGenerateItem(over,specs) {specList.add(item);})*
+ item = FlattenedGenerateItem(over, specs, generatePlan = new LogicalPlan(), input, flattenList) {generatePlans.add(generatePlan);}
+ ("," item = FlattenedGenerateItem(over, specs, generatePlan = new LogicalPlan(), input, flattenList) {generatePlans.add(generatePlan);})*
+
)
- {return new GenerateSpec(specList);}
+ {
+ LogicalOperator generate = new LOGenerate(lp, new OperatorKey(scope, getNextId()), generatePlans, flattenList);
+ lp.add(generate);
+ log.debug("Added operator " + generate.getClass().getName() + " to the logical plan");
+ log.trace("Exiting FlattenedGenerateItemList");
+ return generate;
+ }
}
-EvalSpec FlattenedGenerateItem(Schema over, Map<String, EvalSpec> specs):
-{EvalSpec item; Schema schema = null;}
+ExpressionOperator FlattenedGenerateItem(Schema over, Map<String, LogicalOperator> specs, LogicalPlan lp, LogicalOperator input, ArrayList<Boolean> flattenList):
+{
+ ExpressionOperator item;
+ Schema schema = null;
+ Token t;
+ Schema.FieldSchema fs;
+ boolean flatten = false;
+ log.trace("Entering FlattenedGenerateItem");
+}
{
(
(
- ( <FLATTEN> "(" item = InfixExpr(over,specs) ")"
+ (
+ <FLATTEN> "(" item = InfixExpr(over,specs,lp,input) ")"
{
- item.setFlatten(true);
+ flatten = true;
+ }
+ )
+| (item = InfixExpr(over,specs,lp,input))
+| ( <STAR>
+ {
+ LOProject project = new LOProject(lp, new OperatorKey(scope, getNextId()), input, -1);
+ project.setStar(true);
+ item = project;
+ lp.add(project);
+ log.debug("FGItem: Added operator " + project.getClass().getName() + " " + project + " to logical plan " + lp);
}
)
-| (item = InfixExpr(over,specs))
-| (item = Star())
)
- [ <AS> schema = Schema() ]
+ [ <AS> (LOOKAHEAD(2) "(" schema = TupleSchema() ")" | fs = AtomSchema() {schema = new Schema(fs);})]
)
{
- item.setSchema(schema);
+ log.debug("item: " + item.getClass().getName());
+ if(null != schema) {
+ item.setSchema(schema);
+ log.debug("Printing Schema Aliases from parser");
+ schema.printAliases();
+ log.debug("Printing Schema Aliases from item");
+ item.getSchema().printAliases();
+ }
+ flattenList.add(flatten);
+ log.trace("Exiting FlattenedGenerateItem");
return item;
}
}
-EvalSpec InfixExpr(Schema over, Map<String, EvalSpec> specs) : { EvalSpec expr; }
+ExpressionOperator InfixExpr(Schema over, Map<String, LogicalOperator> specs, LogicalPlan lp, LogicalOperator input) :
{
- expr = AdditiveExpr(over,specs)
- {return expr;}
+ ExpressionOperator expr;
+ log.trace("Entering InFixExpr");
+}
+{
+ expr = AdditiveExpr(over,specs,lp,input)
+ {log.trace("Exiting InFixExpr");return expr;}
}
-EvalSpec AdditiveExpr(Schema over, Map<String, EvalSpec> specs) : { Token t; EvalSpec lhs, rhs; GenerateSpec args; }
+ExpressionOperator AdditiveExpr(Schema over, Map<String, LogicalOperator> specs, LogicalPlan lp, LogicalOperator input) :
+{
+ Token t;
+ ExpressionOperator lhs, rhs, exprOp;
+ log.trace("Entering AdditiveExpr");
+}
{
(
- lhs = MultiplicativeExpr(over,specs)
+ lhs = MultiplicativeExpr(over,specs,lp,input)
(
- ( t = "+" | t = "-" ) rhs = MultiplicativeExpr(over,specs)
+ ( t = "+" | t = "-" ) rhs = MultiplicativeExpr(over,specs,lp,input)
{
assertAtomic(lhs,true);
assertAtomic(rhs,true);
- ArrayList<EvalSpec> argsList = new ArrayList<EvalSpec>();
- argsList.add(lhs);
- argsList.add(rhs);
- args = new GenerateSpec(argsList);
if (t.image.equals("+")){
- lhs = new FuncEvalSpec(pigContext, ADD.class.getName(), args);
+ exprOp = new LOAdd(lp, new OperatorKey(scope, getNextId()), lhs, rhs);
}else{
- lhs = new FuncEvalSpec(pigContext, SUBTRACT.class.getName(), args);
+ exprOp = new LOSubtract(lp, new OperatorKey(scope, getNextId()), lhs, rhs);
}
+ lp.add(exprOp);
+ log.debug("AdditiveExpr: Added operator " + exprOp.getClass().getName() + " " + exprOp + " to logical plan " + lp);
+ lp.connect(lhs, exprOp);
+ log.debug("AdditiveExpr: Connected operator " + lhs.getClass().getName() + " " + lhs+ " to " + exprOp + " logical plan " + lp);
+ lp.connect(rhs, exprOp);
+ log.debug("AdditiveExpr: Connected operator " + rhs.getClass().getName() + " " + rhs+ " to " + exprOp + " logical plan " + lp);
+ lhs = exprOp;
}
)*
)
- {return lhs;}
+ {
+ log.trace("Exiting AdditiveExpr");
+ return lhs;
+ }
}
-EvalSpec MultiplicativeExpr(Schema over, Map<String, EvalSpec> specs) : { Token t; EvalSpec lhs, rhs; GenerateSpec args; }
+ExpressionOperator MultiplicativeExpr(Schema over, Map<String, LogicalOperator> specs, LogicalPlan lp, LogicalOperator input) :
+{
+ Token t;
+ ExpressionOperator lhs, rhs, exprOp;
+ log.trace("Entering MultiplicativeExpr");
+}
{
(
- lhs = UnaryExpr(over,specs)
+ lhs = UnaryExpr(over,specs,lp,input)
(
- ( t = <STAR> | t = "/" ) rhs = UnaryExpr(over,specs)
+ ( t = <STAR> | t = "/" | t = "%") rhs = UnaryExpr(over,specs,lp,input)
{
assertAtomic(lhs,true);
assertAtomic(rhs,true);
- ArrayList<EvalSpec> argsList = new ArrayList<EvalSpec>();
- argsList.add(lhs);
- argsList.add(rhs);
- args = new GenerateSpec(argsList);
if (t.image.equals("*")){
- lhs = new FuncEvalSpec(pigContext, MULTIPLY.class.getName(), args);
- }else{
- lhs = new FuncEvalSpec(pigContext, DIVIDE.class.getName(), args);
+ exprOp = new LOMultiply(lp, new OperatorKey(scope, getNextId()), lhs, rhs);
+ }else if (t.image.equals("/")){
+ exprOp = new LODivide(lp, new OperatorKey(scope, getNextId()), lhs, rhs);
+ }else {
+ exprOp = new LOMod(lp, new OperatorKey(scope, getNextId()), lhs, rhs);
}
+ lp.add(exprOp);
+ log.debug("MultiplicativeExpr: Added operator " + exprOp.getClass().getName() + " " + exprOp + " to logical plan " + lp);
+ lp.connect(lhs, exprOp);
+ log.debug("MultiplicativeExpr: Connected operator " + lhs.getClass().getName() + " " + lhs+ " to " + exprOp + " logical plan " + lp);
+ lp.connect(rhs, exprOp);
+ log.debug("MultiplicativeExpr: Connected operator " + rhs.getClass().getName() + " " + rhs+ " to " + exprOp + " logical plan " + lp);
+ lhs = exprOp;
}
)*
)
- {return lhs;}
+ {
+ log.trace("Exiting MultiplicativeExpr");
+ return lhs;
+ }
}
-EvalSpec UnaryExpr(Schema over, Map<String, EvalSpec> specs) : { EvalSpec expr; }
+ExpressionOperator UnaryExpr(Schema over, Map<String, LogicalOperator> specs, LogicalPlan lp, LogicalOperator input) :
+{
+ ExpressionOperator expr;
+ log.trace("Entering UnaryExpr");
+}
{
(
- LOOKAHEAD(BaseEvalSpec(over,specs)) expr = BaseEvalSpec(over,specs)
-| ( "(" expr = InfixExpr(over,specs) ")" )
+ LOOKAHEAD(BaseEvalSpec(over,specs,lp,input)) expr = BaseEvalSpec(over,specs,lp,input)
+| ( "(" expr = InfixExpr(over,specs,lp,input) ")" )
+| expr = NegativeExpr(over,specs,lp,input)
+
)
- {return expr;}
+ {log.trace("Exiting UnaryExpr");return expr;}
}
+ExpressionOperator NegativeExpr(Schema over, Map<String, LogicalOperator> specs, LogicalPlan lp, LogicalOperator input) :
+{
+ ExpressionOperator c1;
+ LogicalPlan exprPlan = new LogicalPlan();
+ log.trace("Entering NegativeExpr");
+}
+{
+ "-" c1=UnaryExpr(over,specs,lp,input)
+ {
+ ExpressionOperator eOp = new LONegative(lp, new OperatorKey(scope, getNextId()), c1);
+ lp.add(eOp);
+ log.debug("NegativeExpr: Added operator " + eOp.getClass().getName() + " " + eOp + " to logical plan " + lp);
+ lp.connect(c1, eOp);
+ log.trace("Exiting NegativeExpr");
+ return eOp;
+ }
+}
+
+
-EvalSpec BaseEvalSpec(Schema over, Map<String, EvalSpec> specs) :
-{EvalSpec item;EvalSpec projection; Schema subSchema = null; Token t;}
+ExpressionOperator BaseEvalSpec(Schema over, Map<String, LogicalOperator> specs, LogicalPlan lp, LogicalOperator input) :
+{
+ ExpressionOperator item;
+ ExpressionOperator projection;
+ Schema subSchema = null;
+ Token t;
+ log.trace("Entering BaseEvalSpec");
+}
{
(
- item = Const()
+ item = Const(lp)
| (
(
- LOOKAHEAD(FuncEvalSpec(over,specs))
- item = FuncEvalSpec(over,specs)
- | item = ColOrSpec(over,specs)
- | item = BinCond(over,specs)
+ LOOKAHEAD(FuncEvalSpec(over,specs,lp,input))
+ item = FuncEvalSpec(over,specs,lp,input)
+ | item = ColOrSpec(over,specs,lp,input)
+ | item = BinCond(over,specs,lp,input)
+
)
- {item = item.copy(pigContext);}
(
- { subSchema = item.getOutputSchemaForPipe(over); }
+ {
+ subSchema = item.getSchema();
+ //TODO
+ //HACK for the schema problems with LOProject
+ //Check the schema to see if the constituent
+ //field is a bag or a tuple/ If so, then get
+ //that schema and send it out instead of the
+ //actual schema
+ log.debug("Printing subSchema Aliases");
+ subSchema.printAliases();
+
+ log.debug("Printing the field schemas of subSchema");
+ for(Schema.FieldSchema fs: subSchema.getFields()) {
+ log.debug("fs: " + fs);
+ subSchema = fs.schema;
+ }
+
+ }
(
- "." projection = BracketedSimpleProj(subSchema)
+ "." projection = BracketedSimpleProj(subSchema,lp,item)
{
assertAtomic(item,false);
- item = item.addSpec(projection);
+ lp.remove(item);
+ item = projection;
}
)
| ( "#" t = <QUOTEDSTRING> {
assertAtomic(item, false);
- item = item.addSpec(new MapLookupSpec(unquote(t.image)));
+ ExpressionOperator mapLookup = new LOMapLookup(lp, new OperatorKey(scope, getNextId()), item, unquote(t.image), DataType.BYTEARRAY, null);
+ item = mapLookup;
+ lp.add(mapLookup);
+ log.debug("BaseEvalSpec: Added operator " + mapLookup.getClass().getName() + " " + mapLookup + " to logical plan " + lp);
+ lp.connect(item, mapLookup);
+ log.debug("BaseEvalSpec: Connected operator " + item.getClass().getName() + " " + item+ " to " + mapLookup + " logical plan " + lp);
}
)
- )*
+ )*
)
)
- {return item;}
+ {log.trace("Exiting BaseEvalSpec"); return item;}
}
-EvalSpec BinCond(Schema over, Map<String, EvalSpec> specs):
-{Cond cond; EvalSpec ifTrue, ifFalse; EvalSpec ret = null;}
+ExpressionOperator BinCond(Schema over, Map<String, LogicalOperator> specs, LogicalPlan lp, LogicalOperator input):
+{
+ ExpressionOperator cond;
+ ExpressionOperator ifTrue, ifFalse;
+ LogicalPlan conditionPlan = new LogicalPlan();
+ LogicalPlan truePlan = new LogicalPlan();
+ LogicalPlan falsePlan = new LogicalPlan();
+ log.trace("Entering BinCond");
+}
{
(
- "(" cond = PCond(over,specs) "?" ifTrue = InfixExpr(over,specs)
- ":" ifFalse = InfixExpr(over,specs) ")"
+ "(" cond = PCond(over,specs,lp,input) "?" ifTrue = InfixExpr(over,specs,lp,input)
+ ":" ifFalse = InfixExpr(over,specs,lp,input) ")"
+
)
- { return new BinCondSpec(cond,ifTrue,ifFalse);}
+ {
+ //ExpressionOperator bincond = new LOBinCond(lp, new OperatorKey(scope, getNextId()), conditionPlan, truePlan, falsePlan);
+ ExpressionOperator bincond = new LOBinCond(lp, new OperatorKey(scope, getNextId()), cond, ifTrue, ifFalse);
+ //TODO - Need to connect expression operators with the new plan
+ lp.add(bincond);
+ log.debug("BinCond: Added operator " + bincond.getClass().getName() + " " + bincond + " to logical plan " + lp);
+ lp.connect(cond, bincond);
+ lp.connect(ifTrue, bincond);
+ lp.connect(ifFalse, bincond);
+ log.trace("Exiting BinCond");
+ return bincond;
+ }
}
-EvalSpec FuncEvalSpec(Schema over, Map<String, EvalSpec> specs) : {String funcName; GenerateSpec args;}
+ExpressionOperator FuncEvalSpec(Schema over, Map<String, LogicalOperator> specs, LogicalPlan lp, LogicalOperator input) :
{
- funcName=EvalFunction() "(" args=EvalArgs(over,specs) ")"
- {return new FuncEvalSpec(pigContext, funcName, args);}
+ String funcName;
+ List<ExpressionOperator> args;
+ log.trace("Entering FuncEvalSpec");
+}
+{
+ funcName=EvalFunction() "(" args=EvalArgs(over,specs,lp,input) ")"
+ {
+ ExpressionOperator userFunc = new LOUserFunc(lp, new OperatorKey(scope, getNextId()), funcName, args, DataType.BYTEARRAY);
+ lp.add(userFunc);
+ log.debug("FuncEvalSpec: Added operator " + userFunc.getClass().getName() + " " + userFunc + " to logical plan " + lp);
+ for(ExpressionOperator exprOp: args) {
+ lp.connect(exprOp, userFunc);
+ log.debug("FuncEvalSpec: Connected operator " + exprOp.getClass().getName() + " " + exprOp+ " to " + userFunc + " logical plan " + lp);
+ }
+ log.trace("Exiting FuncEvalSpec");
+ return userFunc;
+ }
}
-GenerateSpec EvalArgs(Schema over, Map<String, EvalSpec> specs) : {ArrayList<EvalSpec> specList = new ArrayList<EvalSpec>(); EvalSpec item;}
+List<ExpressionOperator> EvalArgs(Schema over, Map<String, LogicalOperator> specs, LogicalPlan lp, LogicalOperator input) :
+{
+ ArrayList<ExpressionOperator> specList = new ArrayList<ExpressionOperator>();
+ ExpressionOperator item;
+ log.trace("Entering EvalArgs");
+}
{
(
- (item=EvalArgsItem(over,specs) {specList.add(item);}
- ("," item=EvalArgsItem(over,specs) {specList.add(item);})*)
+ (item=EvalArgsItem(over,specs,lp,input) {specList.add(item);}
+ ("," item=EvalArgsItem(over,specs,lp,input) {specList.add(item);})*)
+
| {}
)
{
- return new GenerateSpec(specList);
+ log.trace("Exiting EvalArgs");
+ return specList;
}
}
-EvalSpec EvalArgsItem(Schema over, Map<String, EvalSpec> specs):
-{EvalSpec item;}
+ExpressionOperator EvalArgsItem(Schema over, Map<String, LogicalOperator> specs, LogicalPlan lp, LogicalOperator input):
+{
+ ExpressionOperator item;
+ log.trace("Entering EvalArgsItem");
+}
{
(
- item = InfixExpr(over,specs)
-| item = Star()
+ item = InfixExpr(over,specs,lp,input)
+| <STAR>
+ {
+ LOProject project = new LOProject(lp, new OperatorKey(scope, getNextId()), input, -1);
+ project.setStar(true);
+ item = project;
+ lp.add(project);
+ log.debug("EvalArgsItem: Added operator " + project.getClass().getName() + " " + project + " to logical plan " + lp);
+ }
)
- {return item;}
+ {log.trace("Exiting EvalArgsItem");return item;}
}
-Schema Schema() : { Token t1; Schema item = null;}
+Schema.FieldSchema FieldSchema() :
+{
+ Token t1;
+ Schema item = null;
+ Schema.FieldSchema fs = null;
+ log.trace("Entering FieldSchema");
+}
{
(
- LOOKAHEAD(SchemaTuple()) item = SchemaTuple()
-| LOOKAHEAD(SchemaBag()) item = SchemaBag()
-| LOOKAHEAD(AtomSchema()) item = AtomSchema()
+ LOOKAHEAD(SchemaTuple()) fs = SchemaTuple()
+| LOOKAHEAD(SchemaBag()) fs = SchemaBag()
+| LOOKAHEAD(AtomSchema()) fs = AtomSchema()
)
- {return item;}
+ //{log.debug("Printing Aliases"); item.printAliases();log.trace("Exiting Schema");return item;}
+ {log.trace("Exiting FieldSchema");return fs;}
}
-Schema AtomSchema() : {Token t1;}
+Schema.FieldSchema AtomSchema() :
+{
+ Token t1;
+ log.trace("Entering AtomSchema");
+}
{
- ( ( t1 = <IDENTIFIER> ) { return new AtomSchema(t1.image); } )
+ ( ( t1 = <IDENTIFIER> )
+ {
+ log.debug("AtomSchema: " + t1.image);
+
+ log.trace("Exiting AtomSchema");
+ return new Schema.FieldSchema(t1.image, DataType.BYTEARRAY);
+ }
+ )
}
-TupleSchema SchemaTuple() : {Token t1 = null; TupleSchema list;}
+Schema.FieldSchema SchemaTuple() :
+{
+ Token t1 = null;
+ Schema list;
+ Schema.FieldSchema fs;
+ log.trace("Entering SchemaTuple");
+}
{
[( t1 = <IDENTIFIER> ) ":"] "(" list = TupleSchema() ")"
{
- if (t1!=null)
- list.setAlias(t1.image);
- return list;
+ if (null != t1) {
+ log.debug("TUPLE alias " + t1.image);
+ fs = new Schema.FieldSchema(t1.image, list, DataType.TUPLE);
+ } else {
+ fs = new Schema.FieldSchema(null, list, DataType.TUPLE);
+ }
+ log.trace("Exiting SchemaTuple");
+ return fs;
}
}
-TupleSchema SchemaBag() : {Token t1 = null; TupleSchema list;}
+Schema.FieldSchema SchemaBag() :
+{
+ Token t1 = null;
+ Schema list;
+ Schema.FieldSchema fs;
+ log.trace("Entering SchemaBag");
+}
{
[( t1 = <IDENTIFIER> ) ":"] "[" list = TupleSchema() "]"
{
- if (t1!=null)
- list.setAlias(t1.image);
- return list;
+ if (null != t1) {
+ log.debug("BAG alias " + t1.image);
+ fs = new Schema.FieldSchema(t1.image, list, DataType.BAG);
+ } else {
+ fs = new Schema.FieldSchema(null, list, DataType.BAG);
+ }
+ log.trace("Exiting SchemaBag");
+ return fs;
}
}
-TupleSchema TupleSchema() : { Schema item = null; TupleSchema list = new TupleSchema(); }
+Schema TupleSchema() :
+{
+ Schema item = null;
+ Schema list = new Schema();
+ Schema.FieldSchema fs = null;
+ log.trace("Entering TupleSchema");
+}
{
(
- ( item=Schema() { list.add(item); }
- ( "," item=Schema() {list.add(item);} )*
+ (
+ fs = FieldSchema() {log.debug("Adding " + fs.alias + " to the list: " + list);list.add(fs);}
+ ( "," fs = FieldSchema() {log.debug("Adding " + fs.alias + " to the list: " + list);list.add(fs);})*
)
| {}
)
- {return list;}
+ {log.debug("Printing Aliases in TupleSchema"); list.printAliases();log.trace("Exiting TupleSchema");return list;}
}
-
-//CQ stuff
-
-EvalSpec PWindow() : {EvalSpec spec; int numTuples; double time;}
-{
- ( <WINDOW>
- ( LOOKAHEAD(2)
- time = PTimeWindow() { spec = new TimeWindowSpec(WindowSpec.windowType.SLIDING, time); } |
- numTuples = PTupleWindow() { spec = new TupleWindowSpec(WindowSpec.windowType.SLIDING, numTuples);}
- )
- )
- {return spec;}
-}
-
-double PTimeWindow() : {double n; Token t;}
-{
- ( t = <NUMBER> { n = Double.parseDouble(t.image); }
- ( <SECONDS> |
- <MINUTES> { n = n*60; } |
- <HOURS> { n = n * 3600; }
- )
- )
- {return n;}
-}
-
-int PTupleWindow() : {int n; Token t;}
-{
- ( t = <NUMBER> { try{
- n = Integer.parseInt(t.image);
- }catch(NumberFormatException e){
- throw new ParseException("Only whole number tuple windows allowed.");
- }
- }
- <TUPLES>
- )
- {return n;}
-}
-
-
-
-
-
// These the simple non-terminals that are shared across many
-String EvalFunction() : {String funcName;}
+String EvalFunction() :
{
- funcName = QualifiedFunction()
- {
- try{
- EvalFunc ef = (EvalFunc) pigContext.instantiateFuncFromAlias(funcName);
- }catch (Exception e){
- throw new ParseException(e.getMessage());
- }
- return funcName;
- }
+ String funcName;
+ log.trace("Entering EvalFunction");
}
-
-String FilterFunction() : {String funcName;}
{
funcName = QualifiedFunction()
{
try{
- FilterFunc ff = (FilterFunc) pigContext.instantiateFuncFromAlias(funcName);
+ LOUserFunc ef = (LOUserFunc) pigContext.instantiateFuncFromAlias(funcName);
}catch (Exception e){
- throw new ParseException(funcName + " is not a valid filter function");
+ throw new ParseException(e.getMessage());
}
+
+ log.trace("Exiting EvalFunc");
return funcName;
- }
+ }
}
-
/**
* Bug 831620 - '$' support
*/
@@ -1017,113 +1793,180 @@
/**
* Bug 831620 - '$' support
*/
-String QualifiedFunction() #void : {Token t1;StringBuffer s=new StringBuffer();}
+String QualifiedFunction() #void : {Token t1;StringBuffer s=new StringBuffer(); log.trace("Entering QualifiedFunction");}
{
((t1=<IDENTIFIER> { s.append(t1.image);}
(("." t1=<IDENTIFIER> {s.append("." + t1.image);})|
("$" t1=<IDENTIFIER> {s.append("$" + t1.image);}))*))
- {return s.toString();}
+ {
+ log.debug("QualifiedFunction: " + s.toString());
+ log.trace("Exiting QualifiedFunction");
+ return s.toString();
+ }
}
// If there is one time it may not be bracketed, but if multiple, they must be bracketed
-ProjectSpec BracketedSimpleProj(Schema over) : {EvalSpec es; int i; ProjectSpec spec = null;}
+ExpressionOperator BracketedSimpleProj(Schema over, LogicalPlan lp, LogicalOperator eOp) :
+{
+ ExpressionOperator es;
+ int i;
+ ExpressionOperator spec = null;
+ log.trace("Entering BracketedSimpleProj");
+ log.debug("eOp: " + eOp.getClass().getName());
+}
{
(
- es = ColOrSpec(over,null) {spec = (ProjectSpec) es;}
-| ("(" spec = SimpleProj(over) ")")
+ spec = ColOrSpec(over,null,lp,eOp)
+| ("(" spec = SimpleProj(over,lp,eOp) ")")
+
)
- {return spec;}
+ {log.trace("Exiting BracketedSimpleProj");return spec;}
}
-ProjectSpec SimpleProj(Schema over):
-{EvalSpec i = null; ArrayList<Integer> colList = new ArrayList<Integer>();}
+ExpressionOperator SimpleProj(Schema over, LogicalPlan lp, LogicalOperator eOp):
{
- i = ColOrSpec(over,null) {colList.add(((ProjectSpec)i).getCol());}
- ("," i = ColOrSpec(over, null) {colList.add(((ProjectSpec)i).getCol());})*
- {return new ProjectSpec(colList);}
+ int i;
+ ArrayList<Integer> colList = new ArrayList<Integer>();
+ log.trace("Entering SimpleProj");
}
-
-
-//Just a simple list of projection items
-GenerateSpec SimpleArgs(Schema over) : {EvalSpec i = null; ArrayList<EvalSpec> specList = new ArrayList<EvalSpec>();}
{
- (
- (
- i = SimpleArgsItem(over) {specList.add(i);}
- ("," i = SimpleArgsItem(over) {specList.add(i);})*
- )
- | {}
- )
+ i = ColName(over) {colList.add(i);}
+ ("," i = ColName(over) {colList.add(i);})*
{
- if (specList.isEmpty())
- return null;
- else
- return new GenerateSpec(specList);
- }
+ ExpressionOperator project = new LOProject(lp, new OperatorKey(scope, getNextId()), eOp, colList);
+ lp.add(project);
+ log.debug("SimpleProj: Added operator " + project.getClass().getName() + " " + project + " to logical plan " + lp);
+ log.trace("Exiting SimpleProj");
+ return project;
+ }
}
-EvalSpec SimpleArgsItem(Schema over):
-{EvalSpec item;}
-{
- (
- item = Const()
-| item = ColOrSpec(over,null)
-| item = Star()
- )
- {return item;}
-}
-
-StarSpec Star() : {Token t1; StarSpec spec;}
+ExpressionOperator Const(LogicalPlan lp) :
{
- t1=<STAR>
- {
- spec = new StarSpec();
- spec.setFlatten(true);
- return spec;
- }
+ Token t1;
+ String s;
+ log.trace("Entering Const");
}
-
-EvalSpec Const() : {Token t1; String s;}
{
(
t1=<QUOTEDSTRING> {s = unquote(t1.image);}
| t1 = <NUMBER> {s = t1.image;}
)
- {return new ConstSpec(s);}
+ {
+ ExpressionOperator lConst = new LOConst(lp, new OperatorKey(scope, getNextId()), s);
+ lp.add(lConst);
+ log.debug("Const: Added operator " + lConst.getClass().getName() + " " + lConst + " to logical plan " + lp);
+ log.trace("Exiting Const");
+ return lConst;
+ }
}
-EvalSpec ColOrSpec(Schema over, Map<String, EvalSpec> specs) :
-{EvalSpec spec;}
+ExpressionOperator ColOrSpec(Schema over, Map<String, LogicalOperator> specs, LogicalPlan lp, LogicalOperator eOp) :
+{
+ ExpressionOperator spec;
+ log.trace("Entering ColOrSpec");
+}
{
(
- spec = DollarVar()
-| spec = AliasFieldOrSpec(over,specs)
+ spec = DollarVar(lp, eOp)
+| spec = AliasFieldOrSpec(over,specs,lp,eOp)
+
)
{
+ log.trace("Exiting ColOrSpec");
return spec;
}
}
-ProjectSpec DollarVar() : {Token t1;}
+ExpressionOperator DollarVar(LogicalPlan lp, LogicalOperator eOp) :
+{
+ Token t1;
+ log.trace("Entering DollarVar");
+}
{
t1=<DOLLARVAR>
- {return new ProjectSpec(undollar(t1.image));}
+ {
+ log.debug("Token: " + t1.image);
+ ExpressionOperator project = new LOProject(lp, new OperatorKey(scope, getNextId()), eOp, undollar(t1.image));
+ try {
+ log.debug("eOp: " + eOp.getClass().getName() + " " + eOp);
+ lp.add(project);
+ log.debug("DollarVar: Added operator " + project.getClass().getName() + " " + project + " to logical plan " + lp);
+ } catch (Exception planException) {
+ ParseException parseException = new ParseException(planException.getMessage());
+ throw parseException;
+ }
+ log.trace("Exiting DollarVar");
+ return project;
+ }
}
-EvalSpec AliasFieldOrSpec(Schema over, Map<String, EvalSpec> specs) : {Token t1;}
+ExpressionOperator AliasFieldOrSpec(Schema over, Map<String, LogicalOperator> specs, LogicalPlan lp, LogicalOperator eOp) :
+{
+ Token t1;
+ LogicalPlan projectInputPlan = new LogicalPlan();
+ log.trace("Entering AliasFieldOrSpec");
+}
{
(t1=<GROUP> | t1=<IDENTIFIER>)
- { int i; EvalSpec item = null;
- if (specs!=null)
- item = specs.get(t1.image);
+ {
+ log.debug("Token: " + t1.image);
+ if(null != eOp) log.debug("eOp: " + eOp.getClass().getName());
+ int i;
+ ExpressionOperator item = null;
+ if (specs!=null) {
+ log.debug("specs != null");
+ LogicalOperator op = specs.get(t1.image);
+ if(null != op) {
+ log.debug("Alias: " + op.getAlias());
+
+ item = new LOProject(lp, new OperatorKey(scope, getNextId()), op, -1);
+ ((LOProject)item).setStar(true);
+ log.debug("Set star to true");
+
+ if(insideGenerate()) {
+ log.debug("AliasFieldOrSpec: Inside generate");
+ addGenerateInput(op);
+ }
+ try {
+ lp.add(item);
+ log.debug("AliasFieldOrSpec: Added operator " + item.getClass().getName() + " " + item + " to logical plan " + lp);
+ } catch (Exception planException) {
+ ParseException parseException = new ParseException(planException.getMessage());
+ throw parseException;
+ }
+ }
+ }
if (item == null){
- if ( over == null || (i = over.colFor(t1.image)) == -1)
- throw new ParseException("Invalid alias: " + t1.image + " in " + over);
- item = new ProjectSpec(i);
+ log.debug("item == null");
+ if (null == over) log.debug("over is null");
+ if ( over == null || (i = over.getPosition(t1.image)) == -1) {
+ log.debug("Invalid alias: " + t1.image + " in " + over);
+ if(null != over) {
+ log.debug("Printing out the aliases in the schema");
+ over.printAliases();
+ }
+ throw new ParseException("Invalid alias: " + t1.image + " in " + over);
+ }
+ log.debug("Position of " + t1.image + " = " + i);
+ if(null != over) {
+ log.debug("Printing out the aliases in the schema");
+ over.printAliases();
+ }
+ item = new LOProject(lp, new OperatorKey(scope, getNextId()), eOp, i);
+ try {
+ lp.add(item);
+ log.debug("AliasFieldOrSpec: Added operator " + item.getClass().getName() + " " + item + " to logical plan " + lp);
+ } catch (Exception planException) {
+ ParseException parseException = new ParseException(planException.getMessage());
+ throw parseException;
+ }
}
+
+ log.trace("Exiting AliasFieldOrSpec");
return item;
}
}
Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/schema/Schema.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/schema/Schema.java?rev=654629&r1=654628&r2=654629&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/schema/Schema.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/schema/Schema.java Thu May 8 14:25:22 2008
@@ -22,9 +22,17 @@
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.Set;
+import java.util.Collection;
+import java.io.IOException;
import org.apache.pig.data.DataType;
import org.apache.pig.impl.logicalLayer.parser.ParseException;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.pig.impl.plan.MultiMap;
+import org.apache.pig.impl.logicalLayer.FrontendException;
+
public class Schema {
@@ -40,26 +48,35 @@
public byte type;
/**
- * If this is a tuple itself, it can have a schema. Otherwise this
- * field must be null.
+ * If this is a tuple itself, it can have a schema. Otherwise this field
+ * must be null.
*/
public Schema schema;
+
+ private static Log log = LogFactory.getLog(Schema.FieldSchema.class);
/**
* Constructor for any type.
- * @param a Alias, if known. If unknown leave null.
- * @param t Type, using codes from {@link org.apache.pig.data.DataType}.
+ *
+ * @param a
+ * Alias, if known. If unknown leave null.
+ * @param t
+ * Type, using codes from
+ * {@link org.apache.pig.data.DataType}.
*/
public FieldSchema(String a, byte t) {
alias = a;
- type = t ;
- schema = null;
+ type = t;
+ schema = null;
}
/**
* Constructor for tuple fields.
- * @param a Alias, if known. If unknown leave null.
- * @param s Schema of this tuple.
+ *
+ * @param a
+ * Alias, if known. If unknown leave null.
+ * @param s
+ * Schema of this tuple.
*/
public FieldSchema(String a, Schema s) {
alias = a;
@@ -67,6 +84,29 @@
schema = s;
}
+ /**
+ * Constructor for tuple fields.
+ *
+ * @param a
+ * Alias, if known. If unknown leave null.
+ * @param s
+ * Schema of this tuple.
+ * @param t
+ * Type, using codes from
+ * {@link org.apache.pig.data.DataType}.
+ *
+ */
+ public FieldSchema(String a, Schema s, byte t) throws FrontendException {
+ alias = a;
+ schema = s;
+ log.debug("t: " + t + " Bag: " + DataType.BAG + " tuple: " + DataType.TUPLE);
+ if ((null != s) && (t != DataType.BAG) && (t != DataType.TUPLE)) {
+ throw new FrontendException("Only a BAG or TUPLE can have schemas. Got "
+ + DataType.findTypeName(t));
+ }
+ type = t;
+ }
+
@Override
public boolean equals(Object other) {
if (!(other instanceof FieldSchema)) return false;
@@ -78,6 +118,8 @@
return true;
}
+
+ // TODO Need to add hashcode.
/***
* Compare two field schema for equality
@@ -120,6 +162,14 @@
private List<FieldSchema> mFields;
private Map<String, FieldSchema> mAliases;
+ private MultiMap<FieldSchema, String> mFieldSchemas;
+ private static Log log = LogFactory.getLog(Schema.class);
+
+ public Schema() {
+ mFields = new ArrayList<FieldSchema>();
+ mAliases = new HashMap<String, FieldSchema>();
+ mFieldSchemas = new MultiMap<FieldSchema, String>();
+ }
/**
* @param fields List of field schemas that describes the fields.
@@ -127,8 +177,14 @@
public Schema(List<FieldSchema> fields) {
mFields = fields;
mAliases = new HashMap<String, FieldSchema>(fields.size());
- for (FieldSchema fs : fields) {
- if (fs.alias != null) mAliases.put(fs.alias, fs);
+ mFieldSchemas = new MultiMap<FieldSchema, String>();
+ for (FieldSchema fs : fields) {
+ if (fs.alias != null) {
+ mAliases.put(fs.alias, fs);
+ if(null != fs) {
+ mFieldSchemas.put(fs, fs.alias);
+ }
+ }
}
}
@@ -139,8 +195,13 @@
public Schema(FieldSchema fieldSchema) {
mFields = new ArrayList<FieldSchema>(1);
mFields.add(fieldSchema);
+ mAliases = new HashMap<String, FieldSchema>(1);
+ mFieldSchemas = new MultiMap<FieldSchema, String>();
if (fieldSchema.alias != null) {
mAliases.put(fieldSchema.alias, fieldSchema);
+ if(null != fieldSchema) {
+ mFieldSchemas.put(fieldSchema, fieldSchema.alias);
+ }
}
}
@@ -155,15 +216,18 @@
/**
* Given a field number, find the associated FieldSchema.
- * @param fieldNum Field number to look up.
+ *
+ * @param fieldNum
+ * Field number to look up.
* @return FieldSchema for this field.
- * @throws ParseException if the field number exceeds the number of
- * fields in the tuple.
+ * @throws ParseException
+ * if the field number exceeds the number of fields in the
+ * tuple.
*/
public FieldSchema getField(int fieldNum) throws ParseException {
if (fieldNum >= mFields.size()) {
- throw new ParseException("Attempt to fetch field " + fieldNum +
- " from tuple of size " + mFields.size());
+ throw new ParseException("Attempt to fetch field " + fieldNum
+ + " from tuple of size " + mFields.size());
}
return mFields.get(fieldNum);
@@ -171,6 +235,7 @@
/**
* Find the number of fields in the schema.
+ *
* @return number of fields.
*/
public int size() {
@@ -198,9 +263,39 @@
for (int j = 0; i.hasNext(); j++) {
FieldSchema otherFs = i.next();
FieldSchema ourFs = mFields.get(j);
- if (otherFs.alias != null) ourFs.alias = otherFs.alias;
- if (otherFs.type != DataType.UNKNOWN) ourFs.type = otherFs.type;
- if (otherFs.schema != null) ourFs.schema = otherFs.schema;
+ log.debug("ourFs: " + ourFs + " otherFs: " + otherFs);
+ if (otherFs.alias != null) {
+ log.debug("otherFs.alias: " + otherFs.alias);
+ if (ourFs.alias != null) {
+ log.debug("Removing ourFs.alias: " + ourFs.alias);
+ mAliases.remove(ourFs.alias);
+ Collection<String> aliases = mFieldSchemas.get(ourFs);
+ List<String> listAliases = new ArrayList<String>();
+ for(String alias: aliases) {
+ listAliases.add(new String(alias));
+ }
+ for(String alias: listAliases) {
+ log.debug("Removing alias " + alias + " from multimap");
+ mFieldSchemas.remove(ourFs, alias);
+ }
+ }
+ ourFs.alias = otherFs.alias;
+ log.debug("Setting alias to: " + otherFs.alias);
+ mAliases.put(ourFs.alias, ourFs);
+ if(null != ourFs.alias) {
+ mFieldSchemas.put(ourFs, ourFs.alias);
+ }
+ }
+ if (otherFs.type != DataType.UNKNOWN) {
+ ourFs.type = otherFs.type;
+ log.debug("Setting type to: "
+ + DataType.findTypeName(otherFs.type));
+ }
+ if (otherFs.schema != null) {
+ ourFs.schema = otherFs.schema;
+ log.debug("Setting schema to: " + otherFs.schema);
+ }
+
}
}
@@ -219,7 +314,67 @@
}
return true;
}
+
+ // TODO add hashCode()
+
+ public void add(FieldSchema f) {
+ mFields.add(f);
+ if (null != f.alias) {
+ mAliases.put(f.alias, f);
+ }
+ }
+
+ /**
+ * Given an alias, find the associated position of the field schema.
+ *
+ * @param alias
+ * alias of the FieldSchema.
+ * @return position of the FieldSchema.
+ */
+ public int getPosition(String alias) {
+
+ FieldSchema fs = getField(alias);
+
+ if (null == fs) {
+ return -1;
+ }
+
+ log.debug("fs: " + fs);
+ int index = -1;
+ for(int i = 0; i < mFields.size(); ++i) {
+ log.debug("mFields(" + i + "): " + mFields.get(i) + " alias: " + mFields.get(i).alias);
+ if(fs == mFields.get(i)) {index = i;}
+ }
+
+ log.debug("index: " + index);
+ return index;
+ //return mFields.indexOf(fs);
+ }
+
+ public void addAlias(String alias, FieldSchema fs) {
+ if(null != alias) {
+ mAliases.put(alias, fs);
+ if(null != fs) {
+ mFieldSchemas.put(fs, alias);
+ }
+ }
+ }
+
+ public Set<String> getAliases() {
+ return mAliases.keySet();
+ }
+
+ public void printAliases() {
+ Set<String> aliasNames = mAliases.keySet();
+ for (String alias : aliasNames) {
+ log.debug("Schema Alias: " + alias);
+ }
+ }
+ public List<FieldSchema> getFields() {
+ return mFields;
+ }
+
/**
* Recursively compare two schemas for equality
* @param schema