You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by xu...@apache.org on 2011/05/07 02:15:44 UTC
svn commit: r1100420 [3/19] - in /pig/branches/branch-0.9: ./ src/
src/org/apache/pig/
src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/
src/org/apache/pig/impl/logicalLayer/
src/org/apache/pig/impl/logicalLayer/optimizer/ src/org/apache...
Modified: pig/branches/branch-0.9/src/org/apache/pig/impl/logicalLayer/BinaryExpressionOperator.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.9/src/org/apache/pig/impl/logicalLayer/BinaryExpressionOperator.java?rev=1100420&r1=1100419&r2=1100420&view=diff
==============================================================================
--- pig/branches/branch-0.9/src/org/apache/pig/impl/logicalLayer/BinaryExpressionOperator.java (original)
+++ pig/branches/branch-0.9/src/org/apache/pig/impl/logicalLayer/BinaryExpressionOperator.java Sat May 7 00:15:40 2011
@@ -1,97 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.pig.impl.logicalLayer;
-
-import java.util.List;
-
-import org.apache.pig.impl.plan.OperatorKey;
-import org.apache.pig.impl.plan.VisitorException;
-//import org.apache.commons.logging.Log;
-//import org.apache.commons.logging.LogFactory;
-
-/**
- * This abstract class represents the logical Binary Expression Operator
- * The binary operator has two operands and an operator. The format of
- * the expression is lhs_operand operator rhs_operand. The operator name
- * is assumed and can be inferred by the class name
- */
-
-public abstract class BinaryExpressionOperator extends ExpressionOperator {
- private static final long serialVersionUID = 2L;
- // private static Log log = LogFactory.getLog(BinaryExpressionOperator.class);
-
- /**
- * @param plan
- * Logical plan this operator is a part of.
- * @param k
- * Operator key to assign to this node.
- * @param rp
- * degree of requested parallelism with which to execute this
- * node.
- */
- public BinaryExpressionOperator(LogicalPlan plan, OperatorKey k, int rp) {
- super(plan, k, rp);
- }
-
- /**
- * @param plan
- * Logical plan this operator is a part of.
- * @param k
- * Operator key to assign to this node.
- */
- public BinaryExpressionOperator(LogicalPlan plan, OperatorKey k) {
- super(plan, k);
- }
-
- public ExpressionOperator getLhsOperand() {
- List<LogicalOperator>preds = getPlan().getPredecessors(this);
- if(preds == null)
- return null;
- return (ExpressionOperator)preds.get(0);
- }
-
- public ExpressionOperator getRhsOperand() {
- List<LogicalOperator>preds = getPlan().getPredecessors(this);
- if(preds == null)
- return null;
- return (ExpressionOperator)preds.get(1);
- }
-
- @Override
- public void visit(LOVisitor v) throws VisitorException {
- v.visit(this);
- }
-
- @Override
- public boolean supportsMultipleInputs() {
- return true;
- }
-
- /**
- * @see org.apache.pig.impl.logicalLayer.ExpressionOperator#clone()
- * Do not use the clone method directly. Operators are cloned when logical plans
- * are cloned using {@link LogicalPlanCloner}
- */
- @Override
- protected Object clone() throws CloneNotSupportedException {
- BinaryExpressionOperator binExOpClone = (BinaryExpressionOperator)super.clone();
- return binExOpClone;
- }
-
-}
Modified: pig/branches/branch-0.9/src/org/apache/pig/impl/logicalLayer/CastFinder.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.9/src/org/apache/pig/impl/logicalLayer/CastFinder.java?rev=1100420&r1=1100419&r2=1100420&view=diff
==============================================================================
--- pig/branches/branch-0.9/src/org/apache/pig/impl/logicalLayer/CastFinder.java (original)
+++ pig/branches/branch-0.9/src/org/apache/pig/impl/logicalLayer/CastFinder.java Sat May 7 00:15:40 2011
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.pig.impl.logicalLayer;
-
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-import java.util.ArrayList;
-
-import org.apache.pig.FuncSpec;
-import org.apache.pig.impl.plan.DepthFirstWalker;
-import org.apache.pig.impl.plan.VisitorException;
-
-/**
- * A visitor to track the casts in a plan.
- */
-public class CastFinder extends LOVisitor {
-
- List<LOCast> mCastList = new ArrayList<LOCast>();
-
- /**
- *
- * @param plan
- * logical plan to query the presence of UDFs
- */
- public CastFinder(LogicalPlan plan) {
- super(plan, new DepthFirstWalker<LogicalOperator, LogicalPlan>(plan));
- }
-
- /*
- * (non-Javadoc)
- *
- * @see
- * org.apache.pig.impl.logicalLayer.LOVisitor#visit(org.apache.pig.impl.
- * logicalLayer.LOUserFunc)
- */
- @Override
- protected void visit(LOCast cast) throws VisitorException {
- mCastList.add(cast);
- }
-
- /**
- *
- * @return list of casts found in the plan
- */
- public List<LOCast> getCastList() {
- return mCastList;
- }
-
- /**
- *
- * @return set of casts found in the plan
- */
- public Set<LOCast> getCastSet() {
- return new HashSet<LOCast>(mCastList);
- }
-
- /**
- *
- * @return true if the plan contained any Casts; false otherwise
- */
- public boolean foundAnyCast() {
- return (mCastList.size() == 0 ? false : true);
- }
-
-}
\ No newline at end of file
Modified: pig/branches/branch-0.9/src/org/apache/pig/impl/logicalLayer/ColumnPruner.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.9/src/org/apache/pig/impl/logicalLayer/ColumnPruner.java?rev=1100420&r1=1100419&r2=1100420&view=diff
==============================================================================
--- pig/branches/branch-0.9/src/org/apache/pig/impl/logicalLayer/ColumnPruner.java (original)
+++ pig/branches/branch-0.9/src/org/apache/pig/impl/logicalLayer/ColumnPruner.java Sat May 7 00:15:40 2011
@@ -1,233 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.pig.impl.logicalLayer;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.pig.PigException;
-import org.apache.pig.impl.plan.DependencyOrderWalker;
-import org.apache.pig.impl.plan.RequiredFields;
-import org.apache.pig.impl.plan.VisitorException;
-import org.apache.pig.impl.plan.optimizer.OptimizerException;
-import org.apache.pig.impl.util.Pair;
-import org.apache.pig.impl.logicalLayer.RelationalOperator;
-
-public class ColumnPruner extends LOVisitor {
- private Map<LogicalOperator, List<Pair<Integer,Integer>>> prunedColumnsMap;
- LogicalPlan plan;
-
- public ColumnPruner(LogicalPlan plan) {
- super(plan, new DependencyOrderWalker<LogicalOperator, LogicalPlan>(plan));
- prunedColumnsMap = new HashMap<LogicalOperator, List<Pair<Integer,Integer>>>();
- this.plan = plan;
- }
-
- public void addPruneMap(LogicalOperator op, List<Pair<Integer,Integer>> prunedColumns) {
- prunedColumnsMap.put(op, prunedColumns);
- }
-
- public boolean isEmpty() {
- return prunedColumnsMap.isEmpty();
- }
-
- protected void prune(RelationalOperator lOp) throws VisitorException {
- List<LogicalOperator> predecessors = plan.getPredecessors(lOp);
- if (predecessors==null)
- {
- int errCode = 2187;
- throw new VisitorException("Cannot get predessors", errCode, PigException.BUG);
- }
- List<Pair<Integer, Integer>> columnsPruned = new ArrayList<Pair<Integer, Integer>>();
- List<Pair<Integer, Integer>> columnsToPrune = new ArrayList<Pair<Integer, Integer>>();
- for (int i=0;i<predecessors.size();i++)
- {
- RelationalOperator predecessor = (RelationalOperator)predecessors.get(i);
- if (prunedColumnsMap.containsKey(predecessor))
- {
- List<Pair<Integer, Integer>> predColumnsToPrune = prunedColumnsMap.get(predecessor);
- if (predColumnsToPrune!=null)
- {
- for (int j=0;j<predColumnsToPrune.size();j++)
- {
- predColumnsToPrune.get(j).first = i;
- }
- columnsPruned.addAll(predColumnsToPrune);
- }
- }
- }
-
- try {
- if (lOp.getSchema()==null)
- {
- int errCode = 2189;
- throw new VisitorException("Expect schema", errCode, PigException.BUG);
- }
-
- // For every input column, check if it is pruned
- for (int i=0;i<lOp.getSchema().size();i++)
- {
- List<RequiredFields> relevantFieldsList = lOp.getRelevantInputs(0, i);
-
- // Check if this output do not need any inputs, if so, it is a constant field.
- // Since we never prune a constant field, so we continue without pruning
- boolean needNoInputs = true;
-
- if (relevantFieldsList==null)
- needNoInputs = true;
- else
- {
- for (RequiredFields relevantFields: relevantFieldsList)
- {
- if (relevantFields!=null && !relevantFields.needNoFields())
- needNoInputs = false;
- }
- }
-
- if (needNoInputs)
- continue;
-
- boolean columnPruned = false;
-
- // For LOCogroup, one output can be pruned if all its relevant input are pruned except for "key" fields
- if (lOp instanceof LOCogroup)
- {
- List<RequiredFields> requiredFieldsList = lOp.getRequiredFields();
- for (Pair<Integer, Integer> column : columnsPruned)
- {
- if (column.first == i-1) // Saw at least one input pruned
- {
- if (requiredFieldsList.get(i-1).getFields().contains(column))
- {
- columnPruned = true;
- break;
- }
- }
- }
- }
- else
- {
- // If we see any of the relevant field of this column get pruned,
- // then we prune this column for this operator
- for (RequiredFields relevantFields: relevantFieldsList)
- {
- if (relevantFields == null)
- continue;
- if (relevantFields.getNeedAllFields())
- break;
- for (Pair<Integer, Integer> relevantField: relevantFields.getFields())
- {
- // If any of the input column is pruned, prune this output column
- if (columnsPruned.contains(relevantField))
- {
- columnPruned = true;
- break;
- }
- }
- }
- }
- if (columnPruned)
- columnsToPrune.add(new Pair<Integer, Integer>(0, i));
- }
-
- LogicalOperator currentOp = lOp;
-
- // If it is LOCogroup, insert foreach to mimic pruning, because we have no way to prune
- // LOCogroup output only by pruning the inputs
- if (columnsPruned.size()!=0 && lOp instanceof LOCogroup)
- {
- List<Integer> columnsToProject = new ArrayList<Integer>();
- for (int i=0;i<=predecessors.size();i++) {
- if (!columnsToPrune.contains(new Pair<Integer, Integer>(0, i)))
- columnsToProject.add(i);
- }
- currentOp = lOp.insertPlainForEachAfter(columnsToProject);
- }
-
- if (!columnsPruned.isEmpty()&&lOp.pruneColumns(columnsPruned)) {
- prunedColumnsMap.put(currentOp, columnsToPrune);
- }
- } catch (FrontendException e) {
- int errCode = 2188;
- throw new VisitorException("Cannot prune columns for "+lOp, errCode, PigException.BUG, e);
- }
- }
-
- protected void visit(LOCogroup cogroup) throws VisitorException {
- prune(cogroup);
- }
-
- protected void visit(LOCross cross) throws VisitorException {
- prune(cross);
- }
-
- protected void visit(LODistinct distinct) throws VisitorException {
- prune(distinct);
- }
-
- protected void visit(LOFilter filter) throws VisitorException {
- prune(filter);
- }
-
- protected void visit(LOForEach foreach) throws VisitorException {
- // The only case we should skip foreach is when this is the foreach
- // inserted after LOLoad to mimic pruning, then we put the prunedColumns entry
- // for that foreach, and we do not need to further visit this foreach here
- if (!prunedColumnsMap.containsKey(foreach))
- prune(foreach);
- }
-
- protected void visit(LOJoin join) throws VisitorException {
- prune(join);
- }
-
- protected void visit(LOLimit limit) throws VisitorException {
- prune(limit);
- }
-
- protected void visit(LOSort sort) throws VisitorException {
- prune(sort);
- }
-
- protected void visit(LOSplit split) throws VisitorException {
- prune(split);
- }
-
- protected void visit(LOSplitOutput splitoutput) throws VisitorException {
- prune(splitoutput);
- }
-
- protected void visit(LOStore store) throws VisitorException {
- return;
- }
-
- protected void visit(LOStream stream) throws VisitorException {
- return;
- }
-
- protected void visit(LOUnion union) throws VisitorException {
- prune(union);
- return;
- }
-
- protected void visit(LOLoad lOp) throws VisitorException {
- return;
- }
-}
Modified: pig/branches/branch-0.9/src/org/apache/pig/impl/logicalLayer/DotLOPrinter.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.9/src/org/apache/pig/impl/logicalLayer/DotLOPrinter.java?rev=1100420&r1=1100419&r2=1100420&view=diff
==============================================================================
--- pig/branches/branch-0.9/src/org/apache/pig/impl/logicalLayer/DotLOPrinter.java (original)
+++ pig/branches/branch-0.9/src/org/apache/pig/impl/logicalLayer/DotLOPrinter.java Sat May 7 00:15:40 2011
@@ -1,121 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.pig.impl.logicalLayer;
-
-import java.io.PrintStream;
-import java.util.List;
-import java.util.LinkedList;
-import java.util.Collection;
-import org.apache.pig.impl.util.MultiMap;
-import java.util.HashSet;
-import java.util.Set;
-
-import org.apache.pig.impl.plan.DotPlanDumper;
-import org.apache.pig.impl.plan.Operator;
-
-/**
- * This class can print a logical plan in the DOT format. It uses
- * clusters to illustrate nesting. If "verbose" is off, it will skip
- * any nesting.
- */
-public class DotLOPrinter extends DotPlanDumper<LogicalOperator, LogicalPlan,
- LogicalOperator, LogicalPlan> {
-
- public DotLOPrinter(LogicalPlan plan, PrintStream ps) {
- this(plan, ps, false, new HashSet<Operator>(), new HashSet<Operator>(),
- new HashSet<Operator>());
- }
-
- private DotLOPrinter(LogicalPlan plan, PrintStream ps, boolean isSubGraph,
- Set<Operator> subgraphs,
- Set<Operator> multiInSubgraphs,
- Set<Operator> multiOutSubgraphs) {
- super(plan, ps, isSubGraph, subgraphs,
- multiInSubgraphs, multiOutSubgraphs);
- }
-
- @Override
- protected DotPlanDumper makeDumper(LogicalPlan plan, PrintStream ps) {
- return new DotLOPrinter(plan, ps, true, mSubgraphs,
- mMultiInputSubgraphs,
- mMultiOutputSubgraphs);
- }
-
- @Override
- protected String getName(LogicalOperator op) {
- String info = (op.name().split("\\d+-\\d+"))[0];
- if (op instanceof LOProject) {
- LOProject pr = (LOProject)op;
- info += pr.isStar()?" [*]": pr.getProjection();
- }
- return info;
- }
-
- @Override
- protected String[] getAttributes(LogicalOperator op) {
- if (op instanceof LOStore || op instanceof LOLoad) {
- String[] attributes = new String[3];
- attributes[0] = "label=\""+getName(op).replace(":",",\\n")+"\"";
- attributes[1] = "style=\"filled\"";
- attributes[2] = "fillcolor=\"gray\"";
- return attributes;
- }
- else {
- return super.getAttributes(op);
- }
- }
-
- @Override
- protected MultiMap<LogicalOperator, LogicalPlan>
- getMultiInputNestedPlans(LogicalOperator op) {
-
- if(op instanceof LOCogroup){
- return ((LOCogroup)op).getGroupByPlans();
- }
- else if(op instanceof LOJoin){
- return ((LOJoin)op).getJoinPlans();
- }
- else if(op instanceof LOJoin){
- return ((LOJoin)op).getJoinPlans();
- }
- return new MultiMap<LogicalOperator, LogicalPlan>();
- }
-
- @Override
- protected Collection<LogicalPlan> getNestedPlans(LogicalOperator op) {
- Collection<LogicalPlan> plans = new LinkedList<LogicalPlan>();
-
- if(op instanceof LOFilter){
- plans.add(((LOFilter)op).getComparisonPlan());
- }
- else if(op instanceof LOForEach){
- plans.addAll(((LOForEach)op).getForEachPlans());
- }
- else if(op instanceof LOGenerate){
- plans.addAll(((LOGenerate)op).getGeneratePlans());
- }
- else if(op instanceof LOSort){
- plans.addAll(((LOSort)op).getSortColPlans());
- }
- else if(op instanceof LOSplitOutput){
- plans.add(((LOSplitOutput)op).getConditionPlan());
- }
-
- return plans;
- }
-}
Modified: pig/branches/branch-0.9/src/org/apache/pig/impl/logicalLayer/ExpressionOperator.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.9/src/org/apache/pig/impl/logicalLayer/ExpressionOperator.java?rev=1100420&r1=1100419&r2=1100420&view=diff
==============================================================================
--- pig/branches/branch-0.9/src/org/apache/pig/impl/logicalLayer/ExpressionOperator.java (original)
+++ pig/branches/branch-0.9/src/org/apache/pig/impl/logicalLayer/ExpressionOperator.java Sat May 7 00:15:40 2011
@@ -1,149 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.pig.impl.logicalLayer;
-
-import java.util.List;
-import java.util.ArrayList;
-
-import org.apache.pig.impl.logicalLayer.schema.Schema;
-import org.apache.pig.data.DataType;
-import org.apache.pig.impl.logicalLayer.parser.ParseException;
-import org.apache.pig.impl.plan.OperatorKey;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
-public abstract class ExpressionOperator extends LogicalOperator {
-
- private static final long serialVersionUID = 2L;
- private static Log log = LogFactory.getLog(ExpressionOperator.class);
- protected boolean mIsFieldSchemaComputed = false;
- protected Schema.FieldSchema mFieldSchema = null;
-
- /**
- * @param plan
- * Logical plan this operator is a part of.
- * @param k
- * Operator key to assign to this node.
- * @param rp
- * degree of requested parallelism with which to execute this
- * node.
- */
- public ExpressionOperator(LogicalPlan plan, OperatorKey k, int rp) {
- super(plan, k, rp);
- }
-
- /**
- * @param plan
- * Logical plan this operator is a part of.
- * @param k
- * Operator key to assign to this node.
- */
- public ExpressionOperator(LogicalPlan plan, OperatorKey k) {
- super(plan, k);
- }
-
-
- @Override
- public boolean supportsMultipleOutputs() {
- return true;
- }
-
- @Override
- public Schema getSchema() throws FrontendException{
- return mSchema;
- }
-
- // Default implementation just get type info from mType
- public Schema.FieldSchema getFieldSchema() throws FrontendException {
- Schema.FieldSchema fs = new Schema.FieldSchema(null, mType) ;
- return fs ;
- }
-
- /**
- * Set the output schema for this operator. If a schema already exists, an
- * attempt will be made to reconcile it with this new schema.
- *
- * @param fs
- * FieldSchema to set.
- * @throws FrontendException
- * if there is already a schema and the existing schema cannot
- * be reconciled with this new schema.
- */
- public void setFieldSchema(Schema.FieldSchema fs) throws FrontendException {
- mFieldSchema = fs;
- setAlias(fs.alias);
- setType(fs.type);
- mIsFieldSchemaComputed = true;
- }
-
- /**
- * Unset the field schema as if it had not been calculated. This is used
- * by anyone who reorganizes the tree and needs to have schemas
- * recalculated.
- */
- public void unsetFieldSchema() {
- mIsFieldSchemaComputed = false;
- mFieldSchema = null;
- }
-
- public Schema.FieldSchema regenerateFieldSchema() throws FrontendException {
- unsetFieldSchema();
- return getFieldSchema();
- }
-
- void setFieldSchemaComputed(boolean b) {
- mIsFieldSchemaComputed = b;
- }
-
- boolean getFieldSchemaComputed() {
- return mIsFieldSchemaComputed;
- }
-
- @Override
- public byte getType() {
- // Called to make sure we've constructed the field schema before trying
- // to read it.
- try {
- getFieldSchema();
- } catch (FrontendException fe) {
- return DataType.UNKNOWN;
- }
-
- if (mFieldSchema != null){
- return mFieldSchema.type ;
- }
- else {
- return DataType.UNKNOWN ;
- }
- }
-
- /**
- * @see org.apache.pig.impl.logicalLayer.LogicalOperator#clone()
- * Do not use the clone method directly. Operators are cloned when logical plans
- * are cloned using {@link LogicalPlanCloner}
- */
- @Override
- protected Object clone() throws CloneNotSupportedException {
- ExpressionOperator exOpClone = (ExpressionOperator)super.clone();
- if(mFieldSchema != null)
- exOpClone.mFieldSchema = this.mFieldSchema.clone();
- return exOpClone;
- }
-
-}
-
Modified: pig/branches/branch-0.9/src/org/apache/pig/impl/logicalLayer/LOAdd.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.9/src/org/apache/pig/impl/logicalLayer/LOAdd.java?rev=1100420&r1=1100419&r2=1100420&view=diff
==============================================================================
--- pig/branches/branch-0.9/src/org/apache/pig/impl/logicalLayer/LOAdd.java (original)
+++ pig/branches/branch-0.9/src/org/apache/pig/impl/logicalLayer/LOAdd.java Sat May 7 00:15:40 2011
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.pig.impl.logicalLayer;
-
-import org.apache.pig.impl.plan.OperatorKey;
-import org.apache.pig.impl.plan.PlanVisitor;
-import org.apache.pig.impl.plan.VisitorException;
-import org.apache.pig.impl.logicalLayer.schema.Schema;
-import org.apache.pig.data.DataType;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
-public class LOAdd extends BinaryExpressionOperator {
-
- private static final long serialVersionUID = 2L;
- private static Log log = LogFactory.getLog(LOAdd.class);
-
- /**
- *
- * @param plan
- * Logical plan this operator is a part of.
- * @param k
- * Operator key to assign to this node.
- */
- public LOAdd(LogicalPlan plan, OperatorKey k) {
- super(plan, k);
- }
-
- @Override
- public Schema getSchema() {
- return mSchema;
- }
-
- @Override
- public Schema.FieldSchema getFieldSchema() throws FrontendException {
- if(!mIsFieldSchemaComputed) {
- mFieldSchema = new Schema.FieldSchema(null, DataType.mergeType(getLhsOperand().getType(), getRhsOperand().getType()));
- mFieldSchema.setParent(getLhsOperand().getFieldSchema().canonicalName, getLhsOperand());
- mFieldSchema.setParent(getRhsOperand().getFieldSchema().canonicalName, getRhsOperand());
- mIsFieldSchemaComputed = true;
- }
- return mFieldSchema;
- }
-
-
- @Override
- public void visit(LOVisitor v) throws VisitorException {
- v.visit(this);
- }
-
- @Override
- public String name() {
- return "Add " + mKey.scope + "-" + mKey.id;
- }
-}
Modified: pig/branches/branch-0.9/src/org/apache/pig/impl/logicalLayer/LOAnd.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.9/src/org/apache/pig/impl/logicalLayer/LOAnd.java?rev=1100420&r1=1100419&r2=1100420&view=diff
==============================================================================
--- pig/branches/branch-0.9/src/org/apache/pig/impl/logicalLayer/LOAnd.java (original)
+++ pig/branches/branch-0.9/src/org/apache/pig/impl/logicalLayer/LOAnd.java Sat May 7 00:15:40 2011
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.pig.impl.logicalLayer;
-
-import org.apache.pig.impl.plan.OperatorKey;
-import org.apache.pig.impl.plan.PlanVisitor;
-import org.apache.pig.impl.plan.VisitorException;
-import org.apache.pig.impl.logicalLayer.schema.Schema;
-import org.apache.pig.data.DataType;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
-public class LOAnd extends BinaryExpressionOperator {
-
- private static final long serialVersionUID = 2L;
- private static Log log = LogFactory.getLog(LOAnd.class);
-
- /**
- *
- * @param plan
- * Logical plan this operator is a part of.
- * @param k
- * Operator key to assign to this node.
- */
- public LOAnd(LogicalPlan plan, OperatorKey k) {
- super(plan, k);
- }
-
- @Override
- public Schema getSchema() {
- return mSchema;
- }
-
- @Override
- public Schema.FieldSchema getFieldSchema() throws FrontendException {
- if(!mIsFieldSchemaComputed) {
- mFieldSchema = new Schema.FieldSchema(null, DataType.BOOLEAN);
- mFieldSchema.setParent(getLhsOperand().getFieldSchema().canonicalName, getLhsOperand());
- mFieldSchema.setParent(getRhsOperand().getFieldSchema().canonicalName, getRhsOperand());
- mIsFieldSchemaComputed = true;
- }
- return mFieldSchema;
- }
-
- @Override
- public void visit(LOVisitor v) throws VisitorException {
- v.visit(this);
- }
-
- @Override
- public String name() {
- return "And " + mKey.scope + "-" + mKey.id;
- }
-}
Modified: pig/branches/branch-0.9/src/org/apache/pig/impl/logicalLayer/LOBinCond.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.9/src/org/apache/pig/impl/logicalLayer/LOBinCond.java?rev=1100420&r1=1100419&r2=1100420&view=diff
==============================================================================
--- pig/branches/branch-0.9/src/org/apache/pig/impl/logicalLayer/LOBinCond.java (original)
+++ pig/branches/branch-0.9/src/org/apache/pig/impl/logicalLayer/LOBinCond.java Sat May 7 00:15:40 2011
@@ -1,118 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.pig.impl.logicalLayer;
-
-
-import java.util.List;
-
-import org.apache.pig.impl.logicalLayer.schema.Schema;
-import org.apache.pig.impl.plan.OperatorKey;
-import org.apache.pig.impl.plan.PlanVisitor;
-import org.apache.pig.impl.plan.VisitorException;
-
-public class LOBinCond extends ExpressionOperator {
-
- // BinCond has a conditional expression and two nested queries.
- // If the conditional expression evaluates to true the first nested query
- // is executed else the second nested query is executed
-
- private static final long serialVersionUID = 2L;
-
- /**
- *
- * @param plan
- * Logical plan this operator is a part of.
- * @param k
- * Operator key to assign to this node.
- */
- public LOBinCond(LogicalPlan plan, OperatorKey k) {
- super(plan, k);
- }// End Constructor LOBinCond
-
- public ExpressionOperator getCond() {
- List<LogicalOperator>preds = getPlan().getPredecessors(this);
- if(preds == null)
- return null;
- return (ExpressionOperator)preds.get(0);
- }
-
- public ExpressionOperator getLhsOp() {
- List<LogicalOperator>preds = getPlan().getPredecessors(this);
- if(preds == null)
- return null;
- return (ExpressionOperator)preds.get(1);
- }
-
- public ExpressionOperator getRhsOp() {
- List<LogicalOperator>preds = getPlan().getPredecessors(this);
- if(preds == null)
- return null;
- return (ExpressionOperator)preds.get(2);
- }
-
-
- @Override
- public void visit(LOVisitor v) throws VisitorException {
- v.visit(this);
- }
-
- @Override
- public Schema getSchema() throws FrontendException {
- return mSchema;
- }
-
- @Override
- public Schema.FieldSchema getFieldSchema() throws FrontendException {
- //We need a check of LHS and RHS schemas
- //The type checker perform this task
- if (!mIsFieldSchemaComputed) {
- try {
- mFieldSchema = getLhsOp().getFieldSchema();
- mIsFieldSchemaComputed = true;
- } catch (FrontendException fee) {
- mFieldSchema = null;
- mIsFieldSchemaComputed = false;
- throw fee;
- }
- }
- return mFieldSchema;
- }
-
- @Override
- public String name() {
- return "BinCond " + mKey.scope + "-" + mKey.id;
- }
-
- @Override
- public boolean supportsMultipleInputs() {
- return true;
- }
-
- /**
- * @see org.apache.pig.impl.logicalLayer.ExpressionOperator#clone()
- * Do not use the clone method directly. Operators are cloned when logical plans
- * are cloned using {@link LogicalPlanCloner}
- */
- @Override
- protected Object clone() throws CloneNotSupportedException {
- LOBinCond clone = (LOBinCond)super.clone();
- return clone;
- }
-
-}
Modified: pig/branches/branch-0.9/src/org/apache/pig/impl/logicalLayer/LOCast.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.9/src/org/apache/pig/impl/logicalLayer/LOCast.java?rev=1100420&r1=1100419&r2=1100420&view=diff
==============================================================================
--- pig/branches/branch-0.9/src/org/apache/pig/impl/logicalLayer/LOCast.java (original)
+++ pig/branches/branch-0.9/src/org/apache/pig/impl/logicalLayer/LOCast.java Sat May 7 00:15:40 2011
@@ -1,130 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.pig.impl.logicalLayer;
-
-import java.util.List;
-
-import org.apache.pig.FuncSpec;
-import org.apache.pig.LoadFunc;
-import org.apache.pig.impl.plan.OperatorKey;
-import org.apache.pig.impl.plan.PlanVisitor;
-import org.apache.pig.impl.plan.VisitorException;
-import org.apache.pig.impl.logicalLayer.schema.Schema;
-import org.apache.pig.impl.logicalLayer.schema.Schema.FieldSchema;
-import org.apache.pig.data.DataType;
-
- public class LOCast extends ExpressionOperator {
-
-     // A cast converts the result of its single input expression to a specified type.
-
-     private static final long serialVersionUID = 2L;
-
-     /** Load function spec associated with this cast; null unless set. */
-     private FuncSpec mLoadFuncSpec = null;
-
-     // Copy of the field schema passed to setFieldSchema (e.g. a user-specified
-     // cast in the script). It is kept separately so that, if the computed field
-     // schema is unset and getFieldSchema is called again, the field schema is
-     // rebuilt as the user specified it rather than from mType alone.
-     private FieldSchema userSpecifiedFieldSchema;
-
-     /**
-      * @param plan
-      *            Logical plan this operator is a part of.
-      * @param k
-      *            Operator key to assign to this node.
-      * @param type
-      *            the type to which the expression is cast
-      */
-     public LOCast(LogicalPlan plan, OperatorKey k, byte type) {
-         super(plan, k);
-         mType = type;
-     }
-
-     /**
-      * Returns the expression being cast, which is the first predecessor of this
-      * operator in its plan.
-      *
-      * @return the input expression, or null when this operator has no predecessors
-      */
-     public ExpressionOperator getExpression() {
-         List<LogicalOperator> preds = getPlan().getPredecessors(this);
-         if (preds == null) {
-             return null;
-         }
-         return (ExpressionOperator) preds.get(0);
-     }
-
-     @Override
-     public void visit(LOVisitor v) throws VisitorException {
-         v.visit(this);
-     }
-
-     /** Returns the stored schema field; a cast does not compute a tuple schema. */
-     @Override
-     public Schema getSchema() {
-         return mSchema;
-     }
-
-     /**
-      * Sets the field schema and remembers a copy of it as the user-specified
-      * field schema, so that later recomputation reproduces it.
-      */
-     @Override
-     public void setFieldSchema(FieldSchema fs) throws FrontendException {
-         super.setFieldSchema(fs);
-         userSpecifiedFieldSchema = new Schema.FieldSchema(fs);
-     }
-
-     /**
-      * Returns the field schema of this cast. On first computation it is built
-      * from the user-specified field schema if present, and otherwise from
-      * {@code mType} with no alias. Its parent is linked to the input
-      * expression's field schema.
-      *
-      * <p>If the input expression is not connected yet, the field schema is
-      * returned without a parent link and is not marked as computed, so it is
-      * rebuilt (and linked) on a later call.
-      *
-      * @return the field schema of this cast
-      * @throws FrontendException if the input expression's field schema cannot be computed
-      */
-     @Override
-     public Schema.FieldSchema getFieldSchema() throws FrontendException {
-         if (!mIsFieldSchemaComputed) {
-             if (userSpecifiedFieldSchema != null) {
-                 mFieldSchema = new FieldSchema(userSpecifiedFieldSchema);
-             } else {
-                 mFieldSchema = new Schema.FieldSchema(null, mType);
-             }
-             ExpressionOperator expression = getExpression();
-             // Previously this dereferenced getExpression() unconditionally and
-             // threw NullPointerException when the cast had no input.
-             if (expression != null) {
-                 Schema.FieldSchema parFs = expression.getFieldSchema();
-                 String canonicalName = (parFs != null ? parFs.canonicalName : null);
-                 mFieldSchema.setParent(canonicalName, expression);
-                 mIsFieldSchemaComputed = true;
-             }
-         }
-         return mFieldSchema;
-     }
-
-     @Override
-     public String name() {
-         return "Cast " + mKey.scope + "-" + mKey.id;
-     }
-
-     /** A cast has exactly one input expression. */
-     @Override
-     public boolean supportsMultipleInputs() {
-         return false;
-     }
-
-     /** @return the load function spec associated with this cast, or null */
-     public FuncSpec getLoadFuncSpec() {
-         return mLoadFuncSpec;
-     }
-
-     /** @param loadFuncSpec the load function spec to associate with this cast */
-     public void setLoadFuncSpec(FuncSpec loadFuncSpec) {
-         mLoadFuncSpec = loadFuncSpec;
-     }
-
-     /**
-      * Clones this operator, deep-copying the load function spec when present.
-      * Do not use the clone method directly. Operators are cloned when logical plans
-      * are cloned using {@link LogicalPlanCloner}
-      *
-      * @see org.apache.pig.impl.logicalLayer.ExpressionOperator#clone()
-      */
-     @Override
-     protected Object clone() throws CloneNotSupportedException {
-         LOCast clone = (LOCast) super.clone();
-         if (mLoadFuncSpec != null) {
-             clone.mLoadFuncSpec = mLoadFuncSpec.clone();
-         }
-         return clone;
-     }
-
- }
Modified: pig/branches/branch-0.9/src/org/apache/pig/impl/logicalLayer/LOCogroup.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.9/src/org/apache/pig/impl/logicalLayer/LOCogroup.java?rev=1100420&r1=1100419&r2=1100420&view=diff
==============================================================================
--- pig/branches/branch-0.9/src/org/apache/pig/impl/logicalLayer/LOCogroup.java (original)
+++ pig/branches/branch-0.9/src/org/apache/pig/impl/logicalLayer/LOCogroup.java Sat May 7 00:15:40 2011
@@ -1,853 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.pig.impl.logicalLayer;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.HashSet;
-import java.util.HashMap;
-import java.util.Iterator;
-
-import org.apache.pig.PigException;
-import org.apache.pig.data.DataType;
-import org.apache.pig.impl.plan.Operator;
-import org.apache.pig.impl.plan.OperatorKey;
-import org.apache.pig.impl.plan.PlanException;
-import org.apache.pig.impl.plan.ProjectionMap;
-import org.apache.pig.impl.plan.RequiredFields;
-import org.apache.pig.impl.plan.VisitorException;
-import org.apache.pig.impl.logicalLayer.schema.Schema;
-import org.apache.pig.impl.logicalLayer.optimizer.SchemaRemover;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.pig.impl.util.MultiMap;
-import org.apache.pig.impl.util.Pair;
-
- public class LOCogroup extends RelationalOperator {
-     private static final long serialVersionUID = 2L;
-
-     /**
-      * Enum for the type of group
-      */
-     public static enum GROUPTYPE {
-         REGULAR,    // Regular (co)group
-         COLLECTED,  // Collected group
-         MERGE       // Map-side CoGroup on sorted data
-     }
-
-     // Per-input "inner" flags; the constructor stores a copy of the array it is given.
-     private boolean[] mIsInner;
-     private static Log log = LogFactory.getLog(LOCogroup.class);
-     // For each input relational operator, the plans computing its group by columns.
-     private MultiMap<LogicalOperator, LogicalPlan> mGroupByPlans;
-     private GROUPTYPE mGroupType;
-
-     /**
-      * static constant to refer to the option of selecting a group type
-      */
-     public final static Integer OPTION_GROUPTYPE = 1;
-
-     /**
-      * Creates a REGULAR cogroup.
-      *
-      * @param plan
-      *            LogicalPlan this operator is a part of.
-      * @param k
-      *            OperatorKey for this operator
-      * @param groupByPlans
-      *            the group by columns
-      * @param isInner
-      *            indicates whether the cogroup is inner for each relation
-      */
-     public LOCogroup(
-             LogicalPlan plan,
-             OperatorKey k,
-             MultiMap<LogicalOperator, LogicalPlan> groupByPlans,
-             boolean[] isInner) {
-         this(plan, k, groupByPlans, GROUPTYPE.REGULAR, isInner);
-     }
-
-     /**
-      * @param plan
-      *            LogicalPlan this operator is a part of.
-      * @param k
-      *            OperatorKey for this operator
-      * @param groupByPlans
-      *            the group by columns
-      * @param type
-      *            the type of this group
-      * @param isInner
-      *            indicates whether the cogroup is inner for each relation;
-      *            may be null, otherwise a copy is stored
-      */
-     public LOCogroup(
-             LogicalPlan plan,
-             OperatorKey k,
-             MultiMap<LogicalOperator, LogicalPlan> groupByPlans,
-             GROUPTYPE type,
-             boolean[] isInner) {
-         super(plan, k);
-         mGroupByPlans = groupByPlans;
-         if (isInner != null) {
-             mIsInner = Arrays.copyOf(isInner, isInner.length);
-         }
-         mGroupType = type;
-     }
-
-     /** @return the predecessors of this operator in its plan */
-     public List<LogicalOperator> getInputs() {
-         return mPlan.getPredecessors(this);
-     }
-
-     public MultiMap<LogicalOperator, LogicalPlan> getGroupByPlans() {
-         return mGroupByPlans;
-     }
-
-     public void setGroupByPlans(MultiMap<LogicalOperator, LogicalPlan> groupByPlans) {
-         mGroupByPlans = groupByPlans;
-     }
-
-     public boolean[] getInner() {
-         return mIsInner;
-     }
-
-     /** Stores the given array by reference (unlike the constructor, no copy is made). */
-     public void setInner(boolean[] inner) {
-         mIsInner = inner;
-     }
-
-     public GROUPTYPE getGroupType() {
-         return mGroupType;
-     }
-
-     @Override
-     public String name() {
-         return getAliasString() + "CoGroup " + mKey.scope + "-" + mKey.id;
-     }
-
-     @Override
-     public boolean supportsMultipleInputs() {
-         return true;
-     }
-
-     /**
-      * Computes (and caches) the output schema of this (co)group.
-      *
-      * <p>The first field of the output tuple is aliased 'group' and is derived
-      * from the group by columns of the inputs. Each subsequent field is a bag,
-      * aliased by the corresponding input's alias, whose schema is that input's
-      * schema.
-      *
-      * @return the output schema
-      * @throws FrontendException if an input schema cannot be computed or the
-      *         merged group by arity does not match
-      */
-     @Override
-     public Schema getSchema() throws FrontendException {
-         List<LogicalOperator> inputs = mPlan.getPredecessors(this);
-         if (!mIsSchemaComputed) {
-             // one more than the number of inputs to account for "group"
-             List<Schema.FieldSchema> fss = new ArrayList<Schema.FieldSchema>(
-                     inputs.size() + 1);
-
-             /*
-              * Schema of the first column, 'group': if there is one group by
-              * column, it is schema(fieldschema(col)); with several group by
-              * columns it is the set union of those columns, i.e.
-              * schema(list<fieldschema of the cols>). The parser ensures the
-              * number of group by columns is the same across all inputs.
-              *
-              * For each input and each group by expression (projection, udf,
-              * constant, etc.) we record the multimaps
-              * <group_column_number, alias> and <group_column_number, operator>,
-              * and mark every alias as not yet used in the lookup table.
-              */
-
-             Schema groupBySchema = null;
-             List<Schema.FieldSchema> groupByFss = new ArrayList<Schema.FieldSchema>();
-             Map<String, Boolean> aliasLookup = new HashMap<String, Boolean>();
-             MultiMap<Integer, String> positionAlias = new MultiMap<Integer, String>();
-             MultiMap<Integer, ExpressionOperator> positionOperators = new MultiMap<Integer, ExpressionOperator>();
-
-             for (LogicalOperator op : inputs) {
-                 int position = 0;
-                 for (LogicalPlan plan : mGroupByPlans.get(op)) {
-                     for (LogicalOperator eOp : plan.getLeaves()) {
-                         Schema.FieldSchema fs = ((ExpressionOperator) eOp).getFieldSchema();
-                         if (null != fs) {
-                             String alias = fs.alias;
-                             if (null != alias) {
-                                 aliasLookup.put(alias, false);
-                                 positionAlias.put(position, alias);
-                             }
-                         } else {
-                             log.warn("Field Schema of an expression operator cannot be null");
-                         }
-                         // store the operators for each position in the group
-                         positionOperators.put(position, (ExpressionOperator) eOp);
-                     }
-                     ++position;
-                 }
-             }
-
-             /*
-              * For each group column, in order, pick the first alias for that
-              * position that has not already been used. If every alias is
-              * already used, the column gets a null alias. The schema comes
-              * from the first operator recorded for that position; the type
-              * checker is responsible for merging the schemas correctly.
-              */
-             int arity = mGroupByPlans.get(inputs.get(0)).size();
-             for (int i = 0; i < arity; ++i) {
-                 Schema.FieldSchema groupByFs;
-                 Collection<String> cAliases = positionAlias.get(i);
-                 if (null != cAliases) {
-                     Object[] aliases = cAliases.toArray();
-                     for (int j = 0; j < aliases.length; ++j) {
-                         String alias = (String) aliases[j];
-                         if (null != alias) {
-                             Collection<ExpressionOperator> cEops = positionOperators.get(i);
-                             if (null != cEops) {
-                                 ExpressionOperator eOp = (ExpressionOperator) (cEops.toArray())[0];
-                                 if (null != eOp) {
-                                     if (!aliasLookup.get(alias)) {
-                                         Schema.FieldSchema fs = eOp.getFieldSchema();
-                                         if (null != fs) {
-                                             groupByFs = new Schema.FieldSchema(alias, fs.schema, fs.type);
-                                             groupByFss.add(groupByFs);
-                                             aliasLookup.put(alias, true);
-                                         } else {
-                                             groupByFs = new Schema.FieldSchema(alias, null, DataType.BYTEARRAY);
-                                             groupByFss.add(groupByFs);
-                                         }
-                                         setFieldSchemaParent(groupByFs, positionOperators, i);
-                                         break;
-                                     } else {
-                                         if ((j + 1) < aliases.length) {
-                                             continue;
-                                         } else {
-                                             // we have seen this alias before:
-                                             // just add the schema of the expression operator with the null alias
-                                             Schema.FieldSchema fs = eOp.getFieldSchema();
-                                             if (null != fs) {
-                                                 groupByFs = new Schema.FieldSchema(null, fs.schema, fs.type);
-                                                 groupByFss.add(groupByFs);
-                                                 // NOTE(review): this registers every op's canonical name against
-                                                 // eOp (not op); setFieldSchemaParent below registers each op
-                                                 // again. Confirm which parent mapping is intended.
-                                                 for (ExpressionOperator op : cEops) {
-                                                     Schema.FieldSchema opFs = op.getFieldSchema();
-                                                     if (null != opFs) {
-                                                         groupByFs.setParent(opFs.canonicalName, eOp);
-                                                     } else {
-                                                         groupByFs.setParent(null, eOp);
-                                                     }
-                                                 }
-                                             } else {
-                                                 groupByFs = new Schema.FieldSchema(null, null, DataType.BYTEARRAY);
-                                                 groupByFss.add(groupByFs);
-                                             }
-                                             setFieldSchemaParent(groupByFs, positionOperators, i);
-                                             break;
-                                         }
-                                     }
-                                 } else {
-                                     // should not be here
-                                     log.debug("Cannot be here: we cannot have a collection of null expression operators");
-                                 }
-                             } else {
-                                 // should not be here
-                                 log.debug("Cannot be here: we should have an expression operator at each position");
-                             }
-                         } else {
-                             // should not be here
-                             log.debug("Cannot be here: we cannot have a collection of null aliases ");
-                         }
-                     }
-                 } else {
-                     // We do not have any alias for this position in the group by columns
-                     // We have positions $1, $2, etc.
-                     Collection<ExpressionOperator> cEops = positionOperators.get(i);
-                     if (null != cEops) {
-                         ExpressionOperator eOp = (ExpressionOperator) (cEops.toArray())[0];
-                         if (null != eOp) {
-                             Schema.FieldSchema fs = eOp.getFieldSchema();
-                             if (null != fs) {
-                                 groupByFs = new Schema.FieldSchema(null, fs.schema, fs.type);
-                                 groupByFss.add(groupByFs);
-                             } else {
-                                 groupByFs = new Schema.FieldSchema(null, null, DataType.BYTEARRAY);
-                                 groupByFss.add(groupByFs);
-                             }
-                         } else {
-                             groupByFs = new Schema.FieldSchema(null, DataType.BYTEARRAY);
-                             groupByFss.add(groupByFs);
-                         }
-                     } else {
-                         groupByFs = new Schema.FieldSchema(null, DataType.BYTEARRAY);
-                         groupByFss.add(groupByFs);
-                     }
-                     setFieldSchemaParent(groupByFs, positionOperators, i);
-                 }
-             }
-
-             groupBySchema = new Schema(groupByFss);
-
-             if (1 == arity) {
-                 byte groupByType = getAtomicGroupByType();
-                 Schema groupSchema = groupByFss.get(0).schema;
-                 Schema.FieldSchema groupByFs = new Schema.FieldSchema("group", groupSchema, groupByType);
-                 setFieldSchemaParent(groupByFs, positionOperators, 0);
-                 fss.add(groupByFs);
-             } else {
-                 Schema mergedGroupSchema = getTupleGroupBySchema();
-                 if (mergedGroupSchema.size() != groupBySchema.size()) {
-                     mSchema = null;
-                     mIsSchemaComputed = false;
-                     int errCode = 2000;
-                     String msg = "Internal error. Mismatch in group by arities. Expected: " + mergedGroupSchema + ". Found: " + groupBySchema;
-                     throw new FrontendException(msg, errCode, PigException.BUG, false, null);
-                 } else {
-                     // carry the aliases chosen above over to the merged (type-checked) schema
-                     for (int i = 0; i < mergedGroupSchema.size(); ++i) {
-                         Schema.FieldSchema mergedFs = mergedGroupSchema.getField(i);
-                         Schema.FieldSchema groupFs = groupBySchema.getField(i);
-                         mergedFs.alias = groupFs.alias;
-                         mergedGroupSchema.addAlias(mergedFs.alias, mergedFs);
-                     }
-                 }
-
-                 Schema.FieldSchema groupByFs = new Schema.FieldSchema("group", mergedGroupSchema);
-                 fss.add(groupByFs);
-                 for (int i = 0; i < arity; ++i) {
-                     setFieldSchemaParent(groupByFs, positionOperators, i);
-                 }
-             }
-             for (LogicalOperator op : inputs) {
-                 try {
-                     Schema.FieldSchema bagFs = new Schema.FieldSchema(op.getAlias(),
-                             op.getSchema(), DataType.BAG);
-                     fss.add(bagFs);
-                     setFieldSchemaParent(bagFs, op);
-                 } catch (FrontendException ioe) {
-                     mIsSchemaComputed = false;
-                     mSchema = null;
-                     throw ioe;
-                 }
-             }
-             mIsSchemaComputed = true;
-             mSchema = new Schema(fss);
-             mType = DataType.BAG; // mType is from the super class
-         }
-         return mSchema;
-     }
-
-     /**
-      * @return true when the group key has more than one column (a tuple key),
-      *         judged from the number of group by plans of the first input
-      * @throws AssertionError if this operator has no inputs yet
-      */
-     public boolean isTupleGroupCol() {
-         List<LogicalOperator> inputs = mPlan.getPredecessors(this);
-         if (inputs == null || inputs.size() == 0) {
-             throw new AssertionError("COGroup.isTupleGroupCol() can be called "
-                     + "after it has an input only");
-         }
-         return mGroupByPlans.get(inputs.get(0)).size() > 1;
-     }
-
-     @Override
-     public void visit(LOVisitor v) throws VisitorException {
-         v.visit(this);
-     }
-
-     /**
-      * Moves the group by plans registered for {@code oldOp} so that they are
-      * registered for {@code newOp} instead; useful after a structural change in
-      * the LogicalPlan.
-      *
-      * @param oldOp the old operator
-      * @param newOp the new operator
-      */
-     public void switchGroupByPlanOp(LogicalOperator oldOp,
-             LogicalOperator newOp) {
-         Collection<LogicalPlan> innerPlans = mGroupByPlans.removeKey(oldOp);
-         mGroupByPlans.put(newOp, innerPlans);
-     }
-
-     /** Clears the schemas of every group by plan, then this operator's own schema. */
-     @Override
-     public void unsetSchema() throws VisitorException {
-         for (LogicalOperator input : getInputs()) {
-             for (LogicalPlan plan : mGroupByPlans.get(input)) {
-                 SchemaRemover sr = new SchemaRemover(plan);
-                 sr.visit();
-             }
-         }
-         super.unsetSchema();
-     }
-
-     /**
-      * This can be used to get the merged type of output group col
-      * only when the group col is of atomic type
-      * TODO: This doesn't work with group by complex type
-      * @return The type of the group by
-      * @throws FrontendException if the key is a tuple, an input does not have
-      *         exactly one group by plan, or the key types cannot be merged
-      */
-     public byte getAtomicGroupByType() throws FrontendException {
-         if (isTupleGroupCol()) {
-             int errCode = 1010;
-             String msg = "getAtomicGroupByType is used only when"
-                     + " dealing with atomic group col";
-             throw new FrontendException(msg, errCode, PigException.INPUT, false, null);
-         }
-
-         byte groupType = DataType.BYTEARRAY;
-         // merge all the inner plan outputs so we know what type
-         // our group column should be
-         for (LogicalOperator input : getInputs()) {
-             List<LogicalPlan> innerPlans
-                     = new ArrayList<LogicalPlan>(getGroupByPlans().get(input));
-             if (innerPlans.size() != 1) {
-                 int errCode = 1012;
-                 String msg = "Each COGroup input has to have "
-                         + "the same number of inner plans";
-                 throw new FrontendException(msg, errCode, PigException.INPUT, false, null);
-             }
-             byte innerType = innerPlans.get(0).getSingleLeafPlanOutputType();
-             groupType = DataType.mergeType(groupType, innerType);
-             // -1 is treated as "cannot merge" (presumably DataType.ERROR -- confirm)
-             if (groupType == -1) {
-                 int errCode = 1107;
-                 String msg = "Cannot merge cogroup keys, incompatible types";
-                 throw new FrontendException(msg, errCode, PigException.INPUT);
-             }
-         }
-
-         return groupType;
-     }
-
-     /**
-      * Builds the merged schema of a tuple group key by merging, column by
-      * column, the output types of every input's group by plans. Aliases are not
-      * set here.
-      *
-      * <p>This implementation is based on the assumption that all the inputs have
-      * the same group col tuple arity. TODO: This doesn't work with group by
-      * complex type.
-      *
-      * @return the merged group key schema
-      * @throws FrontendException if the key is atomic, column types are
-      *         incompatible, or star is mixed with other grouping expressions
-      */
-     public Schema getTupleGroupBySchema() throws FrontendException {
-         if (!isTupleGroupCol()) {
-             int errCode = 1011;
-             String msg = "getTupleGroupBySchema is used only when"
-                     + " dealing with tuple group col";
-             throw new FrontendException(msg, errCode, PigException.INPUT, false, null);
-         }
-
-         // this fsList represents all the columns in group tuple
-         List<Schema.FieldSchema> fsList = new ArrayList<Schema.FieldSchema>();
-
-         List<LogicalOperator> inputs = getInputs();
-         int outputSchemaSize = getGroupByPlans().get(inputs.get(0)).size();
-
-         // by default, they are all bytearray
-         // for type checking, we don't care about aliases
-         for (int i = 0; i < outputSchemaSize; i++) {
-             fsList.add(new Schema.FieldSchema(null, DataType.BYTEARRAY));
-         }
-
-         // merge all the inner plan outputs so we know what type
-         // our group column should be
-         for (LogicalOperator input : inputs) {
-             List<LogicalPlan> innerPlans
-                     = new ArrayList<LogicalPlan>(getGroupByPlans().get(input));
-
-             boolean seenProjectStar = false;
-             for (int j = 0; j < innerPlans.size(); j++) {
-                 byte innerType = innerPlans.get(j).getSingleLeafPlanOutputType();
-                 ExpressionOperator eOp = (ExpressionOperator) innerPlans.get(j).getSingleLeafPlanOutputOp();
-
-                 if (eOp instanceof LOProject && ((LOProject) eOp).isStar()) {
-                     seenProjectStar = true;
-                 }
-
-                 Schema.FieldSchema groupFs = fsList.get(j);
-                 byte dt = groupFs.type;
-                 groupFs.type = DataType.mergeType(groupFs.type, innerType);
-                 if (!DataType.isUsableType(groupFs.type)) {
-                     int errCode = 1110;
-                     String msg = "Cogroup column " + j + " has incompatible types: "
-                             + DataType.findTypeName(dt) + " versus "
-                             + DataType.findTypeName(innerType);
-                     throw new FrontendException(msg, errCode, PigException.INPUT, false, null);
-                 }
-                 Schema.FieldSchema fs = eOp.getFieldSchema();
-                 groupFs.setParent(fs != null ? fs.canonicalName : null, eOp);
-             }
-
-             if (seenProjectStar) {
-                 int errCode = 1013;
-                 String msg = "Grouping attributes can either be star (*) or a list of expressions, but not both.";
-                 throw new FrontendException(msg, errCode, PigException.INPUT, false, null);
-             }
-
-         }
-
-         return new Schema(fsList);
-     }
-
-     // Registers every operator recorded at the given group position as a parent of fs.
-     private void setFieldSchemaParent(Schema.FieldSchema fs, MultiMap<Integer, ExpressionOperator> positionOperators, int position) throws FrontendException {
-         for (ExpressionOperator op : positionOperators.get(position)) {
-             Schema.FieldSchema opFs = op.getFieldSchema();
-             if (null != opFs) {
-                 fs.setParent(opFs.canonicalName, op);
-             } else {
-                 fs.setParent(null, op);
-             }
-         }
-     }
-
-     // Registers op as a parent of fs once per field of op's schema (once with a null name if op has no schema).
-     private void setFieldSchemaParent(Schema.FieldSchema fs, LogicalOperator op) throws FrontendException {
-         Schema s = op.getSchema();
-         if (null != s) {
-             for (Schema.FieldSchema inputFs : s.getFields()) {
-                 if (null != inputFs) {
-                     fs.setParent(inputFs.canonicalName, op);
-                 } else {
-                     fs.setParent(null, op);
-                 }
-             }
-         } else {
-             fs.setParent(null, op);
-         }
-     }
-
-     /**
-      * Clones this operator, deep-copying the inner flags and the group by plans.
-      * Do not use the clone method directly. Operators are cloned when logical plans
-      * are cloned using {@link LogicalPlanCloner}
-      *
-      * @see org.apache.pig.impl.logicalLayer.LogicalOperator#clone()
-      */
-     @Override
-     protected Object clone() throws CloneNotSupportedException {
-
-         // first start with LogicalOperator clone
-         LOCogroup cogroupClone = (LOCogroup) super.clone();
-
-         // create deep copy of other cogroup specific members; mIsInner may be
-         // null (the constructor accepts a null isInner), which used to NPE here
-         cogroupClone.mIsInner = (mIsInner == null)
-                 ? null
-                 : Arrays.copyOf(mIsInner, mIsInner.length);
-
-         cogroupClone.mGroupByPlans = new MultiMap<LogicalOperator, LogicalPlan>();
-         for (LogicalOperator relOp : mGroupByPlans.keySet()) {
-             for (LogicalPlan plan : mGroupByPlans.get(relOp)) {
-                 LogicalPlanCloneHelper lpCloneHelper = new LogicalPlanCloneHelper(plan);
-                 cogroupClone.mGroupByPlans.put(relOp, lpCloneHelper.getClonedPlan());
-             }
-         }
-
-         return cogroupClone;
-     }
-
-     /**
-      * Computes (and caches) how output columns map to input columns.
-      *
-      * <p>The column with the alias 'group' can be mapped in several ways:
-      * 1. group A by $0: the mapping is 0 -> (0, 0).
-      * 2. group A by ($0, $1): no direct mapping; 'group' is an added column.
-      * 3. cogroup A by $0, B by $0: the mapping is 0 -> ((0, 0), (1, 0)).
-      * 4. cogroup A by ($0, $1), B by ($0, $1): no direct mapping; 'group' is an added column.
-      * For anything other than a simple project, 'group' is an added column.
-      * The bag columns 1..n are always added columns.
-      *
-      * @return the projection map, or null when the schema is unavailable
-      */
-     @Override
-     public ProjectionMap getProjectionMap() {
-
-         if (mIsProjectionMapComputed) return mProjectionMap;
-         mIsProjectionMapComputed = true;
-
-         Schema outputSchema;
-
-         try {
-             outputSchema = getSchema();
-         } catch (FrontendException fee) {
-             mProjectionMap = null;
-             return mProjectionMap;
-         }
-
-         if (outputSchema == null) {
-             mProjectionMap = null;
-             return mProjectionMap;
-         }
-
-         List<LogicalOperator> predecessors = mPlan.getPredecessors(this);
-         if (predecessors == null) {
-             mProjectionMap = null;
-             return mProjectionMap;
-         }
-
-         MultiMap<LogicalOperator, LogicalPlan> groupByPlans = getGroupByPlans();
-
-         boolean groupByAdded = false;
-         MultiMap<Integer, ProjectionMap.Column> mapFields = new MultiMap<Integer, ProjectionMap.Column>();
-         List<Pair<Integer, Integer>> removedFields = new ArrayList<Pair<Integer, Integer>>();
-
-         for (int inputNum = 0; (inputNum < predecessors.size()) && (!groupByAdded); ++inputNum) {
-             LogicalOperator predecessor = predecessors.get(inputNum);
-
-             List<LogicalPlan> predecessorPlans = groupByPlans.get(predecessor);
-
-             int inputColumn = -1;
-             for (LogicalPlan predecessorPlan : predecessorPlans) {
-                 List<LogicalOperator> leaves = predecessorPlan.getLeaves();
-                 if (leaves == null || leaves.size() > 1) {
-                     groupByAdded = true;
-                     break;
-                 }
-
-                 if (leaves.get(0) instanceof LOProject) {
-                     // find out if this project is a chain of projects
-                     Pair<LOProject, LOCast> pair = LogicalPlan.chainOfProjects(predecessorPlan);
-                     if (pair != null) {
-                         LOProject topProject = pair.first;
-                         if (topProject != null) {
-                             inputColumn = topProject.getCol();
-                             LOCast cast = pair.second;
-                             if (cast != null) {
-                                 mapFields.put(0,
-                                         new ProjectionMap.Column(
-                                                 new Pair<Integer, Integer>(inputNum, inputColumn), true, cast.getType()
-                                         )
-                                 );
-                             } else {
-                                 mapFields.put(0, new ProjectionMap.Column(new Pair<Integer, Integer>(inputNum, inputColumn)));
-                             }
-                         }
-                     }
-                 } else {
-                     groupByAdded = true;
-                 }
-             }
-
-             Schema inputSchema;
-             try {
-                 inputSchema = predecessor.getSchema();
-             } catch (FrontendException fee) {
-                 mProjectionMap = null;
-                 return mProjectionMap;
-             }
-
-             if (inputSchema != null) {
-                 for (int column = 0; column < inputSchema.size(); ++column) {
-                     if (!groupByAdded && inputColumn != column) {
-                         removedFields.add(new Pair<Integer, Integer>(inputNum, column));
-                     }
-                 }
-             }
-
-         }
-
-         List<Integer> addedFields = new ArrayList<Integer>();
-
-         if (groupByAdded) {
-             addedFields.add(0); // for the column 'group'
-             mapFields = null; // since 'group' is an added column there is no mapping
-         }
-
-         // the columns 1 through n - 1 are generated by cogroup
-         for (int i = 0; i < groupByPlans.keySet().size(); ++i) {
-             addedFields.add(i + 1);
-         }
-
-         if (removedFields.size() == 0) {
-             removedFields = null;
-         }
-
-         mProjectionMap = new ProjectionMap(mapFields, removedFields, addedFields);
-         return mProjectionMap;
-     }
-
-     /**
-      * For each input, reports the input columns referenced by top-level projects
-      * in its group by plans. An input grouped by star gets {@code RequiredFields(true)};
-      * an input with no referenced columns gets {@code RequiredFields(false, true)}.
-      *
-      * @return one entry per input, a single-null list if a plan cannot be
-      *         visited, or null when there are no predecessors
-      */
-     @Override
-     public List<RequiredFields> getRequiredFields() {
-         List<LogicalOperator> predecessors = mPlan.getPredecessors(this);
-
-         if (predecessors == null) {
-             return null;
-         }
-
-         List<RequiredFields> requiredFields = new ArrayList<RequiredFields>();
-
-         for (int inputNum = 0; inputNum < predecessors.size(); ++inputNum) {
-             Set<Pair<Integer, Integer>> fields = new HashSet<Pair<Integer, Integer>>();
-             Set<LOProject> projectSet = new HashSet<LOProject>();
-             boolean groupByStar = false;
-
-             for (LogicalPlan plan : getGroupByPlans().get(predecessors.get(inputNum))) {
-                 TopLevelProjectFinder projectFinder = new TopLevelProjectFinder(plan);
-                 try {
-                     projectFinder.visit();
-                 } catch (VisitorException ve) {
-                     requiredFields.clear();
-                     requiredFields.add(null);
-                     return requiredFields;
-                 }
-                 projectSet.addAll(projectFinder.getProjectSet());
-                 if (projectFinder.getProjectStarSet() != null) {
-                     groupByStar = true;
-                 }
-             }
-
-             if (groupByStar) {
-                 requiredFields.add(new RequiredFields(true));
-             } else {
-                 for (LOProject project : projectSet) {
-                     for (int inputColumn : project.getProjection()) {
-                         fields.add(new Pair<Integer, Integer>(inputNum, inputColumn));
-                     }
-                 }
-
-                 if (fields.size() == 0) {
-                     requiredFields.add(new RequiredFields(false, true));
-                 } else {
-                     requiredFields.add(new RequiredFields(new ArrayList<Pair<Integer, Integer>>(fields)));
-                 }
-             }
-         }
-
-         return (requiredFields.size() == 0 ? null : requiredFields);
-     }
-
-     /**
-      * Rewires this operator and re-keys the group by plans from the old
-      * predecessor to the new one, fixing up projects inside those plans.
-      *
-      * @see org.apache.pig.impl.plan.Operator#rewire(org.apache.pig.impl.plan.Operator, int, org.apache.pig.impl.plan.Operator, boolean)
-      * @throws PlanException if newPred is null or projects cannot be fixed up
-      */
-     @Override
-     public void rewire(Operator<LOVisitor> oldPred, int oldPredIndex, Operator<LOVisitor> newPred, boolean useOldPred) throws PlanException {
-         // Validate before delegating so that a null replacement is rejected
-         // without first rewiring the base operator.
-         if (newPred == null) {
-             int errCode = 1097;
-             String msg = "Replacement node cannot be null.";
-             throw new PlanException(msg, errCode, PigException.INPUT);
-         }
-         super.rewire(oldPred, oldPredIndex, newPred, useOldPred);
-         LogicalOperator previous = (LogicalOperator) oldPred;
-         LogicalOperator current = (LogicalOperator) newPred;
-         // iterate over a snapshot since the key set is modified below
-         Set<LogicalOperator> cogroupInputs = new HashSet<LogicalOperator>(mGroupByPlans.keySet());
-         for (LogicalOperator input : cogroupInputs) {
-             if (input.equals(previous)) {
-                 // replace the references to the key(i.e., previous) in the values with current
-                 for (LogicalPlan plan : mGroupByPlans.get(input)) {
-                     try {
-                         ProjectFixerUpper projectFixer = new ProjectFixerUpper(
-                                 plan, previous, oldPredIndex, current, useOldPred, this);
-                         projectFixer.visit();
-                     } catch (VisitorException ve) {
-                         int errCode = 2144;
-                         String msg = "Problem while fixing project inputs during rewiring.";
-                         throw new PlanException(msg, errCode, PigException.BUG, ve);
-                     }
-                 }
-                 // remove the key and the values
-                 List<LogicalPlan> plans = mGroupByPlans.get(previous);
-                 mGroupByPlans.removeKey(previous);
-
-                 // reinsert new key and values
-                 mGroupByPlans.put(current, plans);
-             }
-         }
-     }
-
-     /**
-      * Reports which input columns an output column depends on. Column 0
-      * ('group') depends on the group by columns; column k (k &gt;= 1) depends on
-      * all of input k-1.
-      *
-      * @return one entry per input (null for unrelated inputs), or null for an
-      *         output other than 0, an out-of-range column, or no predecessors
-      */
-     @Override
-     public List<RequiredFields> getRelevantInputs(int output, int column) throws FrontendException {
-         if (!mIsSchemaComputed)
-             getSchema();
-
-         if (output != 0)
-             return null;
-
-         List<LogicalOperator> predecessors = mPlan.getPredecessors(this);
-         if (predecessors == null) {
-             return null;
-         }
-
-         // valid output columns are 0 ('group') through predecessors.size() (last bag)
-         if (column < 0 || column > predecessors.size())
-             return null;
-
-         if (column == 0) {
-             return getRequiredFields();
-         }
-
-         List<RequiredFields> result = new ArrayList<RequiredFields>();
-         for (int inputNum = 0; inputNum < predecessors.size(); ++inputNum) {
-             if (inputNum == column - 1) {
-                 result.add(new RequiredFields(true));
-             } else {
-                 result.add(null);
-             }
-         }
-         return result;
-     }
-
-     /**
-      * Prunes the given (input, column) pairs from the group by plans of the
-      * corresponding inputs, then delegates to the superclass.
-      *
-      * @return false if no schema is available, true otherwise
-      * @throws FrontendException if there are no predecessors or a pair refers
-      *         to a non-existent input or a negative column
-      */
-     @Override
-     public boolean pruneColumns(List<Pair<Integer, Integer>> columns)
-             throws FrontendException {
-         if (!mIsSchemaComputed)
-             getSchema();
-         if (mSchema == null) {
-             log.warn("Cannot prune columns in cogroup, no schema information found");
-             return false;
-         }
-
-         List<LogicalOperator> predecessors = mPlan.getPredecessors(this);
-
-         if (predecessors == null) {
-             int errCode = 2190;
-             throw new FrontendException("Cannot find predecessors for cogroup",
-                     errCode, PigException.BUG);
-         }
-
-         for (int i = columns.size() - 1; i >= 0; i--) {
-             Pair<Integer, Integer> column = columns.get(i);
-             // input indices are 0-based; the old '>' check let column.first == size
-             // through and failed later in predecessors.get
-             if (column.first < 0 || column.first >= predecessors.size()) {
-                 int errCode = 2191;
-                 throw new FrontendException("No input " + column.first
-                         + " to prune in cocogroup", errCode, PigException.BUG);
-             }
-             if (column.second < 0) {
-                 int errCode = 2192;
-                 throw new FrontendException("column to prune does not exist", errCode, PigException.BUG);
-             }
-             for (LogicalPlan plan : getGroupByPlans().get(
-                     predecessors.get(column.first))) {
-                 pruneColumnInPlan(plan, column.second);
-             }
-         }
-         super.pruneColumns(columns);
-         return true;
-     }
- }
Modified: pig/branches/branch-0.9/src/org/apache/pig/impl/logicalLayer/LOConst.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.9/src/org/apache/pig/impl/logicalLayer/LOConst.java?rev=1100420&r1=1100419&r2=1100420&view=diff
==============================================================================
--- pig/branches/branch-0.9/src/org/apache/pig/impl/logicalLayer/LOConst.java (original)
+++ pig/branches/branch-0.9/src/org/apache/pig/impl/logicalLayer/LOConst.java Sat May 7 00:15:40 2011
@@ -1,107 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.pig.impl.logicalLayer;
-
-import org.apache.pig.PigException;
-import org.apache.pig.impl.plan.OperatorKey;
-import org.apache.pig.impl.plan.PlanVisitor;
-import org.apache.pig.impl.plan.VisitorException;
-import org.apache.pig.impl.logicalLayer.schema.Schema;
-import org.apache.pig.data.DataType;
-
-public class LOConst extends ExpressionOperator {
-
- // Cast has an expression that has to be converted to a specified type
-
- private static final long serialVersionUID = 2L;
- private Object mValue;
-
- /**
- *
- * @param plan
- * Logical plan this operator is a part of.
- * @param k
- * Operator key to assign to this node.
- * @param value
- * the value of the constant
- */
- public LOConst(LogicalPlan plan, OperatorKey k, Object value) {
- super(plan, k);
- mValue = value;
- }// End Constructor LOConst
-
- public Object getValue() {
- return mValue;
- }
-
- @Override
- public void visit(LOVisitor v) throws VisitorException {
- v.visit(this);
- }
-
- @Override
- public Schema getSchema() {
- return mSchema;
- }
-
- @Override
- public Schema.FieldSchema getFieldSchema() throws FrontendException {
- if(!mIsFieldSchemaComputed) {
- try {
- mFieldSchema = DataType.determineFieldSchema(mValue);
- mIsFieldSchemaComputed = true;
- } catch (Exception e) {
- mFieldSchema = null;
- mIsFieldSchemaComputed = false;
- int errCode = 1015;
- String msg = "Error determining fieldschema of constant: " + this;
- throw new FrontendException(msg, errCode, PigException.INPUT, false, null, e);
- }
- }
- return mFieldSchema;
- }
-
- @Override
- public String name() {
- return "Const " + mKey.scope + "-" + mKey.id + "( " + mValue + " )";
- }
-
- @Override
- public boolean supportsMultipleInputs() {
- return false;
- }
-
- // This allows us to assign a constant to an alias
- @Override
- public boolean supportsMultipleOutputs() {
- return true;
- }
-
- /**
- * @see org.apache.pig.impl.logicalLayer.ExpressionOperator#clone()
- * Do not use the clone method directly. Operators are cloned when logical plans
- * are cloned using {@link LogicalPlanCloner}
- */
- @Override
- protected Object clone() throws CloneNotSupportedException {
- LOConst clone = (LOConst)super.clone();
- return clone;
- }
-
-}