You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by xu...@apache.org on 2011/05/07 02:15:44 UTC
svn commit: r1100420 [12/19] - in /pig/branches/branch-0.9: ./ src/
src/org/apache/pig/
src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/
src/org/apache/pig/impl/logicalLayer/
src/org/apache/pig/impl/logicalLayer/optimizer/ src/org/apach...
Modified: pig/branches/branch-0.9/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.9/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt?rev=1100420&r1=1100419&r2=1100420&view=diff
==============================================================================
--- pig/branches/branch-0.9/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt (original)
+++ pig/branches/branch-0.9/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt Sat May 7 00:15:40 2011
@@ -1,4252 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-/**
- * JavaCC file
- * This file lists the grammar for PIG Latin.
- * The generated QueryParser builds a LogicalPlan from a valid Pig Latin query.
- */
-// JavaCC code-generation options for the Pig Latin parser.
-options {
- // Generate instance (non-static) parser methods so several parsers can coexist.
- STATIC = false;
- // The token manager matches every token specification case-insensitively,
- // which is why keywords such as ONSCHEMA match in any case.
- IGNORE_CASE = true;
- // Translate Java-style \uXXXX escapes in the input before tokenizing.
- JAVA_UNICODE_ESCAPE = true;
-}
-
-// Everything between PARSER_BEGIN and PARSER_END is copied into the generated QueryParser source.
-PARSER_BEGIN(QueryParser)
-package org.apache.pig.impl.logicalLayer.parser;
-import java.io.*;
-import java.util.*;
-import java.net.URI;
-import java.lang.Class;
-import java.net.URISyntaxException;
-import java.lang.reflect.Type;
-import org.apache.pig.impl.logicalLayer.*;
-import org.apache.pig.impl.logicalLayer.schema.*;
-import org.apache.pig.data.DataType;
-import org.apache.pig.impl.PigContext;
-import org.apache.pig.ExecType;
-import org.apache.pig.impl.io.*;
-import org.apache.pig.builtin.PigStorage;
-import org.apache.pig.builtin.RANDOM;
-import org.apache.pig.impl.builtin.GFAny;
-import org.apache.pig.impl.logicalLayer.LogicalPlan;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.pig.impl.util.MultiMap;
-import org.apache.pig.impl.plan.NodeIdGenerator;
-import org.apache.pig.impl.plan.OperatorKey;
-import org.apache.pig.impl.plan.PlanException;
-import org.apache.pig.impl.streaming.StreamingCommand;
-import org.apache.pig.impl.streaming.StreamingCommand.HandleSpec;
-import org.apache.pig.impl.logicalLayer.validators.LogicalPlanValidationExecutor;
-import org.apache.pig.impl.plan.CompilationMessageCollector;
-import org.apache.pig.impl.util.StringUtils;
-import org.apache.pig.StreamToPig;
-import org.apache.pig.PigToStream;
-import org.apache.pig.builtin.PigStreaming;
-import org.apache.pig.data.TupleFactory;
-import org.apache.pig.data.Tuple;
-import org.apache.pig.data.BagFactory;
-import org.apache.pig.data.DataBag;
-import org.apache.pig.EvalFunc;
-import org.apache.pig.ComparisonFunc;
-import org.apache.pig.LoadFunc;
-import org.apache.pig.FuncSpec;
-import org.apache.pig.StoreFuncInterface;
-import org.apache.pig.impl.plan.VisitorException;
-import org.apache.pig.PigException;
-import org.apache.pig.backend.datastorage.DataStorage;
-import org.apache.pig.backend.datastorage.ContainerDescriptor;
-import org.apache.pig.backend.datastorage.ElementDescriptor;
-import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
-import org.apache.pig.backend.executionengine.ExecException;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapreduce.Partitioner;
-import org.apache.pig.impl.util.LinkedMultiMap;
-import org.apache.pig.impl.builtin.ReadScalars;
-
-public class QueryParser {
-    private PigContext pigContext;
-    private Map<LogicalOperator, LogicalPlan> aliases;
-    private Map<OperatorKey, LogicalOperator> opTable;
-    private String scope;
-    private NodeIdGenerator nodeIdGen;
-    // A map of alias to logical operator for a quick lookup.
-    private Map<String, LogicalOperator> mapAliasOp;
-    private static Log log = LogFactory.getLog(QueryParser.class);
-    private boolean bracketed = false;
-    private boolean scalarFound = false;
-    private Map<String, String> fileNameMap;
-
-    // Host prefix stripped from har:// URIs in getRemoteHosts before hosts are compared.
-    private static final String HAR_PREFIX = "hdfs-";
-
-    /** Returns the next operator id in this parser's scope. */
-    private long getNextId() {
-        return nodeIdGen.getNextNodeId(scope);
-    }
-
-    /**
-     * Creates a parser over {@code in} that records what it builds in caller-supplied maps.
-     *
-     * @param in          Pig Latin text to parse
-     * @param pigContext  context used for file-system and property lookups
-     * @param scope       scope used when generating operator keys
-     * @param aliases     map from operator to the logical plan it was added to
-     * @param opTable     operator table keyed by operator key (stored for use elsewhere in the parser)
-     * @param aliasOp     map from alias name to the operator bound to it
-     * @param fileNameMap file-name map (stored for use elsewhere in the parser)
-     */
-    public QueryParser(InputStream in,
-                       PigContext pigContext,
-                       String scope,
-                       Map<LogicalOperator, LogicalPlan> aliases,
-                       Map<OperatorKey, LogicalOperator> opTable,
-                       Map<String, LogicalOperator> aliasOp,
-                       Map<String, String> fileNameMap) {
-        this(in);
-        this.pigContext = pigContext;
-        this.aliases = aliases;
-        this.opTable = opTable;
-        this.scope = scope;
-        this.nodeIdGen = NodeIdGenerator.getGenerator();
-        this.mapAliasOp = aliasOp;
-        this.fileNameMap = fileNameMap;
-    }
-
-    /**
-     * Same as the constructor without {@code start}, but additionally sets the token source's
-     * current line number to {@code start} (presumably so reported positions line up with the
-     * enclosing script - confirm with callers).
-     */
-    public QueryParser(InputStream in,
-                       PigContext pigContext,
-                       String scope,
-                       Map<LogicalOperator, LogicalPlan> aliases,
-                       Map<OperatorKey, LogicalOperator> opTable,
-                       Map<String, LogicalOperator> aliasOp,
-                       int start,
-                       Map<String, String> fileNameMap) {
-        this(in, pigContext, scope, aliases, opTable, aliasOp, fileNameMap);
-        token_source.input_stream.line = start;
-    }
-
-    /** One input of a (co)group or join: the input operator, its key plans and its inner flag. */
-    public class CogroupInput {
-        public LogicalOperator op;
-        public ArrayList<LogicalPlan> plans;
-        public boolean isInner;
-    }
-
-    /**
-     * Strips one pair of enclosing single quotes from {@code str}, if present.
-     * Strings shorter than two characters (e.g. a lone quote) are returned unchanged.
-     */
-    private static String removeQuotes(String str) {
-        if (str.length() >= 2 && str.startsWith("'") && str.endsWith("'")) {
-            return str.substring(1, str.length() - 1);
-        }
-        return str;
-    }
-
-    /**
-     * Builds a new plan that stores {@code input} (plus everything it depends on in
-     * {@code readFrom}) into {@code fileName} using store function {@code func}.
-     *
-     * @param func store function class name; PigStorage is used when {@code null}
-     * @return the store plan; it always has at least one root
-     * @throws FrontendException if the store operator or its location cannot be set up
-     */
-    public static LogicalPlan generateStorePlan(String scope,
-                                                LogicalPlan readFrom,
-                                                String fileName,
-                                                String func,
-                                                LogicalOperator input,
-                                                String alias, PigContext pigContext) throws FrontendException {
-        if (func == null) {
-            func = PigStorage.class.getName();
-        }
-        fileName = removeQuotes(fileName);
-
-        long storeNodeId = NodeIdGenerator.getGenerator().getNextNodeId(scope);
-        LogicalPlan storePlan = new LogicalPlan();
-        FuncSpec funcSpec = new FuncSpec(func);
-        FileSpec fileSpec = new FileSpec(fileName, funcSpec);
-
-        LogicalOperator store;
-        try {
-            store = new LOStore(storePlan, new OperatorKey(scope, storeNodeId), fileSpec, alias);
-        } catch (IOException ioe) {
-            throw new FrontendException(ioe.getMessage(), ioe);
-        }
-
-        StoreFuncInterface stoFunc = (StoreFuncInterface) PigContext.instantiateFuncFromSpec(funcSpec);
-        stoFunc.setStoreFuncUDFContextSignature(LOStore.constructSignature(alias, fileName, funcSpec));
-        try {
-            // Let the store function resolve a relative location against the active DFS directory.
-            stoFunc.relToAbsPathForStoreLocation(fileName, getCurrentDir(pigContext));
-        } catch (IOException ioe) {
-            throw new FrontendException(ioe.getMessage(), ioe);
-        }
-
-        try {
-            storePlan.add(store);
-            storePlan.add(input);
-            storePlan.connect(input, store);
-            attachPlan(storePlan, input, readFrom, new HashMap<LogicalOperator, Boolean>());
-        } catch (ParseException pe) {
-            throw new FrontendException(pe.getMessage(), pe);
-        }
-
-        if (storePlan.getRoots().size() == 0) {
-            throw new RuntimeException("Store plan has no roots!");
-        }
-        return storePlan;
-    }
-
-    /** Drops the first and last character of {@code s} and unescapes what is left. */
-    static String unquote(String s) {
-        return StringUtils.unescapeInputString(s.substring(1, s.length() - 1));
-    }
-
-    /** Parses a positional reference such as {@code "$3"} into its integer index. */
-    static int undollar(String s) {
-        return Integer.parseInt(s.substring(1, s.length()));
-    }
-
-    /** Returns the DFS active container of {@code pigContext} as a Hadoop {@link Path}. */
-    static Path getCurrentDir(PigContext pigContext) throws IOException {
-        DataStorage dfs = pigContext.getDfs();
-        ContainerDescriptor desc = dfs.getActiveContainer();
-        ElementDescriptor el = dfs.asElement(desc);
-        return new Path(el.toString());
-    }
-
-    /**
-     * Creates an LOCogroup over the given inputs, adds it to {@code lp} and connects every
-     * input operator to it.
-     *
-     * @throws ParseException if there are no inputs or the inputs have different key arities
-     */
-    LogicalOperator parseCogroup(ArrayList<CogroupInput> gis, LogicalPlan lp, LOCogroup.GROUPTYPE type) throws ParseException, PlanException {
-        log.trace("Entering parseCogroup");
-        log.debug("LogicalPlan: " + lp);
-
-        int n = gis.size();
-        log.debug("Number of cogroup inputs = " + n);
-        if (n == 0) {
-            throw new ParseException("Cogroup requires at least one input.");
-        }
-
-        ArrayList<LogicalOperator> los = new ArrayList<LogicalOperator>();
-        MultiMap<LogicalOperator, LogicalPlan> groupByPlans = new MultiMap<LogicalOperator, LogicalPlan>();
-        boolean[] isInner = new boolean[n];
-        // Every input must be grouped by the same number of key expressions as the first one.
-        int arity = gis.get(0).plans.size();
-
-        for (int i = 0; i < n; i++) {
-            CogroupInput gi = gis.get(i);
-            los.add(gi.op);
-            ArrayList<LogicalPlan> planList = gi.plans;
-            int numGrpByOps = planList.size();
-            log.debug("Number of group by operators = " + numGrpByOps);
-
-            if (arity != numGrpByOps) {
-                throw new ParseException("The arity of the group by columns do not match.");
-            }
-            for (LogicalPlan keyPlan : planList) {
-                groupByPlans.put(gi.op, keyPlan);
-                for (LogicalOperator root : keyPlan.getRoots()) {
-                    log.debug("Cogroup input plan root: " + root);
-                }
-            }
-            isInner[i] = gi.isInner;
-        }
-
-        LogicalOperator cogroup = new LOCogroup(lp, new OperatorKey(scope, getNextId()), groupByPlans, type, isInner);
-        lp.add(cogroup);
-        log.debug("Added operator " + cogroup.getClass().getName() + " object " + cogroup + " to the logical plan " + lp);
-
-        for (LogicalOperator op : los) {
-            lp.connect(op, cogroup);
-            log.debug("Connected operator " + op.getClass().getName() + " to " + cogroup.getClass().getName() + " in the logical plan");
-        }
-
-        log.trace("Exiting parseCogroup");
-        return cogroup;
-    }
-
-    /**
-     * Builds a cogroup for a {@code GROUP ... USING 'modifier'} clause and pins its group type.
-     *
-     * @throws ParseException for an unknown modifier, or a collected group that is not a
-     *                        single input keyed only by column/star projections
-     */
-    private LogicalOperator parseUsingForGroupBy(String modifier, ArrayList<CogroupInput> gis, LogicalPlan lp) throws ParseException, PlanException {
-        LOCogroup.GROUPTYPE groupType;
-        if (modifier.equalsIgnoreCase("collected")) {
-            if (gis.size() != 1) {
-                throw new ParseException("Collected group is only supported for single input");
-            }
-            if (!isColumnProjectionsOrStar(gis.get(0))) {
-                throw new ParseException("Collected group is only supported for columns or star projection");
-            }
-            groupType = LOCogroup.GROUPTYPE.COLLECTED;
-        } else if (modifier.equalsIgnoreCase("regular")) {
-            groupType = LOCogroup.GROUPTYPE.REGULAR;
-        } else if (modifier.equalsIgnoreCase("merge")) {
-            groupType = LOCogroup.GROUPTYPE.MERGE;
-        } else {
-            throw new ParseException("Only COLLECTED, REGULAR or MERGE are valid GROUP modifiers.");
-        }
-
-        LogicalOperator cogroup = parseCogroup(gis, lp, groupType);
-        // Pin the explicitly requested group type (presumably so later stages keep it).
-        cogroup.pinOption(LOCogroup.OPTION_GROUPTYPE);
-        return cogroup;
-    }
-
-    /**
-     * Creates an LOJoin of type {@code jt} over the given inputs, adds it to {@code lp} and
-     * connects every input operator to it.
-     *
-     * @throws ParseException if a skewed/merge join is not 2-way, there are no inputs, or
-     *                        the inputs have different join-key arities
-     */
-    LogicalOperator parseJoin(ArrayList<CogroupInput> gis, LogicalPlan lp, LOJoin.JOINTYPE jt) throws ParseException, PlanException {
-        log.trace("Entering parseJoin");
-
-        int n = gis.size();
-
-        if (jt == LOJoin.JOINTYPE.SKEWED && n != 2) {
-            throw new ParseException("Skewed join can only be applied for 2-way joins");
-        }
-        if (jt == LOJoin.JOINTYPE.MERGE && n != 2) {
-            throw new ParseException("Merge join can only be applied for 2-way joins");
-        }
-        if (n == 0) {
-            throw new ParseException("Join requires at least one input.");
-        }
-
-        ArrayList<LogicalOperator> los = new ArrayList<LogicalOperator>();
-        // Linked multimap: keeps join inputs in declaration order.
-        MultiMap<LogicalOperator, LogicalPlan> joinPlans = new LinkedMultiMap<LogicalOperator, LogicalPlan>();
-        boolean[] isInner = new boolean[n];
-        int arity = gis.get(0).plans.size();
-
-        for (int i = 0; i < n; i++) {
-            CogroupInput gi = gis.get(i);
-            los.add(gi.op);
-            ArrayList<LogicalPlan> planList = gi.plans;
-            int numJoinOps = planList.size();
-            log.debug("Number of join operators = " + numJoinOps);
-
-            if (arity != numJoinOps) {
-                throw new ParseException("The arity of the join columns do not match.");
-            }
-            for (LogicalPlan keyPlan : planList) {
-                joinPlans.put(gi.op, keyPlan);
-                for (LogicalOperator root : keyPlan.getRoots()) {
-                    log.debug("Join input plan root: " + root);
-                }
-            }
-            isInner[i] = gi.isInner;
-        }
-
-        LogicalOperator loj = new LOJoin(lp, new OperatorKey(scope, getNextId()), joinPlans, jt, isInner);
-        lp.add(loj);
-        log.debug("Added operator " + loj.getClass().getName() + " object " + loj + " to the logical plan " + lp);
-
-        for (LogicalOperator op : los) {
-            lp.connect(op, loj);
-            log.debug("Connected operator " + op.getClass().getName() + " to " + loj.getClass().getName() + " in the logical plan");
-        }
-
-        log.trace("Exiting parseJoin");
-        return loj;
-    }
-
-    /**
-     * Rewrites a join as an all-inner REGULAR cogroup followed by a foreach that projects
-     * and flattens columns 1..n of the cogroup output (column 0 presumably being the group
-     * key - confirm against LOCogroup's schema).
-     *
-     * Side effect: sets {@code isInner = true} on every element of {@code gis}.
-     */
-    LogicalOperator rewriteJoin(ArrayList<CogroupInput> gis, LogicalPlan lp) throws ParseException, PlanException {
-        log.trace("Entering rewriteJoin");
-        log.debug("LogicalPlan: " + lp);
-        int n = gis.size();
-        ArrayList<LogicalPlan> generatePlans = new ArrayList<LogicalPlan>();
-        ArrayList<Boolean> flattenList = new ArrayList<Boolean>();
-
-        // For join, inner is true for all the inputs involved in the join.
-        for (int i = 0; i < n; i++) {
-            gis.get(i).isInner = true;
-        }
-        LogicalOperator cogroup = parseCogroup(gis, lp, LOCogroup.GROUPTYPE.REGULAR);
-        // NOTE(review): parseCogroup has already added cogroup to lp; this second add is kept
-        // unchanged - confirm LogicalPlan.add tolerates re-adding the same operator.
-        lp.add(cogroup);
-        log.debug("Added operator " + cogroup.getClass().getName() + " to the logical plan");
-
-        // One flattened projection per join input.
-        for (int i = 0; i < n; i++) {
-            LogicalPlan projectPlan = new LogicalPlan();
-            LogicalOperator projectInput = cogroup;
-            ExpressionOperator column = new LOProject(projectPlan, new OperatorKey(scope, getNextId()), projectInput, i + 1);
-            flattenList.add(true);
-            projectPlan.add(column);
-            if (projectInput instanceof ExpressionOperator) {
-                projectPlan.add(projectInput);
-                projectPlan.connect(projectInput, column);
-            }
-            log.debug("parseCogroup: Added operator " + column.getClass().getName() + " " + column + " to logical plan " + projectPlan);
-            generatePlans.add(projectPlan);
-        }
-
-        // Construct the foreach from the generate plans and hang it below the cogroup.
-        LogicalOperator foreach = new LOForEach(lp, new OperatorKey(scope, getNextId()), generatePlans, flattenList);
-        lp.add(foreach);
-        log.debug("Added operator " + foreach.getClass().getName() + " to the logical plan");
-        lp.connect(cogroup, foreach);
-        log.debug("Connected operator " + cogroup.getClass().getName() + " to opeator " + foreach.getClass().getName() + " in the logical plan " + lp);
-
-        log.trace("Exiting rewriteJoin");
-        return foreach;
-    }
-
-    /**
-     * Builds a join for a {@code JOIN ... USING 'modifier'} clause and pins its join type.
-     *
-     * @throws ParseException for an unknown modifier, or a replicated right/full outer join
-     */
-    private LogicalOperator parseUsingForJoin(String modifier, ArrayList<CogroupInput> gis,
-            LogicalPlan lp, boolean isFullOuter, boolean isRightOuter, boolean isOuter) throws
-            ParseException, PlanException {
-        LOJoin.JOINTYPE joinType;
-        if (modifier.equalsIgnoreCase("repl") || modifier.equalsIgnoreCase("replicated")) {
-            if (isFullOuter || isRightOuter) {
-                throw new ParseException("Replicated join does not support (right|full) outer joins");
-            }
-            joinType = LOJoin.JOINTYPE.REPLICATED;
-        } else if (modifier.equalsIgnoreCase("hash") || modifier.equalsIgnoreCase("default")) {
-            joinType = LOJoin.JOINTYPE.HASH;
-        } else if (modifier.equalsIgnoreCase("skewed")) {
-            joinType = LOJoin.JOINTYPE.SKEWED;
-        } else if (modifier.equalsIgnoreCase("merge")) {
-            joinType = LOJoin.JOINTYPE.MERGE;
-        } else {
-            throw new ParseException("Only REPL, REPLICATED, HASH, SKEWED and MERGE are vaild JOIN modifiers.");
-        }
-
-        LogicalOperator joinOp = parseJoin(gis, lp, joinType);
-        joinOp.pinOption(LOJoin.OPTION_JOIN);
-        return joinOp;
-    }
-
-    /**
-     * Fails if {@code spec} is known to be atomic (or known non-atomic) when the opposite is
-     * desired. Constants count as atomic; user functions by their declared type; every other
-     * operator is not checked.
-     */
-    void assertAtomic(LogicalOperator spec, boolean desiredAtomic) throws ParseException {
-        Boolean isAtomic = null;
-        if (spec instanceof LOConst ||
-                (spec instanceof LOUserFunc && DataType.isAtomic(((LOUserFunc) spec).getType()))) {
-            isAtomic = true;
-        } else if (spec instanceof LOUserFunc) {
-            isAtomic = false;
-        }
-
-        if (isAtomic != null && isAtomic != desiredAtomic) {
-            if (desiredAtomic) {
-                throw new ParseException("Atomic field expected but found non-atomic field");
-            }
-            throw new ParseException("Non-atomic field expected but found atomic field");
-        }
-    }
-
-    /**
-     * Creates output {@code index} of {@code splitOp} with condition {@code condPlan}, binds it
-     * to {@code alias}, and adds/connects it in {@code lp}.
-     */
-    void addSplitOutput(LogicalPlan lp, LOSplit splitOp, String alias, LogicalPlan condPlan, int index) throws PlanException {
-        LogicalOperator splitOutput = new LOSplitOutput(lp, new OperatorKey(scope, getNextId()), index, condPlan);
-        splitOp.addOutput(splitOutput);
-        addAlias(alias, splitOutput);
-        splitOutput.setAlias(alias);
-        addLogicalPlan(splitOutput, lp);
-
-        lp.add(splitOutput);
-        log.debug("Added alias: " + splitOutput.getAlias() + " class: "
-                + splitOutput.getClass().getName() + " to the logical plan");
-
-        lp.connect(splitOp, splitOutput);
-        log.debug("Connected " + splitOp.getClass().getName() + " to class: "
-                + splitOutput.getClass().getName() + " in the logical plan");
-    }
-
-    /** Binds {@code alias} to {@code lOp}, replacing any earlier binding. */
-    void addAlias(String alias, LogicalOperator lOp) {
-        mapAliasOp.put(alias, lOp);
-    }
-
-    /** Returns the operator bound to {@code alias}, or {@code null} if there is none. */
-    LogicalOperator getOp(String alias) {
-        return mapAliasOp.get(alias);
-    }
-
-    /**
-     * Collects the HDFS servers referenced by a comma-separated path list that differ from
-     * {@code defaultHost}.
-     *
-     * Only absolute hdfs:// and har:// paths with a non-empty host are considered; for har://
-     * a leading {@code "hdfs-"} is stripped from the host. Each result has the form
-     * {@code hdfs://host} or {@code hdfs://host:port}, with the host lower-cased.
-     * Blank entries in the list are skipped.
-     *
-     * @param defaultHost already lower-cased native host; matching hosts are excluded
-     */
-    Set<String> getRemoteHosts(String absolutePath, String defaultHost) {
-        Set<String> result = new HashSet<String>();
-        for (String fname : absolutePath.split(",")) {
-            // remove leading/trailing whitespace(s)
-            fname = fname.trim();
-            if (fname.isEmpty()) {
-                // new Path("") rejects empty strings; tolerate lists such as "a,,b".
-                continue;
-            }
-            URI uri = new Path(fname).toUri();
-            // URI.isAbsolute() is true exactly when a scheme is present, so scheme is non-null below.
-            if (!uri.isAbsolute()) {
-                continue;
-            }
-            String scheme = uri.getScheme().toLowerCase(Locale.ENGLISH);
-            if (!scheme.equals("hdfs") && !scheme.equals("har")) {
-                continue;
-            }
-            String host = uri.getHost();
-            if (host == null || host.isEmpty()) {
-                continue;
-            }
-            // Locale.ENGLISH keeps host names stable under locales such as Turkish.
-            String thisHost = host.toLowerCase(Locale.ENGLISH);
-            if (scheme.equals("har") && thisHost.startsWith(HAR_PREFIX)) {
-                thisHost = thisHost.substring(HAR_PREFIX.length());
-            }
-            if (!thisHost.equals(defaultHost)) {
-                if (uri.getPort() != -1) {
-                    result.add("hdfs://" + thisHost + ":" + uri.getPort());
-                } else {
-                    result.add("hdfs://" + thisHost);
-                }
-            }
-        }
-        return result;
-    }
-
-    /**
-     * Appends every remote HDFS server referenced by {@code absolutePath} to the
-     * {@code mapreduce.job.hdfs-servers} property, skipping servers already listed there.
-     * When {@code fs.default.name} is unset, no host is treated as native.
-     *
-     * @throws URISyntaxException if {@code fs.default.name} is not a valid URI
-     */
-    void setHdfsServers(String absolutePath, PigContext pigContext) throws URISyntaxException {
-        // Get native host
-        String defaultFS = (String) pigContext.getProperties().get("fs.default.name");
-        String defaultHost = null;
-        if (defaultFS != null) {
-            defaultHost = new URI(defaultFS).getHost();
-        }
-        defaultHost = (defaultHost == null) ? "" : defaultHost.toLowerCase(Locale.ENGLISH);
-
-        Set<String> remoteHosts = getRemoteHosts(absolutePath, defaultHost);
-
-        String hdfsServersString = (String) pigContext.getProperties().get("mapreduce.job.hdfs-servers");
-        if (hdfsServersString == null) {
-            hdfsServersString = "";
-        }
-        List<String> knownServers = Arrays.asList(hdfsServersString.split(","));
-
-        for (String remoteHost : remoteHosts) {
-            if (!knownServers.contains(remoteHost)) {
-                if (!hdfsServersString.isEmpty()) {
-                    hdfsServersString = hdfsServersString + ",";
-                }
-                hdfsServersString = hdfsServersString + remoteHost;
-            }
-        }
-
-        if (!hdfsServersString.isEmpty()) {
-            pigContext.getProperties().setProperty("mapreduce.job.hdfs-servers", hdfsServersString);
-        }
-    }
-
-    // Check and set files to be automatically shipped for the given StreamingCommand.
-    // Auto-shipping rules:
-    // 1. If the command begins with perl or python, the candidate is the first argument that
-    //    is neither quoted nor starts with a dash.
-    // 2. Otherwise the candidate is the first word of the command line.
-    // A candidate given as an absolute path is never auto-shipped. Otherwise it is resolved
-    // with "which" and shipped unless the resolved path starts with one of
-    // pigContext.getPathsToSkip() (configurable via "set stream.skippath <paths>";
-    // presumably defaults to /bin, /usr/bin, /usr/local/bin - confirm in PigContext).
-    private static final String PERL = "perl";
-    private static final String PYTHON = "python";
-    private void checkAutoShipSpecs(StreamingCommand command, String[] argv)
-            throws ParseException {
-        if (argv == null || argv.length == 0) {
-            // Nothing on the command line, so nothing to ship.
-            return;
-        }
-        // Candidate for auto-ship
-        String arg0 = argv[0];
-
-        if (arg0.equalsIgnoreCase(PERL) || arg0.equalsIgnoreCase(PYTHON)) {
-            for (int i = 1; i < argv.length; ++i) {
-                if (!argv[i].startsWith("-") && !isQuotedString(argv[i])) {
-                    checkAndShip(command, argv[i]);
-                    break;
-                }
-            }
-        } else {
-            // Ship the first argument if it can be ...
-            checkAndShip(command, arg0);
-        }
-    }
-
-    /**
-     * Adds {@code arg} to {@code command}'s ship list when it is a relative name that
-     * {@code which} resolves outside the skip paths.
-     *
-     * @throws ParseException wrapping any IOException raised while adding the path
-     */
-    private void checkAndShip(StreamingCommand command, String arg)
-            throws ParseException {
-        // Don't auto-ship if it is an absolute path...
-        if (arg.startsWith("/")) {
-            return;
-        }
-
-        // $ which arg
-        String argPath = which(arg);
-        if (argPath != null && !inSkipPaths(argPath)) {
-            try {
-                command.addPathToShip(argPath);
-            } catch (IOException e) {
-                ParseException pe = new ParseException(e.getMessage());
-                pe.initCause(e);
-                throw pe;
-            }
-        }
-    }
-
-    /** True when {@code s} is at least two characters long and wrapped in single quotes. */
-    private boolean isQuotedString(String s) {
-        return s.length() >= 2 && s.charAt(0) == '\'' && s.charAt(s.length() - 1) == '\'';
-    }
-
-    // Check if file is in the list paths to be skipped
-    private boolean inSkipPaths(String file) {
-        for (String skipPath : pigContext.getPathsToSkip()) {
-            if (file.startsWith(skipPath)) {
-                return true;
-            }
-        }
-        return false;
-    }
-
-    /**
-     * Runs {@code which file} and returns the first line it prints, or {@code null} if the
-     * command fails, exits non-zero, or cannot be run (best effort by design).
-     * If interrupted, the thread's interrupt flag is restored.
-     */
-    private static String which(String file) {
-        Process process = null;
-        BufferedReader stdout = null;
-        try {
-            process = new ProcessBuilder(new String[] {"which", file}).start();
-            stdout = new BufferedReader(new InputStreamReader(process.getInputStream()));
-            String fullPath = stdout.readLine();
-            return (process.waitFor() == 0) ? fullPath : null;
-        } catch (InterruptedException ie) {
-            Thread.currentThread().interrupt();
-        } catch (Exception e) {
-            // Best effort: if "which" cannot be run, the argument is simply not auto-shipped.
-        } finally {
-            if (stdout != null) {
-                try {
-                    stdout.close();
-                } catch (IOException ignored) {
-                    // Nothing useful to do if closing the pipe fails.
-                }
-            }
-            if (process != null) {
-                process.destroy();
-            }
-        }
-        return null;
-    }
-
-    private static final char SINGLE_QUOTE = '\'';
-    private static final char DOUBLE_QUOTE = '"';
-
-    /**
-     * Splits a streaming command line into words. Words are separated by runs of whitespace;
-     * a word beginning with a single or double quote extends to the matching closing quote
-     * (quotes kept). Leading and trailing whitespace is ignored.
-     *
-     * @throws ParseException if a quoted word is not terminated
-     */
-    private static String[] splitArgs(String command) throws ParseException {
-        List<String> argv = new ArrayList<String>();
-        int length = command.length();
-        int beginIndex = 0;
-
-        while (beginIndex < length) {
-            // Skip spaces, without running off the end of the command.
-            while (beginIndex < length && Character.isWhitespace(command.charAt(beginIndex))) {
-                ++beginIndex;
-            }
-            if (beginIndex == length) {
-                // Only trailing whitespace was left.
-                break;
-            }
-
-            char delim = ' ';
-            char charAtIndex = command.charAt(beginIndex);
-            if (charAtIndex == SINGLE_QUOTE || charAtIndex == DOUBLE_QUOTE) {
-                delim = charAtIndex;
-            }
-
-            int endIndex = command.indexOf(delim, beginIndex + 1);
-            if (endIndex == -1) {
-                if (Character.isWhitespace(delim)) {
-                    // Reached end of command-line
-                    argv.add(command.substring(beginIndex));
-                    break;
-                }
-                // Didn't find the ending quote/double-quote
-                throw new ParseException("Illegal command: " + command);
-            }
-
-            if (Character.isWhitespace(delim)) {
-                // Do not consume the space
-                argv.add(command.substring(beginIndex, endIndex));
-            } else {
-                argv.add(command.substring(beginIndex, endIndex + 1));
-            }
-
-            beginIndex = endIndex + 1;
-        }
-
-        return argv.toArray(new String[argv.size()]);
-    }
-
-    // BEGIN
-    // State about the operators that should serve as the inputs to generate in the foreach
-    // logical plan is kept here rather than passed around the entire parse tree.
-
-    private boolean insideGenerate = false; // true while parsing inside a generate statement
-    private List<LogicalOperator> generateInputs = new ArrayList<LogicalOperator>();
-
-    boolean insideGenerate() {
-        return insideGenerate;
-    }
-
-    void setInsideGenerate(boolean b) {
-        insideGenerate = b;
-    }
-
-    List<LogicalOperator> getGenerateInputs() {
-        return generateInputs;
-    }
-
-    void resetGenerateInputs() {
-        generateInputs.clear();
-    }
-
-    void addGenerateInput(LogicalOperator op) {
-        generateInputs.add(op);
-    }
-
-    /** Leaves generate mode and forgets the recorded generate inputs. */
-    void resetGenerateState() {
-        insideGenerate = false;
-        resetGenerateInputs();
-    }
-
-    /** True if {@code in} is (by identity) one of the recorded generate inputs. */
-    boolean checkGenerateInput(LogicalOperator in) {
-        if (null == generateInputs) return false;
-        for (LogicalOperator op : generateInputs) {
-            if (op == in) return true;
-        }
-        return false;
-    }
-
-    // END
-
-    private static Map<String, Byte> nameToTypeMap = DataType.genNameToTypeMap();
-
-    public void addLogicalPlan(LogicalOperator op, LogicalPlan plan) {
-        aliases.put(op, plan);
-    }
-
-    public LogicalPlan getLogicalPlan(LogicalOperator op) {
-        return aliases.get(op);
-    }
-
-    /**
-     * Recursively copies {@code root} and all of its predecessors from {@code rootPlan} into
-     * {@code lp}, reconnecting the edges. Predecessors already marked in {@code rootProcessed}
-     * are not copied again.
-     *
-     * @param rootProcessed operators already attached; a fresh map is used when {@code null}
-     * @throws ParseException wrapping any FrontendException raised while connecting
-     */
-    public static void attachPlan(LogicalPlan lp, LogicalOperator root, LogicalPlan rootPlan, Map<LogicalOperator, Boolean> rootProcessed) throws ParseException {
-        log.trace("Entering attachPlan");
-        if (null == rootProcessed) {
-            rootProcessed = new HashMap<LogicalOperator, Boolean>();
-        }
-        if (Boolean.TRUE.equals(rootProcessed.get(root))) {
-            log.trace("Root has been processed");
-            log.trace("Exiting attachPlan");
-            return;
-        }
-        lp.add(root);
-        log.debug("Added operator " + root + " to the logical plan " + lp);
-        if (null == rootPlan.getPredecessors(root)) {
-            log.trace("Exiting attachPlan");
-            return;
-        }
-        for (LogicalOperator rootPred : rootPlan.getPredecessors(root)) {
-            attachPlan(lp, rootPred, rootPlan, rootProcessed);
-            rootProcessed.put(rootPred, true);
-            try {
-                lp.connect(rootPred, root);
-                log.debug("Connected operator " + rootPred + " to " + root + " in the logical plan " + lp);
-            } catch (FrontendException fee) {
-                ParseException pe = new ParseException(fee.getMessage());
-                pe.initCause(fee);
-                throw pe;
-            }
-        }
-        log.trace("Exiting attachPlan");
-    }
-
-    /** True when {@code cgi} has at least one key plan and every key-plan operator is an LOProject. */
-    boolean isColumnProjectionsOrStar(CogroupInput cgi) {
-        if (cgi == null || cgi.plans == null || cgi.plans.size() == 0) {
-            return false;
-        }
-        for (LogicalPlan keyPlan : cgi.plans) {
-            for (LogicalOperator op : keyPlan) {
-                if (!(op instanceof LOProject)) {
-                    return false;
-                }
-            }
-        }
-        return true;
-    }
-
-    static String constructFileNameSignature(String fileName, FuncSpec funcSpec) {
-        return fileName + "_" + funcSpec.toString();
-    }
-
-    /**
-     * Attaches an integer LOConst holding {@code colNum} as an input of {@code expr}, so that
-     * the ReadScalars UDF can read which column to project. Resets {@code scalarFound}.
-     *
-     * @param over currently unused (an earlier cast-to-field-type step was removed as dead code)
-     * @return {@code expr} itself
-     */
-    ExpressionOperator attachColPosToReadScalar(LogicalPlan lp,
-            ExpressionOperator expr, int colNum, Schema over)
-            throws PlanException, FrontendException {
-        scalarFound = false;
-        LOConst rconst = new LOConst(lp, new OperatorKey(scope, getNextId()), colNum);
-        rconst.setType(DataType.INTEGER);
-        lp.add(rconst);
-        lp.connect(rconst, expr);
-        return expr;
-    }
-}
-
-
-
-/** Bit-flag codes for the kinds of user function the parser can check a UDF object against. */
-class FunctionType {
-    public static final byte UNKNOWNFUNC = 0;
-    public static final byte EVALFUNC = 2;
-    public static final byte COMPARISONFUNC = 4;
-    public static final byte LOADFUNC = 8;
-    public static final byte STOREFUNC = 16;
-    public static final byte PIGTOSTREAMFUNC = 32;
-    public static final byte STREAMTOPIGFUNC = 64;
-
-    /**
-     * Checks that {@code func} can be cast to the type named by {@code funcType}.
-     * An incompatible object surfaces as a ClassCastException from the cast itself.
-     *
-     * @throws Exception if {@code funcType} is not one of the known codes (including UNKNOWNFUNC)
-     */
-    public static void tryCasting(Object func, byte funcType) throws Exception {
-        // The assigned value is never used: only the checked cast matters.
-        Object checked;
-        if (funcType == EVALFUNC) {
-            checked = (EvalFunc) func;
-        } else if (funcType == COMPARISONFUNC) {
-            checked = (ComparisonFunc) func;
-        } else if (funcType == LOADFUNC) {
-            checked = (LoadFunc) func;
-        } else if (funcType == STOREFUNC) {
-            checked = (StoreFuncInterface) func;
-        } else if (funcType == PIGTOSTREAMFUNC) {
-            checked = (PigToStream) func;
-        } else if (funcType == STREAMTOPIGFUNC) {
-            checked = (StreamToPig) func;
-        } else {
-            throw new Exception("Received an unknown function type: " + funcType);
-        }
-    }
-}
-
-class ClassType {
- public static final byte UNKNOWNCLASS = 0;
- public static final byte PARTITIONER = 2;
-
- public static void checkClassType(Class cs, byte classType) throws Exception {
- switch(classType) {
- case ClassType.PARTITIONER:
- if(!(cs.newInstance() instanceof Partitioner)) {
- throw new Exception("Not a class of org.apache.hadoop.mapreduce.Partitioner");
- }
- break;
- default:
- throw new Exception("Received an unknown class type: " + classType);
- }
- }
-}
-
-PARSER_END(QueryParser)
-
-// Whitespace between tokens: spaces, carriage returns, tabs and newlines are discarded.
-SKIP : { " " | "\r" | "\t" | "\n" }
-
-// Comments are discarded too: "--" runs to the end of the line, and /* ... */ is a
-// (non-nesting) block comment that may span lines.
-SKIP : {
- <"--"(~["\r","\n"])*>
-| <"/*" (~["*"])* "*" ("*" | (~["*","/"] (~["*"])* "*"))* "/">
-}
-// Every keyword token added to the following list (before IDENTIFIER) must also be added to
-// the IdentifierOrReserved production, so that package names may contain these keywords.
-// Also add it to TestLogicalPlanBuilder.testReservedWordsInFunctionNames.
-
-// Comparison operators that can be used in a filter:
-// STRFILTEROP and NUMFILTEROP are private (#) helper tokens; only FILTEROP reaches the
-// parser, so word operators (eq, gt, ...) and symbol operators (==, <, ...) share one token.
-TOKEN : { <#STRFILTEROP : "eq" | "gt" | "lt" | "gte" | "lte" | "neq" > }
-TOKEN : { <#NUMFILTEROP : "==" | "<" | "<=" | ">" | ">=" | "!=" > }
-TOKEN : { <FILTEROP : <STRFILTEROP> | <NUMFILTEROP> > }
-
-// List all the keywords in the language.
-// These are declared before IDENTIFIER, so when a word matches both a keyword and
-// IDENTIFIER with the same length, the keyword token wins.
-TOKEN : { <DEFINE : "define"> }
-TOKEN : { <LOAD : "load"> }
-TOKEN : { <FILTER : "filter"> }
-TOKEN : { <FOREACH : "foreach"> }
-TOKEN : { <MATCHES : "matches"> }
-TOKEN : { <ORDER : "order"> }
-TOKEN : { <ARRANGE : "arrange"> }
-TOKEN : { <DISTINCT : "distinct"> }
-TOKEN : { <COGROUP : "cogroup"> }
-TOKEN : { <JOIN : "join"> }
-TOKEN : { <CROSS : "cross"> }
-TOKEN : { <UNION : "union"> }
-TOKEN : { <SPLIT : "split"> }
-TOKEN : { <INTO : "into"> }
-TOKEN : { <IF : "if"> }
-TOKEN : { <ALL : "all"> }
-TOKEN : { <ANY : "any"> }
-TOKEN : { <AS : "as"> }
-TOKEN : { <BY : "by"> }
-TOKEN : { <USING : "using"> }
-TOKEN : { <INNER : "inner"> }
-TOKEN : { <OUTER : "outer"> }
-// Written in upper case, but IGNORE_CASE makes it match in any case like the others.
-TOKEN : { <ONSCHEMA : "ONSCHEMA"> }
-TOKEN : { <STAR : "*"> }
-TOKEN : { <PARALLEL : "parallel"> }
-TOKEN : { <PARTITION : "partition"> }
-TOKEN : { <GROUP : "group"> }
-TOKEN : { <AND : "and"> }
-TOKEN : { <OR : "or"> }
-TOKEN : { <NOT : "not"> }
-TOKEN : { <GENERATE : "generate"> }
-TOKEN : { <FLATTEN : "flatten"> }
-TOKEN : { <EVAL : "eval"> }
-TOKEN : { <ASC : "asc"> }
-TOKEN : { <DESC : "desc"> }
-// Type names.
-TOKEN : { <INT : "int"> }
-TOKEN : { <LONG : "long"> }
-TOKEN : { <FLOAT : "float"> }
-TOKEN : { <DOUBLE : "double"> }
-TOKEN : { <CHARARRAY : "chararray"> }
-TOKEN : { <BYTEARRAY : "bytearray"> }
-TOKEN : { <BAG : "bag"> }
-TOKEN : { <TUPLE : "tuple"> }
-TOKEN : { <MAP : "map"> }
-TOKEN : { <IS : "is"> }
-TOKEN : { <NULL : "null"> }
-// Streaming and MapReduce-related keywords.
-TOKEN : { <STREAM : "stream"> }
-TOKEN : { <THROUGH : "through"> }
-TOKEN : { <STORE : "store"> }
-TOKEN : { <MAPREDUCE: "mapreduce">}
-TOKEN : { <SHIP: "ship"> }
-TOKEN : { <CACHE: "cache"> }
-TOKEN : { <INPUT: "input"> }
-TOKEN : { <OUTPUT: "output"> }
-// Note: the token named ERROR is spelled "stderr" in scripts.
-TOKEN : { <ERROR: "stderr"> }
-TOKEN : { <STDIN: "stdin"> }
-TOKEN : { <STDOUT: "stdout"> }
-TOKEN : { <LIMIT: "limit"> }
-TOKEN : { <SAMPLE: "sample"> }
-TOKEN : { <LEFT: "left"> }
-TOKEN : { <RIGHT: "right"> }
-TOKEN : { <FULL: "full"> }
-
-// Identifiers start with one or more ASCII letters and may continue with letters,
-// digits, "_" or "::" (they cannot start with a digit or underscore).
-// NOTE(review): FSSPECIALCHAR is not referenced by any token visible in this section.
-TOKEN:
-{
- <#LETTER : ["a"-"z", "A"-"Z"] >
-| <#DIGIT : ["0"-"9"] >
-| <#SPECIALCHAR : ["_"] >
-| <#FSSPECIALCHAR: ["-", ":", "/"]>
-| <IDENTIFIER: ( <LETTER> )+ ( <DIGIT> | <LETTER> | <SPECIALCHAR> | "::")* >
-}
-// Define Numeric Constants
-// The suffixes on LONGINTEGER ("l"/"L") and FLOATNUMBER ("f"/"F") are optional, so a plain
-// digit string matches several of these at the same length. The earliest declaration wins
-// such ties: plain digits become INTEGER, and digits with a fraction and/or exponent become
-// DOUBLENUMBER. LONGINTEGER and FLOATNUMBER are therefore only produced when the suffix is present.
-// NOTE(review): the exponent reuses FLOATINGPOINT, so text such as "1e2.5" lexes as a single
-// DOUBLENUMBER; presumably it is rejected later when the number is parsed - confirm.
-TOKEN :
-{
-// < NUMBER: <INTEGER> | <LONGINTEGER> | <DOUBLENUMBER> | <FLOATNUMBER> >
- < #FLOATINGPOINT: <INTEGER> ( "." <INTEGER> )? | "." <INTEGER> >
-| < INTEGER: ( <DIGIT> )+ >
-| < LONGINTEGER: <INTEGER> (["l","L"])? >
-| < DOUBLENUMBER: <FLOATINGPOINT> ( ["e","E"] ([ "-","+"])? <FLOATINGPOINT> )?>
-| < FLOATNUMBER: <DOUBLENUMBER> (["f","F"])? >
-}
-
-TOKEN : { <QUOTEDSTRING : "'"
-( (~["'","\\","\n","\r"])
- | ("\\"
- ( ["n","t","b","r","f","\\","'"] )
- )
- | ("\\u"
- ["0"-"9","A"-"F","a"-"f"]
- ["0"-"9","A"-"F","a"-"f"]
- ["0"-"9","A"-"F","a"-"f"]
- ["0"-"9","A"-"F","a"-"f"]
- )
-)*
-"'"> }
-
-
-TOKEN:{ < QUOTED_MULTI_STRING :
-"'" ( (~["'","\\","\n","\r"])
- | ("\\" ["n","t","b","r","f","\\","'"] )
- | ("\\u" ["0"-"9","A"-"F","a"-"f"]["0"-"9","A"-"F","a"-"f"]
- ["0"-"9","A"-"F","a"-"f"]["0"-"9","A"-"F","a"-"f"])
- )*
- ["\n","\r"]
- ( (~["'","\\"] )
- | ("\\" ["n","t","b","r","f","\\","'"] )
- | ("\\u" ["0"-"9","A"-"F","a"-"f"]["0"-"9","A"-"F","a"-"f"]
- ["0"-"9","A"-"F","a"-"f"]["0"-"9","A"-"F","a"-"f"])
- )*
-"'">
-}
-
-
-TOKEN : { <EXECCOMMAND : "`" (~["`"])* "`"> }
-// Pig has special variables starting with $
-TOKEN : { <DOLLARVAR : "$" <INTEGER> > }
-
-// Parse is the Starting function.
-LogicalPlan Parse() :
-{
- LogicalOperator root = null;
- Token t1;
- Token t2;
- LogicalPlan lp = new LogicalPlan();
- log.trace("Entering Parse");
-}
-{
- (
- LOOKAHEAD(3)
- // For now don't allow A = B; kind of statements - this should
- // be fixed and allowed in the future
- (t1 = <IDENTIFIER> "=" t2 = <IDENTIFIER> [ <AS> "(" TupleSchema() ")" ] ";" {
- throw new ParseException(
- "Currently PIG does not support assigning an existing relation (" + t1.image + ") to another alias (" + t2.image + ")");})
-| LOOKAHEAD(2)
- (t1 = <IDENTIFIER> "=" root = Expr(lp) ";" {
- root.setAlias(t1.image);
- addAlias(t1.image, root);
- pigContext.setLastAlias(t1.image);
- })
-| (root = Expr(lp) ";")
-| (<SPLIT> root = SplitClause(lp) ";")
- )
- {
- if(null != root) {
- try {
- log.debug("Adding " + root.getAlias() + " " + root + " to the lookup table " + aliases);
-
- //Translate all the project(*) leaves in the plan to a sequence of projections
- ProjectStarTranslator translate = new ProjectStarTranslator(lp);
- translate.visit();
-
- addLogicalPlan(root, lp);
-
- log.debug("Root: " + root.getClass().getName() + " schema: " + root.getSchema());
- } catch(FrontendException fee) {
- ParseException pe = new ParseException(fee.getMessage());
- pe.initCause(fee);
- throw pe;
- }
- }
-
- ArrayList<LogicalOperator> roots = new ArrayList<LogicalOperator>(lp.getRoots().size());
- for(LogicalOperator op: lp.getRoots()) {
- roots.add(op);
- }
-
- Map<LogicalOperator, Boolean> rootProcessed = new HashMap<LogicalOperator, Boolean>();
- for(LogicalOperator op: roots) {
- //At this point we have a logical plan for the pig statement
- //In order to construct the entire logical plan we need to traverse
- //each root and get the logical plan it belongs to. From each of those
- //plans we need the predecessors of the root of the current logical plan
- //and so on. This is a computationally intensive operatton but should
- //be fine as its restricted to the parser
-
- LogicalPlan rootPlan = aliases.get(op);
- if(null != rootPlan) {
- attachPlan(lp, op, rootPlan, rootProcessed);
- rootProcessed.put(op, true);
- }
- }
-
- log.trace("Exiting Parse");
- return lp;
- }
-}
-
-LogicalOperator SplitClause(LogicalPlan lp):
-{
- LogicalOperator input;
- ExpressionOperator cond;
- Token alias;
- LOSplit splitOp;
- int index = 0;
- LogicalPlan condPlan;
- log.trace("Entering SplitClause");
-}
-{
- (
- input = NestedExpr(lp) <INTO>
- {
- splitOp = new LOSplit(lp, input.getOperatorKey(), new ArrayList<LogicalOperator>());
- lp.add(splitOp);
- log.debug("Adding operator " + splitOp.getClass().getName() + " to the logical plan");
- lp.connect(input, splitOp);
- log.debug("Connected alias: " + input.getAlias() + " operator " + input.getClass().getName() + " to operator " + splitOp.getClass().getName());
- }
- alias = <IDENTIFIER> <IF> cond = PCond(input.getSchema(), null, condPlan = new LogicalPlan(), input)
- {
- addSplitOutput(lp, splitOp, alias.image, condPlan, index);
- ++index;
- log.debug("Added splitoutput");
- }
- (
- "," alias = <IDENTIFIER> <IF> cond = PCond(input.getSchema(), null, condPlan = new LogicalPlan(), input)
- {
- addSplitOutput(lp, splitOp, alias.image, condPlan, index);
- ++index;
- log.debug("Added splitoutput");
- }
- )+
- )
- {log.trace("Exiting SplitClause"); return splitOp;}
-}
-
-LogicalOperator Expr(LogicalPlan lp) :
-{
- LogicalOperator op;
- Schema schema = null;
- log.trace("Entering Expr");
-}
-{
- (
- ( op = NestedExpr(lp) [ <AS> "(" schema = TupleSchema() ")" {Schema.setSchemaDefaultType(schema, DataType.BYTEARRAY); op.setSchema(schema);} ] )
-| op = BaseExpr(lp)
- )
- {log.trace("Exiting Expr"); return op;}
-}
-
-LogicalOperator NestedExpr(LogicalPlan lp) :
-{
- LogicalOperator op;
- ExpressionOperator eOp;
- Map<String, LogicalOperator> specs = null;
- log.trace("Entering NestedExpr");
-}
-{
- (
- (op = Alias(lp))
-| LOOKAHEAD(2) ( "(" op = NestedExpr(lp) ")" )
-| ( "(" op = BaseExpr(lp) ")" )
- )
- {log.trace("Exiting NestedExpr"); return op;}
-}
-
-// A keyword or an identifier
-
-Token IdentifierOrReserved() :
-{
- Token t1;
- log.trace("Entering IdentifierOrReserved");
-}
-{
- (
- (t1 = <FILTEROP> )
-| (t1 = <DEFINE> )
-| (t1 = <LOAD> )
-| (t1 =<FILTER> )
-| (t1 =<FOREACH> )
-| (t1 =<MATCHES> )
-| (t1 =<ORDER> )
-| (t1 =<ARRANGE> )
-| (t1 =<DISTINCT> )
-| (t1 =<COGROUP> )
-| (t1 =<JOIN> )
-| (t1 =<CROSS> )
-| (t1 =<UNION> )
-| (t1 =<SPLIT> )
-| (t1 =<INTO> )
-| (t1 =<IF> )
-| (t1 =<ALL> )
-| (t1 =<ANY> )
-| (t1 =<AS> )
-| (t1 =<BY> )
-| (t1 =<USING> )
-| (t1 =<INNER> )
-| (t1 =<OUTER> )
-| (t1 =<PARALLEL> )
-| (t1 =<PARTITION>)
-| (t1 =<GROUP> )
-| (t1 =<AND> )
-| (t1 =<OR> )
-| (t1 =<NOT> )
-| (t1 =<GENERATE> )
-| (t1 =<FLATTEN> )
-| (t1 =<EVAL> )
-| (t1 =<ASC> )
-| (t1 =<DESC> )
-| (t1 =<INT> )
-| (t1 =<LONG> )
-| (t1 =<FLOAT> )
-| (t1 =<DOUBLE> )
-| (t1 =<CHARARRAY> )
-| (t1 =<BYTEARRAY> )
-| (t1 =<BAG> )
-| (t1 =<TUPLE> )
-| (t1 =<MAP> )
-| (t1 =<IS> )
-| (t1 =<NULL> )
-| (t1 =<STREAM> )
-| (t1 =<THROUGH> )
-| (t1 =<STORE> )
-| (t1 =<MAPREDUCE>)
-| (t1 =<SHIP> )
-| (t1 =<CACHE> )
-| (t1 =<INPUT> )
-| (t1 =<OUTPUT> )
-| (t1 =<ERROR> )
-| (t1 =<STDIN> )
-| (t1 =<STDOUT> )
-| (t1 =<LIMIT> )
-| (t1 =<SAMPLE> )
-| (t1 =<LEFT>)
-| (t1 =<RIGHT>)
-| (t1 =<FULL>)
-| (t1 =<IDENTIFIER>)
-)
- {
- return t1;
- }
-}
-
-// A reference to an alias
-LogicalOperator Alias(LogicalPlan lp) :
-{
- Token t1;
- LogicalOperator op;
- log.trace("Entering Alias");
-}
-{
- t1 = <IDENTIFIER>
- {
- LogicalOperator aliasOp;
- String alias = t1.image;
-
- aliasOp = getOp(alias);
- if (aliasOp == null) {
- throw new ParseException("Unrecognized alias " + alias);
- }
- addAlias(alias, aliasOp);
- log.debug("Added " + alias + " to aliasOp");
-
- lp.add(aliasOp);
- log.debug("Added operator: " + aliasOp.getClass().getName() + " to the logical plan " + lp);
- log.trace("Exiting Alias");
- return aliasOp;
- }
-}
-
-LogicalOperator BaseExpr(LogicalPlan lp) :
-{
- LogicalOperator op;
- Schema schema;
- Token t1, t2;
- Schema.FieldSchema fs;
- log.trace("Entering BaseExpr");
- String partitioner = null;
-}
-{
- (
- (
- (<DEFINE> op = DefineClause(lp))
-| (<LOAD> op = LoadClause(lp)
- [ <AS>
- (
- LOOKAHEAD(2) "(" schema = TupleSchema() ")"
- {
- Schema.setSchemaDefaultType(schema, DataType.BYTEARRAY);
- op.setSchema(schema);
- log.debug("Load as schema" + schema);
- }
- | fs = AtomSchema()
- {
- schema = new Schema(fs);
- op.setSchema(schema);
- log.debug("Load as atomschema" + schema);
- }
- )
- ]
- )
-| ((<GROUP> | <COGROUP>) op = CogroupClause(lp))
-| (<FILTER> op = FilterClause(lp))
-| (<LIMIT> op = LimitClause(lp))
-| (<SAMPLE> op = SampleClause(lp))
-| (<ORDER> op = OrderClause(lp))
-| (<DISTINCT> op = NestedExpr(lp)
- ([<PARTITION> <BY> (partitioner = EvalClass(ClassType.PARTITIONER))])
- {
- LogicalOperator distinct = new LODistinct(lp, new OperatorKey(scope, getNextId()));
- lp.add(distinct);
- distinct.setCustomPartitioner(partitioner);
- log.debug("Added operator: " + distinct.getClass().getName() + " to the logical plan");
- lp.connect(op, distinct);
- log.debug("Connected alias: " + op.getAlias() + " operator " + op.getClass().getName() + " to operator " + distinct.getClass().getName());
- op = distinct;
- })
-| (<CROSS> op = CrossClause(lp))
-| (<JOIN> op = JoinClause(lp))
-| (<UNION> op = UnionClause(lp))
-| (<FOREACH> op = ForEachClause(lp))
-| (<STREAM> op = StreamClause(lp)
- [ <AS>
- (
- LOOKAHEAD(2) "(" schema = TupleSchema() ")"
- {
- Schema.setSchemaDefaultType(schema, DataType.BYTEARRAY);
- op.setSchema(schema);
- log.debug("Stream as schema()"+ schema);
- }
- | fs = AtomSchema()
- {
- schema = new Schema(fs);
- op.setSchema(schema);
- log.debug("Stream as atomschema()" + schema);
- }
- )
- ]
- )
-| (<STORE> op = StoreClause(lp))
-{
- String inputAlias = ((LOStore)op).getAlias();
- LogicalOperator input = mapAliasOp.get(inputAlias);
- if (input == null)
- throw new ParseException("Unable to find alias " + inputAlias);
-
- lp.add(input);
- lp.connect(input, op);
-}
-| (<MAPREDUCE> op = MapReduceClause(lp))
- )
- [<PARALLEL> t2=<INTEGER> {
- // In Local Mode we can only use one reducer
- if( this.pigContext.getExecType() == ExecType.LOCAL ) {
- op.setRequestedParallelism(1);
- } else {
- op.setRequestedParallelism(Integer.parseInt(t2.image));
- }
- } ]
- )
- {log.trace("Exiting BaseExpr"); return op;}
-}
-
-LogicalOperator LoadClause(LogicalPlan lp) :
-{
- Token t1, t2, t3;
- String filename;
- String funcName,funcArgs =null;
- FuncSpec funcSpec = null;
- String funcSpecAsString = null;
- LOLoad lo=null;
- String splitBy;
- boolean splittable = true;
- log.trace("Entering LoadClause");
-}
-{
- ( filename = FileName()
- (
- <USING> funcSpec = NonEvalFuncSpec(FunctionType.LOADFUNC)
- )?
- )
- {
- if (funcSpec == null){
- funcSpecAsString = PigStorage.class.getName();
- funcSpec = new FuncSpec(funcSpecAsString);
- log.debug("LoadClause: funcSpec = " + funcSpec);
- }
-
- Object obj = PigContext.instantiateFuncFromSpec(funcSpec);
- LoadFunc loFunc = (LoadFunc)obj;
- try {
-
- // If we converted the file name before, we return the old
- // result. This is not done for performance but to make sure
- // we return the same result for relative paths when
- // re-parsing the same script for execution.
- // For example if a script has
- // a = load ...
- // ...
- // cd x
- // ...
- // store c into 'foo'; => the abs. path for 'foo' would be '/user/<username>/x/foo'
- // ..
- // cd y
- // ..
- // store f into 'bar'=> the abs. path for 'bar' would be '/user/<username>/x/y/bar'
- // While re-parsing, the current working dir is already at /user/<username>/x/y
- // so translating 'foo' to its absolute path based on that dir would give incorrect
- // results - hence we store the translations into a map during the first parse for
- // use during the reparse.
- String absolutePath = fileNameMap.get(constructFileNameSignature(filename, funcSpec));
- if (absolutePath == null) {
- absolutePath = loFunc.relativeToAbsolutePath(filename, getCurrentDir(pigContext));
-
- if (absolutePath!=null) {
- setHdfsServers(absolutePath, pigContext);
- }
- fileNameMap.put(constructFileNameSignature(filename, funcSpec), absolutePath);
- }
- lo = new LOLoad(lp, new OperatorKey(scope, getNextId()), new FileSpec(absolutePath, funcSpec),
- ConfigurationUtil.toConfiguration(pigContext.getProperties()));
- } catch (IOException ioe) {
- // The autogenerated parser code only catches RuntimeException and
- // ParseException as special Exceptions. All others are caught as
- // Throwable and then re-thrown by casting to ERROR - this can result
- // in ClassCastException if it is due to the IOException here - so
- // wrap the IOException in an "Error" object and throw the Error here
- Error e = new Error(ioe);
- throw e;
- }
- lp.add(lo);
- log.debug("Added operator " + lo.getClass().getName() + " to the logical plan");
-
- log.trace("Exiting LoadClause");
- return lo;
- }
-}
-
-String StringList() :
-{
- StringBuilder sb = new StringBuilder();
- Token t;
-}
-{
- (
- (
- t = <QUOTEDSTRING> {sb.append(StringUtils.unescapeInputString(t.image));}
- ( "," t = <QUOTEDSTRING> {sb.append(",");sb.append(StringUtils.unescapeInputString(t.image));} )*
- )
- | {}
- )
- {log.debug("StringList: " + sb.toString()); return sb.toString();}
-}
-
-String FunctionArgs() :
-{
- StringBuilder sb = new StringBuilder();
- Token t;
-}
-{
-(
- (
- ( ( t = <QUOTED_MULTI_STRING> {sb.append(StringUtils.unescapeInputString(t.image));} )
- |
- ( t = <QUOTEDSTRING> {sb.append(StringUtils.unescapeInputString(t.image));})
- )
- ( ","
- (
- (t = <QUOTED_MULTI_STRING> {sb.append(",");sb.append(StringUtils.unescapeInputString(t.image));} )
- |
- (t = <QUOTEDSTRING> {sb.append(",");sb.append(StringUtils.unescapeInputString(t.image));} )
- )
- )*
- )
- | {}
-)
- {log.debug("FuncArgs: " + sb.toString()); return sb.toString();}
-}
-
-
-
-//B = native ('mymr.jar' [, 'other.jar' ...]) A store into 'storeLocation' using storeFunc load 'loadLocation' using loadFunc ['params'];
-LogicalOperator MapReduceClause(LogicalPlan lp) :
-{
- LogicalOperator loLoad;
- LogicalOperator loStore;
- LONative loNative;
- Schema schema;
- Schema.FieldSchema fs;
- String nativeMRJar;
- Token t1, t2;
- String[] paths;
- String[] params = null;
-}
-{
- t1 = <QUOTEDSTRING>
- {
- nativeMRJar = unquote(t1.image);
- pigContext.addJar(nativeMRJar);
- }
- [ "("
- paths = PathList()
- {
- for(String path: paths) {
- pigContext.addJar(path);
- }
- }
- ")"
- ]
-
- <STORE>
- loStore = StoreClause(lp)
- {
- ((LOStore)loStore).setTmpStore(true);
- String inputAlias = ((LOStore)loStore).getAlias();
- LogicalOperator input = mapAliasOp.get(inputAlias);
- if (input == null)
- throw new ParseException("Unable to find alias " + inputAlias);
- lp.add(input);
- lp.connect(input, loStore);
-
- }
- <LOAD>
- loLoad = LoadClause(lp)
- [ <AS>
- (
- LOOKAHEAD(2) "(" schema = TupleSchema() ")"
- {
- Schema.setSchemaDefaultType(schema, DataType.BYTEARRAY);
- loLoad.setSchema(schema);
- log.debug("Load as schema" + schema);
- }
- | fs = AtomSchema()
- {
- schema = new Schema(fs);
- loLoad.setSchema(schema);
- log.debug("Load as atomschema" + schema);
- }
- )
- ]
- [
- t2 = <EXECCOMMAND>
- {
- params = splitArgs(unquote(t2.image));
- }
- ]
- {
- loNative = new LONative(lp, new OperatorKey(scope, getNextId()),
- nativeMRJar, params);
-
- lp.add(loNative);
- lp.connect(loStore, loNative);
- lp.connect(loNative, loLoad);
- return loLoad;
- }
-}
-
-String FileName():
-{
- Token t;
-}
-{
- t = <QUOTEDSTRING>
- {log.debug("FileName: " + unquote(t.image)); return unquote(t.image);}
-}
-
-LogicalOperator FilterClause(LogicalPlan lp):
-{
- ExpressionOperator cond; LogicalOperator input;
- LogicalPlan conditionPlan = new LogicalPlan();
- log.trace("Entering FilterClause");
-}
-{
- (
- input = NestedExpr(lp) {log.debug("Filter input: " + input);}
- <BY> cond = PCond(input.getSchema(),null,conditionPlan,input)
- )
- {
- //LogicalOperator filter = new LOFilter(lp, new OperatorKey(scope, getNextId()), conditionPlan, input);
- LogicalOperator filter = new LOFilter(lp, new OperatorKey(scope, getNextId()), conditionPlan);
- addAlias(input.getAlias(), input);
- lp.add(filter);
- log.debug("Added operator " + filter.getClass().getName() + " to the logical plan");
-
- lp.connect(input, filter);
- log.debug("Connected alias " + input.getAlias() + " operator " + input.getClass().getName() + " to operator " + filter.getClass().getName() +" in the logical plan");
-
- log.trace("Exiting FilterClause");
- return filter;
- }
-}
-
-/*
- * "SAMPLE a x" is translated to "FILTER a BY RANDOM()<x"
- */
-LogicalOperator SampleClause(LogicalPlan lp) :
-{
- ExpressionOperator cond;
- LogicalOperator input;
- Token t;
- LogicalPlan conditionPlan = new LogicalPlan();
- log.trace("Entering SampleClause");
-}
-{
- (
- input = NestedExpr(lp) {log.debug("Filter input: " + input);}
- t = <DOUBLENUMBER>
- )
- {
- LOUserFunc rand = new LOUserFunc(conditionPlan, new OperatorKey(scope, getNextId()), new FuncSpec(RANDOM.class.getName()), DataType.DOUBLE);
- conditionPlan.add(rand);
-
- double l = Double.parseDouble(t.image);
- LOConst prob = new LOConst(conditionPlan, new OperatorKey(scope, getNextId()), l);
- conditionPlan.add(prob);
-
- cond = new LOLesserThanEqual(conditionPlan, new OperatorKey(scope, getNextId()));
- conditionPlan.add(cond);
- conditionPlan.connect(rand, cond);
- conditionPlan.connect(prob, cond);
-
- LogicalOperator filter = new LOFilter(lp, new OperatorKey(scope, getNextId()), conditionPlan);
- addAlias(input.getAlias(), input);
- lp.add(filter);
- log.debug("Added operator " + filter.getClass().getName() + " to the logical plan");
-
- lp.connect(input, filter);
- log.debug("Connected alias " + input.getAlias() + " operator " + input.getClass().getName() + " to operator " + filter.getClass().getName() +" in the logical plan");
-
- log.trace("Exiting SampleClause");
- return filter;
- }
-}
-
-LogicalOperator LimitClause(LogicalPlan lp):
-{
- LogicalOperator input;
- Token t;
- long l;
- log.trace("Entering LimitClause");
-}
-{
- (
- input = NestedExpr(lp) {log.debug("Limit input: " + input);}
- (
- t = <INTEGER> { l = Long.parseLong(t.image); }
- | t = <LONGINTEGER> { l = Long.parseLong(t.image.substring(0, t.image.length() - 1)); }
- )
- )
- {
- LogicalOperator limit = new LOLimit(lp, new OperatorKey(scope, getNextId()), l);
- addAlias(input.getAlias(), input);
- lp.add(limit);
- log.debug("Added operator " + limit.getClass().getName() + " to the logical plan");
-
- lp.connect(input, limit);
- log.debug("Connected alias " + input.getAlias() + " operator " + input.getClass().getName() + " to operator " + limit.getClass().getName() +" in the logical plan");
-
- log.trace("Exiting LimitClause");
- return limit;
- }
-}
-
-ExpressionOperator PCond(Schema over, Map<String, LogicalOperator> specs, LogicalPlan lp, LogicalOperator input) :
-{
- ExpressionOperator cond = null;
- log.trace("Entering PCond");
- log.debug("PCond Input: " + input);
-}
-{
- cond = POrCond(over,specs,lp, input)
- {log.trace("Exiting PCond"); return cond;}
-}
-
-ExpressionOperator POrCond(Schema over, Map<String, LogicalOperator> specs, LogicalPlan lp, LogicalOperator input) :
-{
- ExpressionOperator lhsCond, rhsCond;
- log.trace("Entering POrCond");
- log.debug("POrCond Input: " + input);
-}
-{
- (
- lhsCond = PAndCond(over,specs,lp,input)
- (
- <OR> rhsCond = PAndCond(over,specs,lp,input)
- {
- ExpressionOperator exprOp = new LOOr(lp, new OperatorKey(scope, getNextId()) );
- lp.add(exprOp);
- log.debug("POrCond: Added operator " + exprOp.getClass().getName() + " " + exprOp + " to logical plan " + lp);
- lp.connect(lhsCond, exprOp);
- log.debug("POrCond: Connected operator " + lhsCond.getClass().getName() + " " + lhsCond + " to " + exprOp + " logical plan " + lp);
- lp.connect(rhsCond, exprOp);
- log.debug("POrCond: Connected operator " + rhsCond.getClass().getName() + " " + rhsCond + " to " + exprOp + " logical plan " + lp);
- lhsCond = exprOp;
- }
- )*
- )
- {
- log.trace("Exiting POrCond");
- return lhsCond;
- }
-}
-
-ExpressionOperator PAndCond(Schema over, Map<String, LogicalOperator> specs, LogicalPlan lp, LogicalOperator input) :
-{
- ExpressionOperator lhsCond, rhsCond;
- log.trace("Entering PAndCond");
- log.debug("PAndCond Input: " + input);
-}
-{
- (
- lhsCond = PUnaryCond(over,specs,lp,input)
- (
- <AND> rhsCond = PUnaryCond(over,specs,lp,input)
- {
- ExpressionOperator exprOp = new LOAnd(lp, new OperatorKey(scope, getNextId()) );
- lp.add(exprOp);
- log.debug("PAndCond: Added operator " + exprOp.getClass().getName() + " " + exprOp + " to logical plan " + lp);
- lp.connect(lhsCond, exprOp);
- log.debug("PAndCond: Connected operator " + lhsCond.getClass().getName() + " " + lhsCond + " to " + exprOp + " logical plan " + lp);
- lp.connect(rhsCond, exprOp);
- log.debug("PAndCond: Connected operator " + rhsCond.getClass().getName() + " " + rhsCond + " to " + exprOp + " logical plan " + lp);
- lhsCond = exprOp;
- }
- )*
- )
- {
- log.trace("Exiting PAndCond");
- return lhsCond;
- }
-}
-
-ExpressionOperator PUnaryCond(Schema over, Map<String, LogicalOperator> specs, LogicalPlan lp, LogicalOperator input) :
-{
- ExpressionOperator cond = null;
- ExpressionOperator lhs, rhs;
- Token t1;
- List<ExpressionOperator> args;
- EvalFunc evalFunc = null;
- log.trace("Entering PUnaryCond");
-}
-{
- (
- LOOKAHEAD("(" PCond(over,specs,lp,input) ")")
- ("(" cond = PCond(over,specs,lp,input) ")")
-| LOOKAHEAD(InfixExpr(over,specs,lp,input) <FILTEROP>)
- (lhs=InfixExpr(over,specs,lp,input) t1=<FILTEROP> rhs=InfixExpr(over,specs,lp,input)
- {
- //the long switch case to instantiate the right operator
- //I have the long switch case from CompCond
- String op = t1.image;
- op = op.toLowerCase();
-
- char op1 = op.charAt(0);
- char op2 = op.length() >= 2 ? op.charAt(1) : '0';
- char op3 = op.length() == 3 ? op.charAt(2) : '0';
-
- switch (op1) {
- // numeric ops first
- case '=':
- if (op2 == '=') {
- cond = new LOEqual(lp, new OperatorKey(scope, getNextId()) );
- } else {
- throw new ParseException("Internal error: Invalid filter operator: " + op);
- }
- break;
- case '<':
- if (op2 == '=') {
- cond = new LOLesserThanEqual(lp, new OperatorKey(scope, getNextId()));
- } else {
- cond = new LOLesserThan(lp, new OperatorKey(scope, getNextId()));
- }
- break;
- case '>':
- if (op2 == '=') {
- cond = new LOGreaterThanEqual(lp, new OperatorKey(scope, getNextId()));
- } else {
- cond = new LOGreaterThan(lp, new OperatorKey(scope, getNextId()));
- }
- break;
- case '!':
- if (op2 == '=') {
- cond = new LONotEqual(lp, new OperatorKey(scope, getNextId()));
- } else {
- throw new ParseException("Internal error: Invalid filter operator: " + op);
- }
- break;
- // now string ops
- case 'e':
- if (op2 == 'q') {
- cond = new LOEqual(lp, new OperatorKey(scope, getNextId()));
- } else {
- throw new ParseException("Internal error: Invalid filter operator: " + op);
- }
- break;
- case 'l':
- if (op2 == 't' && op3 == 'e') {
- cond = new LOLesserThanEqual(lp, new OperatorKey(scope, getNextId()));
- } else {
- cond = new LOLesserThan(lp, new OperatorKey(scope, getNextId()));
- }
- break;
- case 'g':
- if (op2 == 't' && op3 == 'e') {
- cond = new LOGreaterThanEqual(lp, new OperatorKey(scope, getNextId()));
- } else {
- cond = new LOGreaterThan(lp, new OperatorKey(scope, getNextId()));
- }
- break;
- case 'n':
- if (op2 == 'e' && op3 == 'q') {
- cond = new LONotEqual(lp, new OperatorKey(scope, getNextId()));
- } else {
- throw new ParseException("Internal error: Invalid filter operator: " + op);
- }
- break;
- default:
- throw new ParseException("Internal error: Invalid filter operator: " + op);
- }
-
- lp.add(cond);
- log.debug("PUnaryCond: Added operator " + cond.getClass().getName() + " " + cond + " to logical plan " + lp);
- lp.connect(lhs, cond);
- log.debug("PUnaryCond: Connected operator " + lhs.getClass().getName() + " " + lhs+ " to " + cond + " logical plan " + lp);
- lp.connect(rhs, cond);
- log.debug("PUnaryCond: Connected operator " + rhs.getClass().getName() + " " + rhs+ " to " + cond + " logical plan " + lp);
- }
- )
-| LOOKAHEAD(InfixExpr(over,specs,lp,input) <MATCHES>)
- (lhs=InfixExpr(over,specs,lp,input) <MATCHES> t1=<QUOTEDSTRING>
- {
- LOConst rconst = new LOConst(lp, new OperatorKey(scope, getNextId()), unquote(t1.image));
- rconst.setType(DataType.CHARARRAY);
- cond = new LORegexp(lp, new OperatorKey(scope, getNextId()));
- lp.add(rconst);
- lp.add(cond);
- log.debug("PUnaryCond: Added operator " + cond.getClass().getName() + " " + cond + " to logical plan " + lp);
- lp.connect(lhs, cond);
- lp.connect(rconst, cond);
- log.debug("PUnaryCond: Connected operator " + cond.getClass().getName() + " " + cond + " to " + lhs + " logical plan " + lp);
- }
- )
-| LOOKAHEAD(EvalFuncSpec(over, specs, lp, input)) cond = EvalFuncSpec(over,specs,lp, input, FunctionType.EVALFUNC)
-| cond = PNullCond(over,specs,lp,input)
-| cond = PNotCond(over,specs,lp,input)
-
- )
- {log.trace("Exiting PUnaryCond"); return cond;}
-}
-
-ExpressionOperator PNotCond(Schema over, Map<String, LogicalOperator> specs,LogicalPlan lp,LogicalOperator input) :
-{
- ExpressionOperator c1;
- log.trace("Entering PNotCond");
-}
-{
- <NOT> c1=PUnaryCond(over,specs,lp,input)
- {
- ExpressionOperator eOp = new LONot(lp, new OperatorKey(scope, getNextId()));
- lp.add(eOp);
- log.debug("PNotCond: Added operator " + eOp.getClass().getName() + " " + eOp + " to logical plan " + lp);
- lp.connect(c1, eOp);
- log.debug("PNotCond: Connected operator " + eOp.getClass().getName() + " " + eOp + " to " + c1 + " logical plan " + lp);
- log.trace("Exiting PNotCond");
- return eOp;
- }
-}
-
-ExpressionOperator PNullCond(Schema over, Map<String, LogicalOperator> specs,LogicalPlan lp,LogicalOperator input) :
-{
- ExpressionOperator c1;
- boolean not = false;
- log.trace("Entering PNullCond");
-}
-{
- c1=InfixExpr(over,specs,lp,input) <IS> [<NOT> {not = true;}] <NULL>
- {
- ExpressionOperator eOp = new LOIsNull(lp, new OperatorKey(scope, getNextId()));
- lp.add(eOp);
- log.debug("PNullCond: Added operator " + eOp.getClass().getName() + " " + eOp + " to logical plan " + lp);
- lp.connect(c1, eOp);
- log.debug("PNullCond: Connected operator " + eOp.getClass().getName() + " " + eOp + " to " + c1 + " logical plan " + lp);
- ExpressionOperator notNull = null;
- if (not) {
- notNull = new LONot(lp, new OperatorKey(scope, getNextId()));
- lp.add(notNull);
- log.debug("PNullCond: Added operator " + notNull.getClass().getName() + " " + notNull + " to logical plan " + lp);
- lp.connect(eOp, notNull);
- log.debug("PNullCond: Connected operator " + notNull.getClass().getName() + " " + notNull + " to " + eOp + " logical plan " + lp);
- eOp = notNull;
- }
- log.trace("Exiting PNullCond");
- return eOp;
- }
-}
-
-LogicalOperator CogroupClause(LogicalPlan lp) :
-{
- CogroupInput gi;
- ArrayList<CogroupInput> gis = new ArrayList<CogroupInput>();
- LogicalOperator cogroup = null;
- log.trace("Entering CoGroupClause");
- Token t;
- String partitioner = null;
-}
-{
- (gi = GroupItem(lp) { gis.add(gi); }
- ("," gi = GroupItem(lp) { gis.add(gi); })*
- ([ <USING> (
- (t = < QUOTEDSTRING> { cogroup = parseUsingForGroupBy(unquote (t.image), gis, lp); })
- |("\"collected\"") {
- log.info("[WARN] Use of double-quoted string to specify hint is deprecated. Please specify hint in single quotes.");
- cogroup = parseUsingForGroupBy("collected", gis, lp);
- }
- |("\"regular\"") {
- log.info("[WARN] Use of double-quoted string to specify hint is deprecated. Please specify hint in single quotes.");
- cogroup = parseUsingForGroupBy("regular", gis, lp);
- }
- |("\"merge\"") {
- log.info("[WARN] Use of double-quoted string to specify hint is deprecated. Please specify hint in single quotes.");
- cogroup = parseUsingForGroupBy("merge", gis, lp);
- }
- )])
- ([<PARTITION> <BY> (partitioner = EvalClass(ClassType.PARTITIONER))])
- )
- {
- if (cogroup != null) {
- cogroup.setCustomPartitioner(partitioner);
- log.trace("Exiting CoGroupClause");
- return cogroup;
- }
- cogroup = parseCogroup(gis, lp, LOCogroup.GROUPTYPE.REGULAR);
- if(cogroup != null) {
- cogroup.setCustomPartitioner(partitioner);
- }
- log.trace("Exiting CoGroupClause");
- return cogroup;
- }
-
-}
-
-CogroupInput JoinItem(LogicalPlan lp) :
-{
- LogicalOperator cgOp;
- boolean isInner = true;
- ArrayList<LogicalPlan> listPlans = new ArrayList<LogicalPlan>();
- LogicalPlan groupByPlan;
- ArrayList<Boolean> flattenList = new ArrayList<Boolean>();
- ArrayList<Schema> userDefinedSchemaList = new ArrayList<Schema>();
- log.trace("Entering JoinItem");
- log.debug("LogicalPlan: " + lp);
-}
-{
- (
- cgOp = NestedExpr(lp)
- (
- ( <BY>
- (
- LOOKAHEAD ( "(" FlattenedGenerateItemList(cgOp.getSchema(), null, groupByPlan, cgOp) ")" )
- ( "(" FlattenedGenerateItem(cgOp.getSchema(), null, groupByPlan = new LogicalPlan(), cgOp, flattenList, userDefinedSchemaList)
- {listPlans.add(groupByPlan);}
- (
- "," FlattenedGenerateItem(cgOp.getSchema(), null, groupByPlan = new LogicalPlan(), cgOp, flattenList, userDefinedSchemaList)
- {listPlans.add(groupByPlan);}
- )*
- ")"
- )
- | (
- FlattenedGenerateItem(cgOp.getSchema(), null, groupByPlan = new LogicalPlan(), cgOp, flattenList, userDefinedSchemaList)
- {listPlans.add(groupByPlan);}
- )
- )
- )
- )
- )
- {
- CogroupInput cogroupInput = new CogroupInput();
-
- cogroupInput.plans = listPlans;
- cogroupInput.op = cgOp;
- cogroupInput.isInner = isInner;
-
- log.trace("Exiting GroupItem");
- return cogroupInput;
- }
-}
-
-
-CogroupInput GroupItem(LogicalPlan lp) :
-{
- ExpressionOperator es;
- LogicalOperator cgOp;
- boolean isInner = false;
- ArrayList<LogicalPlan> listPlans = new ArrayList<LogicalPlan>();
- LogicalPlan groupByPlan;
- ArrayList<Boolean> flattenList = new ArrayList<Boolean>();
- ArrayList<Schema> userDefinedSchemaList = new ArrayList<Schema>();
- log.trace("Entering GroupItem");
- log.debug("LogicalPlan: " + lp);
-}
-{
- (
- cgOp = NestedExpr(lp)
- (
- ( <BY>
- (
- LOOKAHEAD ( "(" FlattenedGenerateItemList(cgOp.getSchema(), null, groupByPlan, cgOp) ")" )
- ( "(" es = FlattenedGenerateItem(cgOp.getSchema(), null, groupByPlan = new LogicalPlan(), cgOp, flattenList, userDefinedSchemaList)
- {listPlans.add(groupByPlan);}
- (
- "," es = FlattenedGenerateItem(cgOp.getSchema(), null, groupByPlan = new LogicalPlan(), cgOp, flattenList, userDefinedSchemaList)
- {listPlans.add(groupByPlan);}
- )*
- ")"
- )
- | (
- es = FlattenedGenerateItem(cgOp.getSchema(), null, groupByPlan = new LogicalPlan(), cgOp, flattenList, userDefinedSchemaList)
- {listPlans.add(groupByPlan);}
- )
- )
- )
- | <ALL> {
- es = new LOConst(groupByPlan = new LogicalPlan(), new OperatorKey(scope, getNextId()), "all");
- es.setType(DataType.CHARARRAY);
- groupByPlan.add(es);
- log.debug("GroupItem: Added operator " + es.getClass().getName() + " " + es + " to logical plan " + groupByPlan);
- listPlans.add(groupByPlan);
- }
- | <ANY> {
- es = new LOUserFunc(groupByPlan = new LogicalPlan(), new OperatorKey(scope, getNextId()), new FuncSpec(GFAny.class.getName()), DataType.INTEGER);
- groupByPlan.add(es);
- log.debug("GroupItem: Added operator " + es.getClass().getName() + " " + es + " to logical plan " + groupByPlan);
- listPlans.add(groupByPlan);
- }
- )
- [<INNER> {isInner = true;} | <OUTER>]
- )
- {
- CogroupInput cogroupInput = new CogroupInput();
-
- cogroupInput.plans = listPlans;
- cogroupInput.op = cgOp;
- cogroupInput.isInner = isInner;
-
- log.trace("Exiting GroupItem");
- return cogroupInput;
- }
-}
-
-// Parses the body of an ORDER statement: the input relation, BY, then either
-// a comma-separated list of sort columns (each optionally ASC/DESC) or "*"
-// (optionally ASC/DESC), and finally an optional USING comparison function.
-// Builds an LOSort (limit -1) over the collected sort plans and directions,
-// adds it to lp, connects the input to it and returns it.
-LogicalOperator OrderClause(LogicalPlan lp) :
-{
-    LogicalOperator op;
-    boolean star = false;
-    // One single-projection plan per sort key, paired by index with ascOrder.
-    ArrayList<LogicalPlan> sortColPlans = new ArrayList<LogicalPlan>();
-    ArrayList<Boolean> ascOrder = new ArrayList<Boolean>();
-    boolean asc = true;
-    FuncSpec funcSpec = null;
-    log.trace("Entering OrderClause");
-}
-{
-    (
-        op = NestedExpr(lp) <BY>
-        (
-            // SortCol appends to ascOrder and sortColPlans itself, so its
-            // returned projection is not needed here.
-            SortCol(op.getSchema(), lp, op, ascOrder, sortColPlans)
-            ("," SortCol(op.getSchema(), lp, op, ascOrder, sortColPlans))*
-        |
-            <STAR> {star = true;} [<ASC> | <DESC> {asc = false;}]
-            {
-                // Sorting on "*": a single plan holding a star projection (column -1).
-                LogicalPlan sortColPlan = new LogicalPlan();
-                LOProject projectStar = new LOProject(sortColPlan, new OperatorKey(scope, getNextId()), op, -1);
-                projectStar.setStar(true);
-                sortColPlan.add(projectStar);
-                sortColPlans.add(sortColPlan);
-                log.debug("Set star to true");
-                ascOrder.add(asc);
-            }
-        )
-        [ <USING> funcSpec = NonEvalFuncSpec(FunctionType.COMPARISONFUNC) ]
-    )
-    {
-        LOSort sort = new LOSort(lp, new OperatorKey(scope, getNextId()), sortColPlans, ascOrder,
-                funcSpec);
-        sort.setStar(star);
-        // -1: no limit attached to this sort.
-        sort.setLimit(-1);
-        lp.add(sort);
-        log.debug("Added operator " + sort.getClass().getName() + " to the logical plan");
-
-        lp.connect(op, sort);
-        log.debug("Connecting sort input alias " + op.getAlias() + " operator " + op.getClass().getName() + " to operator " + sort.getClass().getName() + " in the logical plan");
-
-        log.trace("Exiting OrderClause");
-        return sort;
-    }
-}
-
-
-// Parses one ORDER BY sort key and records it. A key is a column name or $n
-// position, optionally wrapped in parentheses, followed by an optional
-// ASC/DESC. Appends the direction to ascOrder, appends to sortColPlans a new
-// plan holding a single LOProject of that column of op, and returns that
-// LOProject.
-// NOTE(review): 'over' and 'lp' are not used here. The column is resolved
-// against op.getSchema() instead. Both are kept for signature compatibility.
-ExpressionOperator SortCol(Schema over, LogicalPlan lp, LogicalOperator op, ArrayList<Boolean> ascOrder, ArrayList<LogicalPlan> sortColPlans) :
-{
-    ExpressionOperator col;
-    int colNum;
-    boolean asc = true;
-    LogicalPlan sortColPlan = new LogicalPlan();
-    log.trace("Entering SortCol");
-}
-{
-    // The bare and parenthesised forms share one action, so they cannot drift apart.
-    (
-        colNum = ColNameOrNum(op.getSchema())
-    |
-        "(" colNum = ColNameOrNum(op.getSchema()) ")"
-    )
-    [<ASC> | <DESC> {asc = false;}]
-    {
-        if (asc) {
-            log.debug("Ascending");
-        } else {
-            log.debug("Descending");
-        }
-        ascOrder.add(asc);
-        col = new LOProject(sortColPlan, new OperatorKey(scope, getNextId()), op, colNum);
-        sortColPlan.add(col);
-        sortColPlans.add(sortColPlan);
-        log.trace("Exiting SortCol");
-        return col;
-    }
-}
-
-// Resolves a column reference to its position:
-//   <DOLLARVAR>             -> the number after '$' (via undollar)
-//   <IDENTIFIER> | <GROUP>  -> over.getPosition(token image)
-// Throws ParseException when 'over' is null or does not contain the alias.
-// A FrontendException from getPosition is rethrown as ParseException, with
-// the original exception kept as the cause.
-int ColNameOrNum(Schema over) :
-{
-    Token t;
-    // Declared here rather than inside one alternative's action, so every
-    // alternative can assign it without relying on JavaCC emitting all
-    // actions into a single switch scope.
-    int i;
-    log.trace("Entering ColNameOrNum");
-}
-{
-    (
-        t = <DOLLARVAR>
-        {
-            i = undollar(t.image);
-        }
-    |
-        ( t = <IDENTIFIER> | t = <GROUP> )
-        {
-            try {
-                if ( over == null || (i = over.getPosition(t.image)) == -1) {
-                    throw new ParseException("Invalid alias: " + t.image + " in " + over);
-                }
-            } catch (FrontendException fee) {
-                ParseException pe = new ParseException(fee.getMessage());
-                pe.initCause(fee);
-                throw pe;
-            }
-        }
-    )
-    {
-        log.trace("Exiting ColNameOrNum");
-        return i;
-    }
-}
-
-// Parses the body of a CROSS statement: two or more comma-separated input
-// relations, optionally followed by PARTITION BY <partitioner class>.
-// Creates an LOCross, records the custom partitioner on it (null when no
-// PARTITION BY was given), connects every input to it in source order and
-// returns it.
-LogicalOperator CrossClause(LogicalPlan lp) :
-{
-    LogicalOperator input;
-    ArrayList<LogicalOperator> inputs = new ArrayList<LogicalOperator>();
-    String partitioner = null;
-    log.trace("Entering CrossClause");
-}
-{
-    input = NestedExpr(lp) { inputs.add(input); }
-    ( "," input = NestedExpr(lp) { inputs.add(input); } )+
-    [ <PARTITION> <BY> partitioner = EvalClass(ClassType.PARTITIONER) ]
-    {
-        LogicalOperator cross = new LOCross(lp, new OperatorKey(scope, getNextId()));
-        lp.add(cross);
-        cross.setCustomPartitioner(partitioner);
-        log.debug("Added operator " + cross.getClass().getName() + " to the logical plan");
-
-        for (LogicalOperator source : inputs) {
-            lp.connect(source, cross);
-            log.debug("Connected operator " + source.getClass().getName() + " " + source + " to " + cross + " logical plan " + lp);
-        }
-        log.debug("Connected cross inputs to the cross operator");
-
-        log.trace("Exiting CrossClause");
-        return cross;
-    }
-}
-
-// Parses the body of a JOIN statement: two or more JoinItems, with an
-// optional LEFT/RIGHT/FULL [OUTER] marker after the first item, an optional
-// USING '<algorithm>' clause and an optional PARTITION BY <partitioner class>.
-// Outer joins are limited to two inputs. The join operator comes from
-// parseUsingForJoin when USING is present, otherwise from parseJoin with
-// JOINTYPE.HASH. A custom partitioner is rejected for skewed joins.
-LogicalOperator JoinClause(LogicalPlan lp) :
-{
- CogroupInput gi;
- ArrayList<CogroupInput> gis = new ArrayList<CogroupInput>();
- log.trace("Entering JoinClause");
- log.debug("LogicalPlan: " + lp);
- LogicalOperator joinOp = null;
- boolean isLeftOuter = false;
- boolean isRightOuter = false;
- boolean isFullOuter = false;
- boolean isOuter = false;
- Token t;
- String partitioner = null;
-}
-{
- (gi = JoinItem(lp) { gis.add(gi); }
- // Optional outer-join marker between the first and second input.
- [
- (<LEFT> [<OUTER>] { isLeftOuter = true;})
- |
- (<RIGHT> [<OUTER>] {isRightOuter = true;})
- |
- (<FULL> [<OUTER>] {isFullOuter = true;})
- ]
- ("," gi = JoinItem(lp) { gis.add(gi); })+
- {
- // in the case of outer joins, only two
- // inputs are allowed
- isOuter = (isLeftOuter || isRightOuter || isFullOuter);
- if(isOuter && gis.size() > 2) {
- throw new ParseException("(left|right|full) outer joins are only supported for two inputs");
- }
-
- // From here on an outer join is known to have exactly two inputs
- // (an inner join may have more). The grammar's "+" guarantees at
- // least two, so gis.get(1) below is always valid.
-
- // the semantics of "outer"
- // for join are different from cogroup
- // cogroup a by $0 inner, b by $0 outer means keep
- // all keys from a and for cases where there is no match
- // from b have an empty bag for b. For keys in b which
- // do not match in a, there will be no output records.
- // Whereas with join,
- // join a by $0 inner, b by $0 outer implies right outer
- // join which has the exact opposite semantics - for
- // all keys in b which do not have a match in a we need to
- // output null for fields in a. For keys in a which do not
- // match in b, no record should be output. Since we will be
- // using the same underlying implementation for outer join
- // as cogroup we should achieve join semantics by setting the
- // isinner flag accordingly
- if (isLeftOuter) {
- gis.get(1).isInner = false;
- } else if (isRightOuter) {
- gis.get(0).isInner = false;
- } else if (isFullOuter) {
- gis.get(0).isInner = false;
- gis.get(1).isInner = false;
- }
-
- }
- // For all types of join we create LOJoin and mark what type of join it is.
- // The double-quoted algorithm names are deprecated spellings. Each logs a
- // warning and maps onto the same name parseUsingForJoin receives for the
- // single-quoted form.
- ([<USING> (
- (t = <QUOTEDSTRING> { joinOp = parseUsingForJoin(unquote(t.image), gis, lp, isFullOuter, isRightOuter, isOuter);})
- | ("\"repl\"" | "\"replicated\"") {
- log.info("[WARN] Use of double-quotes for specifying join algorithm is deprecated. Please use single quotes.");
- joinOp = parseUsingForJoin("replicated", gis, lp, isFullOuter, isRightOuter, isOuter);
- }
- | ("\"skewed\"") {
- log.info("[WARN] Use of double-quotes for specifying join algorithm is deprecated. Please use single quotes.");
- joinOp = parseUsingForJoin("skewed", gis, lp, isFullOuter, isRightOuter, isOuter);
- }
- | ("\"merge\"") {
- log.info("[WARN] Use of double-quotes for specifying join algorithm is deprecated. Please use single quotes.");
- joinOp = parseUsingForJoin("merge", gis, lp, isFullOuter, isRightOuter, isOuter);
- }
- | ("\"hash\"" | "\"default\"") {
- log.info("[WARN] Use of double-quotes for specifying join algorithm is deprecated. Please use single quotes.");
- joinOp = parseUsingForJoin("hash", gis, lp, isFullOuter, isRightOuter, isOuter);
- }
- )]))
- ([<PARTITION> <BY> (partitioner = EvalClass(ClassType.PARTITIONER))])
-
- {
- // NOTE(review): the exit trace is logged before the partitioner check
- // below, which can still throw.
- log.trace("Exiting JoinClause");
- // No USING clause: default to a hash join.
- if (joinOp == null) {
- joinOp = parseJoin(gis, lp, LOJoin.JOINTYPE.HASH);
- }
- if(partitioner != null) {
- // NOTE(review): assumes parseUsingForJoin/parseJoin always return an
- // LOJoin. Any other operator type would fail here with a
- // ClassCastException rather than a ParseException. Confirm.
- if(((LOJoin)joinOp).getJoinType() == LOJoin.JOINTYPE.SKEWED) {
- throw new ParseException("Custom Partitioner is not supported for skewed join");
- }
- else {
- joinOp.setCustomPartitioner(partitioner);
- }
- }
- return joinOp;
- }
-}
-
-// Parses the body of a UNION statement: an optional ONSCHEMA keyword followed
-// by two or more comma-separated input relations. Creates an LOUnion (marked
-// onSchema when ONSCHEMA was given), adds it to lp, connects every input to
-// it and returns it. Any exception raised while building the union is
-// rethrown as a ParseException, with the original exception kept as the
-// cause.
-LogicalOperator UnionClause(LogicalPlan lp) :
-{
-    LogicalOperator op;
-    boolean isOnSchema = false;
-    ArrayList<LogicalOperator> inputs = new ArrayList<LogicalOperator>();
-    log.trace("Entering UnionClause");
-}
-{
-    [<ONSCHEMA> {isOnSchema = true;}]
-    (op = NestedExpr(lp) {inputs.add(op);}
-    ("," op = NestedExpr(lp) {inputs.add(op);})+)
-    {
-        // This try-catch converts every exception into a ParseException.
-        // Otherwise, if any exception other than ParseException is thrown,
-        // the generated parser code tries to cast it to Error, which
-        // produces a misleading error message.
-        try {
-            LOUnion union = new LOUnion(lp, new OperatorKey(scope, getNextId()));
-            union.setOnSchema(isOnSchema);
-            lp.add(union);
-
-            log.debug("Added operator " + union.getClass().getName() + " to the logical plan");
-
-            for (LogicalOperator lop: inputs) {
-                lp.connect(lop, union);
-                // The target of the connection is the union, not the input.
-                log.debug("Connected union input operator " +
-                        lop.getClass().getName() + " to operator " +
-                        union.getClass().getName() + " in the logical plan");
-            }
-
-            log.trace("Exiting UnionClause");
-            return union;
-        }
-        catch(Exception e){
-            // Carry the cause's message, as the other productions in this
-            // grammar do, instead of producing a message-less ParseException.
-            ParseException pe = new ParseException(e.getMessage());
-            pe.initCause(e);
-            throw pe;
-        }
-    }
-
-}
-
-// Parses the body of a FOREACH statement: an input relation followed by a
-// NestedBlock whose last element is the LOGenerate. For each of the generate's
-// nested plans, operators from the foreach nested pipeline (foreachPlan) that
-// the plan depends on are spliced in via attachPlan. The plan is then cloned,
-// redundant operators are removed from the clone, and the clone is kept.
-// Finally an LOForEach is built from those plans plus the generate's flatten
-// flags and user-defined schemas, added to lp, connected to the input and
-// returned.
- LogicalOperator ForEachClause(LogicalPlan lp) :
-{
- ArrayList<LogicalOperator> specList = new ArrayList<LogicalOperator>();
- LogicalOperator input, foreach;
- // Receives the nested (inner) operators parsed by NestedBlock.
- LogicalPlan foreachPlan = new LogicalPlan();
- // One finished nested plan per generate expression.
- ArrayList<LogicalPlan> foreachPlans = new ArrayList<LogicalPlan>();
- log.trace("Entering ForEachClause");
-}
-{
- (
- input = NestedExpr(lp)
- specList = NestedBlock(input.getSchema(), specList, foreachPlan, input)
- )
- {
- // NOTE(review): assumes NestedBlock always returns a list whose last
- // element is the LOGenerate. Confirm against NestedBlock.
- LOGenerate generate = (LOGenerate)specList.get(specList.size() - 1);
- List<LogicalPlan> generatePlans = generate.getGeneratePlans();
- List<Boolean> flattenList = generate.getFlatten();
- List<Schema> userDefinedSchemaList = generate.getUserDefinedSchema();
- /*
- Generate's nested plans will be translated to foreach's nested plan
- If generate contains an expression that does not require generate's
- inputs then it should be made part of foreach without the generate
- For the remaining expressions, the entire DAG till generate has to be
- duplicated and then the nested plan attached to a new generate
- */
-
- for (int planCtr = 0; planCtr < generatePlans.size(); ++planCtr) {
- // NOTE(review): generatePlan is the object returned by
- // generate.getGeneratePlans() and is mutated below (add/connect/
- // attachPlan), not a copy. Presumably intentional. Confirm.
- LogicalPlan generatePlan = generatePlans.get(planCtr);
- // Copied because attachPlan may change generatePlan's roots while we iterate.
- List<LogicalOperator> planRoots = new ArrayList<LogicalOperator>(generatePlan.getRoots());
- boolean needGenerateInput = false;
- // NOTE(review): needForEachInput is assigned but never read in this block.
- boolean needForEachInput = false;
- MultiMap<LogicalOperator, LogicalOperator> mapProjectInputs = null;
- // Operators already spliced into generatePlan, shared with attachPlan.
- Map<LogicalOperator, Boolean> rootProcessed = new HashMap<LogicalOperator, Boolean>();
- // Pass 1: non-project expression roots that checkGenerateInput flags
- // get the foreach pipeline attached to them.
- for(LogicalOperator root: planRoots) {
- if(root instanceof ExpressionOperator && !(root instanceof LOProject)) {
- if(checkGenerateInput(root)) {
- needGenerateInput = true;
- attachPlan(generatePlan, root, foreachPlan, rootProcessed);
- rootProcessed.put(root, true);
- }
- }
- }
-
- // Pass 2 works on the roots as they are after pass 1. The flags from
- // pass 1 are discarded. Only this pass decides whether the
- // project-input splicing below runs.
- planRoots = generatePlan.getRoots();
- needGenerateInput = false;
- needForEachInput = false;
-
- for(LogicalOperator root: planRoots) {
- if(root instanceof LOProject) {
- LOProject project = (LOProject)root;
- LogicalOperator projectInput = project.getExpression();
- // Projects that read something other than the foreach's own input
- // (or that checkGenerateInput flags) are remembered for splicing.
- if(checkGenerateInput(projectInput) || !(projectInput.equals(input))) {
- needGenerateInput = true;
- if(null == mapProjectInputs) {
- mapProjectInputs = new MultiMap<LogicalOperator, LogicalOperator>();
- }
- mapProjectInputs.put(root, projectInput);
- } else {
- needForEachInput = true;
- }
- }
- }
- // mapProjectInputs is non-null here: it is created in the same branch
- // that sets needGenerateInput.
- if(needGenerateInput) {
- /*
- Duplicate the logical plan until the generate but excluding generate
- Create a new generate operator with the plan being iterated on
- Attach the generate as the leaf and add the duplicated plan to
- the list of foreach plans
- */
-
- for(LogicalOperator project: mapProjectInputs.keySet()) {
- for(LogicalOperator projectInput: mapProjectInputs.get(project)) {
- // Place the project's input in front of it, then pull in
- // whatever feeds that input from the foreach pipeline.
- generatePlan.add(projectInput);
- generatePlan.connect(projectInput, project);
- attachPlan(generatePlan, projectInput, foreachPlan, rootProcessed);
- rootProcessed.put(projectInput, true);
- }
- }
- }
- // The clone, not generatePlan itself, becomes this expression's nested plan.
- LogicalPlanCloner lpCloner = new LogicalPlanCloner(generatePlan);
- LogicalPlan generatePlanClone;
- try {
- generatePlanClone = lpCloner.getClonedPlan();
- } catch (CloneNotSupportedException cnse) {
- ParseException pe = new ParseException("Not able to clone foreach plan");
- pe.initCause(cnse);
- throw pe;
- }
- RemoveRedundantOperators removeOperators = new RemoveRedundantOperators(generatePlanClone);
- try {
- removeOperators.visit();
- } catch (VisitorException ve) {
- ParseException pe = new ParseException("Could not remove redundant operators in foreach plan.");
- pe.initCause(ve);
- throw pe;
- }
- foreachPlans.add(generatePlanClone);
- }
-
- resetGenerateState();
- // NOTE(review): raw ArrayList casts. The generate's getters are declared
- // to return List. If one ever returned another List implementation,
- // this would throw ClassCastException. Confirm.
- foreach = new LOForEach(lp, new OperatorKey(scope, getNextId()), (ArrayList)foreachPlans, (ArrayList)flattenList, (ArrayList) userDefinedSchemaList);
- try {
- lp.add(foreach);
- log.debug("Added operator " + foreach.getClass().getName() + " to the logical plan");
-
- lp.connect(input, foreach);
- log.debug("Connected alias " + input.getAlias() + " operator " + input.getClass().getName() + " object " + input + " to operator " + foreach.getClass().getName() + " in the logical plan");
- } catch (PlanException planException) {
- ParseException pe = new ParseException(planException.getMessage());
- pe.initCause(planException);
- throw pe;
- }
-
- log.trace("Exiting ForEachClause");
- return foreach;
- }
-}
-
-// Parses the body of a STREAM statement: an input relation, THROUGH, and a
-// command (see Command). Builds an LOStream over that input using the
-// PigContext's executable manager, adds it to lp, connects the input to it
-// and returns it.
-LogicalOperator StreamClause(LogicalPlan lp):
-{
-    LogicalOperator source;
-    StreamingCommand streamCommand;
-}
-{
-    source = NestedExpr(lp)
-    <THROUGH>
-    streamCommand = Command()
-    {
-        LOStream stream = new LOStream(lp, new OperatorKey(scope, getNextId()), source,
-                pigContext.createExecutableManager(), streamCommand);
-        lp.add(stream);
-        lp.connect(source, stream);
-        return stream;
-    }
-}
-
-// Parses the command used by STREAM ... THROUGH:
-//   <EXECCOMMAND>  - unquoted and split into argv. A new StreamingCommand is
-//                    built from argv and passed through checkAutoShipSpecs.
-//   <IDENTIFIER>   - resolved with pigContext.getCommandForAlias. An
-//                    unknown alias raises ParseException.
-StreamingCommand Command():
-{
-    Token tok;
-    StreamingCommand command;
-}
-{
-    (
-        tok = <EXECCOMMAND>
-        {
-            String[] argv = splitArgs(unquote(tok.image));
-            command = new StreamingCommand(pigContext, argv);
-            checkAutoShipSpecs(command, argv);
-        }
-    |
-        tok = <IDENTIFIER>
-        {
-            command = pigContext.getCommandForAlias(tok.image);
-            if (command == null) {
-                throw new ParseException("Undefined command-alias: " + tok.image +
-                        " used as stream operator");
-            }
-        }
-    )
-    {
-        return command;
-    }
-}
-
-LogicalOperator DefineClause(LogicalPlan lp) : {Token t; Token cmd; String functionName, functionArgs;}
-{
- t = <IDENTIFIER>
- (
- (
- cmd = <EXECCOMMAND>
- {
- StreamingCommand command =
- new StreamingCommand(pigContext, splitArgs(unquote(cmd.image)));
- String[] paths;
- StreamingCommand.HandleSpec[] handleSpecs;
- }
- (
- <SHIP> "(" paths = PathList() ")"
- {
- if (paths.length == 0) {
- command.setShipFiles(false);
- } else {
- for (String path : paths) {
- try {
- command.addPathToShip(path);
- } catch(IOException e) {
- ParseException pe = new ParseException(e.getMessage());
- pe.initCause(e);
- throw pe;
- }
- }
- }
- }
- |
- <CACHE> "(" paths = PathList() ")"
- {
- for (String path : paths) {
- try {
- command.addPathToCache(path);
- } catch(IOException e) {
- ParseException pe = new ParseException(e.getMessage());
- pe.initCause(e);
- throw pe;
- }
- }
- }
- |
- <INPUT> "(" InputOutputSpec(command, StreamingCommand.Handle.INPUT) ")"
- |
[... 1717 lines stripped ...]