You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by xu...@apache.org on 2011/05/07 02:15:44 UTC

svn commit: r1100420 [6/19] - in /pig/branches/branch-0.9: ./ src/ src/org/apache/pig/ src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/ src/org/apache/pig/impl/logicalLayer/ src/org/apache/pig/impl/logicalLayer/optimizer/ src/org/apache...

Modified: pig/branches/branch-0.9/src/org/apache/pig/impl/logicalLayer/LONot.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.9/src/org/apache/pig/impl/logicalLayer/LONot.java?rev=1100420&r1=1100419&r2=1100420&view=diff
==============================================================================
--- pig/branches/branch-0.9/src/org/apache/pig/impl/logicalLayer/LONot.java (original)
+++ pig/branches/branch-0.9/src/org/apache/pig/impl/logicalLayer/LONot.java Sat May  7 00:15:40 2011
@@ -1,69 +0,0 @@
-/* 
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.pig.impl.logicalLayer;
-
-import org.apache.pig.impl.plan.OperatorKey;
-import org.apache.pig.impl.plan.PlanVisitor;
-import org.apache.pig.impl.plan.VisitorException;
-import org.apache.pig.impl.logicalLayer.schema.Schema;
-import org.apache.pig.data.DataType;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
-public class LONot extends UnaryExpressionOperator {
-
-    private static final long serialVersionUID = 2L;
-    private static Log log = LogFactory.getLog(LONot.class);
-
-    /**
-     * Constructs a logical NOT expression operator.
-     * @param plan
-     *            Logical plan this operator is a part of.
-     * @param k
-     *            Operator key to assign to this node.
-     */
-    public LONot(LogicalPlan plan, OperatorKey k) {
-        super(plan, k);
-    }
-    
-    @Override
-    public Schema getSchema() {
-        return mSchema;
-    }
-
-    @Override
-    public Schema.FieldSchema getFieldSchema() throws FrontendException {
-        if(!mIsFieldSchemaComputed) {
-            mFieldSchema = new Schema.FieldSchema(null, DataType.BOOLEAN);
-            mFieldSchema.setParent(getOperand().getFieldSchema().canonicalName, getOperand());
-            mIsFieldSchemaComputed = true;
-        }
-        return mFieldSchema;
-    }
-
-    @Override
-    public void visit(LOVisitor v) throws VisitorException {
-        v.visit(this);
-    }
-
-    @Override
-    public String name() {
-        return "Not " + mKey.scope + "-" + mKey.id;
-    }
-}

Modified: pig/branches/branch-0.9/src/org/apache/pig/impl/logicalLayer/LONotEqual.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.9/src/org/apache/pig/impl/logicalLayer/LONotEqual.java?rev=1100420&r1=1100419&r2=1100420&view=diff
==============================================================================
--- pig/branches/branch-0.9/src/org/apache/pig/impl/logicalLayer/LONotEqual.java (original)
+++ pig/branches/branch-0.9/src/org/apache/pig/impl/logicalLayer/LONotEqual.java Sat May  7 00:15:40 2011
@@ -1,70 +0,0 @@
-/* 
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.pig.impl.logicalLayer;
-
-import org.apache.pig.impl.plan.OperatorKey;
-import org.apache.pig.impl.plan.PlanVisitor;
-import org.apache.pig.impl.plan.VisitorException;
-import org.apache.pig.impl.logicalLayer.schema.Schema;
-import org.apache.pig.data.DataType;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
-public class LONotEqual extends BinaryExpressionOperator {
-
-    private static final long serialVersionUID = 2L;
-    private static Log log = LogFactory.getLog(LONotEqual.class);
-
-    /**
-     * Constructs a logical not-equal comparison operator.
-     * @param plan
-     *            Logical plan this operator is a part of.
-     * @param k
-     *            Operator key to assign to this node.
-     */
-    public LONotEqual(LogicalPlan plan, OperatorKey k) {
-        super(plan, k);
-    }
-
-    @Override
-    public Schema getSchema() {
-        return mSchema;
-    }
-
-    @Override
-    public Schema.FieldSchema getFieldSchema() throws FrontendException {
-        if(!mIsFieldSchemaComputed) {
-            mFieldSchema = new Schema.FieldSchema(null, DataType.BOOLEAN);
-            mFieldSchema.setParent(getLhsOperand().getFieldSchema().canonicalName, getLhsOperand());
-            mFieldSchema.setParent(getRhsOperand().getFieldSchema().canonicalName, getRhsOperand());
-            mIsFieldSchemaComputed = true;
-        }
-        return mFieldSchema;
-    }
-
-    @Override
-    public void visit(LOVisitor v) throws VisitorException {
-        v.visit(this);
-    }
-
-    @Override
-    public String name() {
-        return "NotEqual " + mKey.scope + "-" + mKey.id;
-    }
-}

Modified: pig/branches/branch-0.9/src/org/apache/pig/impl/logicalLayer/LOOr.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.9/src/org/apache/pig/impl/logicalLayer/LOOr.java?rev=1100420&r1=1100419&r2=1100420&view=diff
==============================================================================
--- pig/branches/branch-0.9/src/org/apache/pig/impl/logicalLayer/LOOr.java (original)
+++ pig/branches/branch-0.9/src/org/apache/pig/impl/logicalLayer/LOOr.java Sat May  7 00:15:40 2011
@@ -1,70 +0,0 @@
-/* 
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.pig.impl.logicalLayer;
-
-import org.apache.pig.impl.plan.OperatorKey;
-import org.apache.pig.impl.plan.PlanVisitor;
-import org.apache.pig.impl.plan.VisitorException;
-import org.apache.pig.impl.logicalLayer.schema.Schema;
-import org.apache.pig.data.DataType;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
-public class LOOr extends BinaryExpressionOperator {
-
-    private static final long serialVersionUID = 2L;
-    private static Log log = LogFactory.getLog(LOOr.class);
-
-    /**
-     * Constructs a logical OR expression operator.
-     * @param plan
-     *            Logical plan this operator is a part of.
-     * @param k
-     *            Operator key to assign to this node.
-     */
-    public LOOr(LogicalPlan plan, OperatorKey k) {
-        super(plan, k);
-    }
-    
-    @Override
-    public Schema getSchema() {
-        return mSchema;
-    }
-
-    @Override
-    public Schema.FieldSchema getFieldSchema() throws FrontendException {
-        if(!mIsFieldSchemaComputed) {
-            mFieldSchema = new Schema.FieldSchema(null, DataType.BOOLEAN);
-            mFieldSchema.setParent(getLhsOperand().getFieldSchema().canonicalName, getLhsOperand());
-            mFieldSchema.setParent(getRhsOperand().getFieldSchema().canonicalName, getRhsOperand());
-            mIsFieldSchemaComputed = true;
-        }
-        return mFieldSchema;
-    }
-
-    @Override
-    public void visit(LOVisitor v) throws VisitorException {
-        v.visit(this);
-    }
-
-    @Override
-    public String name() {
-        return "Or " + mKey.scope + "-" + mKey.id;
-    }
-}

Modified: pig/branches/branch-0.9/src/org/apache/pig/impl/logicalLayer/LOPrinter.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.9/src/org/apache/pig/impl/logicalLayer/LOPrinter.java?rev=1100420&r1=1100419&r2=1100420&view=diff
==============================================================================
--- pig/branches/branch-0.9/src/org/apache/pig/impl/logicalLayer/LOPrinter.java (original)
+++ pig/branches/branch-0.9/src/org/apache/pig/impl/logicalLayer/LOPrinter.java Sat May  7 00:15:40 2011
@@ -1,217 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.pig.impl.logicalLayer;
-
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-import java.io.PrintStream;
-import java.io.OutputStream;
-import java.io.IOException;
-import java.io.ByteArrayOutputStream;
-import java.util.Collection;
-import java.util.Collections;
-
-import org.apache.pig.data.DataType;
-import org.apache.pig.impl.logicalLayer.schema.Schema;
-import org.apache.pig.impl.plan.DepthFirstWalker;
-import org.apache.pig.impl.plan.DependencyOrderWalker;
-import org.apache.pig.impl.util.MultiMap;
-import org.apache.pig.impl.plan.VisitorException;
-
-/**
- * A visitor mechanism printing out the logical plan.
- */
-public class LOPrinter extends LOVisitor {
-
-    private PrintStream mStream = null;
-    private String TAB1 = "    ";
-    private String TABMore = "|   ";
-    private String LSep = "|\n|---";
-    private String USep = "|   |\n|   ";
-    private int levelCntr = -1;
-    private boolean isVerbose = true;
-
-    /**
-     * @param ps PrintStream to output plan information to
-     * @param plan Logical plan to print
-     */
-    public LOPrinter(PrintStream ps, LogicalPlan plan) {
-        //super(plan, new DependencyOrderWalker(plan));
-        super(plan, new DepthFirstWalker<LogicalOperator, LogicalPlan>(plan));
-        mStream = ps;
-    }
-
-    @Override
-    public void visit() throws VisitorException {
-        try {
-            mStream.write(depthFirstLP().getBytes());
-        } catch (IOException e) {
-            throw new VisitorException(e);
-        }
-    }
-
-    public void setVerbose(boolean verbose) {
-        isVerbose = verbose;
-    }
-
-    public void print(OutputStream printer) throws VisitorException, IOException {
-        printer.write(depthFirstLP().getBytes());
-    }
-
-
-    protected String depthFirstLP() throws VisitorException, IOException {
-        StringBuilder sb = new StringBuilder();
-        List<LogicalOperator> leaves = mPlan.getLeaves();
-        Collections.sort(leaves);
-        for (LogicalOperator leaf : leaves) {
-            sb.append(depthFirst(leaf));
-            sb.append("\n");
-        }
-        //sb.delete(sb.length() - "\n".length(), sb.length());
-        //sb.delete(sb.length() - "\n".length(), sb.length());
-        return sb.toString();
-    }
-    
-    private String planString(LogicalPlan lp) throws VisitorException, IOException {
-        StringBuilder sb = new StringBuilder();
-        ByteArrayOutputStream baos = new ByteArrayOutputStream();
-        if(lp!=null)
-            lp.explain(baos, mStream);
-        else
-            return "";
-        sb.append(USep);
-        sb.append(shiftStringByTabs(baos.toString(), 2));
-        return sb.toString();
-    }
-    
-    private String planString(
-            List<LogicalPlan> logicalPlanList) throws VisitorException, IOException {
-        StringBuilder sb = new StringBuilder();
-        if(logicalPlanList!=null)
-            for (LogicalPlan lp : logicalPlanList) {
-                sb.append(planString(lp));
-            }
-        return sb.toString();
-    }
-
-    private String depthFirst(LogicalOperator node) throws VisitorException, IOException {
-        StringBuilder sb = new StringBuilder(node.name());
-        if(node instanceof ExpressionOperator) {
-            sb.append(" FieldSchema: ");
-            try {
-                sb.append(((ExpressionOperator)node).getFieldSchema());
-            } catch (Exception e) {
-                sb.append("Caught Exception: " + e.getMessage());
-            }
-        } else {
-            sb.append(" Schema: ");
-            try {
-                sb.append(node.getSchema());
-            } catch (Exception e) {
-                sb.append("Caught exception: " + e.getMessage());
-            }
-        }
-        sb.append(" Type: " + DataType.findTypeName(node.getType()));
-        sb.append("\n");
-
-        if (isVerbose) {
-            if(node instanceof LOFilter){
-                sb.append(planString(((LOFilter)node).getComparisonPlan()));
-            }
-            else if(node instanceof LOForEach){
-                sb.append(planString(((LOForEach)node).getForEachPlans()));        
-            }
-            else if(node instanceof LOGenerate){
-                sb.append(planString(((LOGenerate)node).getGeneratePlans())); 
-                
-            }
-            else if(node instanceof LOCogroup){
-                MultiMap<LogicalOperator, LogicalPlan> plans = ((LOCogroup)node).getGroupByPlans();
-                for (LogicalOperator lo : plans.keySet()) {
-                    // Visit the associated plans
-                    for (LogicalPlan plan : plans.get(lo)) {
-                        sb.append(planString(plan));
-                    }
-                }
-            }
-            else if(node instanceof LOJoin){
-                MultiMap<LogicalOperator, LogicalPlan> plans = ((LOJoin)node).getJoinPlans();
-                for (LogicalOperator lo : plans.keySet()) {
-                    // Visit the associated plans
-                    for (LogicalPlan plan : plans.get(lo)) {
-                        sb.append(planString(plan));
-                    }
-                }
-            }
-            else if(node instanceof LOSort){
-                sb.append(planString(((LOSort)node).getSortColPlans())); 
-            }
-            else if(node instanceof LOSplitOutput){
-                sb.append(planString(((LOSplitOutput)node).getConditionPlan()));
-            }
-            else if (node instanceof LOProject) {
-                sb.append("Input: ");
-                sb.append(((LOProject)node).getExpression().name());
-            }
-        }
-        
-        List<LogicalOperator> originalPredecessors =  mPlan.getPredecessors(node);
-        if (originalPredecessors == null)
-            return sb.toString();
-        
-        List<LogicalOperator> predecessors =  new ArrayList<LogicalOperator>(originalPredecessors);
-        
-        Collections.sort(predecessors);
-        int i = 0;
-        for (LogicalOperator pred : predecessors) {
-            i++;
-            String DFStr = depthFirst(pred);
-            if (DFStr != null) {
-                sb.append(LSep);
-                if (i < predecessors.size())
-                    sb.append(shiftStringByTabs(DFStr, 2));
-                else
-                    sb.append(shiftStringByTabs(DFStr, 1));
-            }
-        }
-        return sb.toString();
-    }
-
-    private String shiftStringByTabs(String DFStr, int TabType) {
-        StringBuilder sb = new StringBuilder();
-        String[] spl = DFStr.split("\n");
-
-        String tab = (TabType == 1) ? TAB1 : TABMore;
-
-        sb.append(spl[0] + "\n");
-        for (int i = 1; i < spl.length; i++) {
-            sb.append(tab);
-            sb.append(spl[i]);
-            sb.append("\n");
-        }
-        return sb.toString();
-    }
-
-    private void dispTabs() {
-        for (int i = 0; i < levelCntr; i++)
-            System.out.print(TAB1);
-    }
-}
-
-        

Modified: pig/branches/branch-0.9/src/org/apache/pig/impl/logicalLayer/LOProject.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.9/src/org/apache/pig/impl/logicalLayer/LOProject.java?rev=1100420&r1=1100419&r2=1100420&view=diff
==============================================================================
--- pig/branches/branch-0.9/src/org/apache/pig/impl/logicalLayer/LOProject.java (original)
+++ pig/branches/branch-0.9/src/org/apache/pig/impl/logicalLayer/LOProject.java Sat May  7 00:15:40 2011
@@ -1,473 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.pig.impl.logicalLayer;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Iterator;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.pig.PigException;
-import org.apache.pig.data.DataType;
-import org.apache.pig.impl.plan.OperatorKey;
-import org.apache.pig.impl.plan.PlanVisitor;
-import org.apache.pig.impl.plan.VisitorException;
-import org.apache.pig.impl.logicalLayer.parser.ParseException;
-import org.apache.pig.impl.logicalLayer.schema.Schema;
-import org.apache.pig.impl.logicalLayer.schema.Schema.FieldSchema;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
-/**
- * LOProject is designed like a singly linked list; A few examples will
- * illustrate the point about the linked list nature of the design;
- * a = load 'input1' as (name, age);
- * b = group a by name;
- * foreach b generate a, a.name;
- * The project operator occurs in two places in the above script:
- * generate a(here) and a.name(here)
- * In the first occurrence, we are trying to project the elements of
- * the bag a; In order to retrieve the bag, we need to project the
- * second column ($1) or column number 1 (using the zero-based index)
- * from the input (the relation or bag b)
- * In the second occurrence, we are trying to project the first column
- * ($0) or column number 0 from the bag a which in turn is the column
- * number 1 in the relation b; As you can see, the nested structure or
- * the singly linked list nature is clearly visible;
- * Given that it's a singly linked list, the null pointer or the sentinel
- * is marked explicitly using the boolean variable mSentinel; The sentinel
- * is marked true only when the input is a relational operator; This occurs
- * when we create the innermost operator
- */
-public class LOProject extends ExpressionOperator {
-    private static final long serialVersionUID = 2L;
-
-    /**
-     * The expression and the column to be projected.
-     */
-    private LogicalOperator mExp;
-    private List<Integer> mProjection;
-    private boolean mIsStar = false;
-    private static Log log = LogFactory.getLog(LOProject.class);
-    private boolean mSentinel;
-    private boolean mOverloaded = false;
-
-    private boolean sendEmptyBagOnEOP = false;
-
-    /**
-     * 
-     * @param plan
-     *            Logical plan this operator is a part of.
-     * @param key
-     *            Operator key to assign to this node.
-     * @param exp
-     *            the expression which might contain the column to project
-     * @param projection
-     *            the list of columns to project
-     */
-    public LOProject(LogicalPlan plan, OperatorKey key, LogicalOperator exp,
-            List<Integer> projection) {
-        super(plan, key);
-        mExp = exp;
-        mProjection = projection;
-        if(mExp instanceof ExpressionOperator) {
-            mSentinel = false;
-        } else {
-            mSentinel = true;
-        }
-    }
-
-    /**
-     * 
-     * @param plan
-     *            Logical plan this operator is a part of.
-     * @param key
-     *            Operator key to assign to this node.
-     * @param exp
-     *            the expression which might contain the column to project
-     * @param projection
-     *            the column to project
-     */
-    public LOProject(LogicalPlan plan, OperatorKey key, LogicalOperator exp,
-            Integer projection) {
-        super(plan, key);
-        mExp = exp;
-        mProjection = new ArrayList<Integer>(1);
-        mProjection.add(projection);
-        if(mExp instanceof ExpressionOperator) {
-            mSentinel = false;
-        } else {
-            mSentinel = true;
-        }
-    }
-
-    public LogicalOperator getExpression() {
-        return mExp;
-    }
-
-    public void setExpression(LogicalOperator exp) {
-        mExp = exp;
-    }
-
-    public boolean isStar() { 
-        return mIsStar;
-    }
-
-    public List<Integer> getProjection() {
-        return mProjection;
-    }
-
-    public void setProjection(List<Integer> proj) {
-        mProjection = proj;
-    }
-
-    public int getCol() {
-        if (mProjection.size() != 1)
-            
-            throw new RuntimeException(
-                    "Internal error: improper use of getCol in "
-                            + LOProject.class.getName());
-        return mProjection.get(0);
-
-    }
-
-    public void setStar(boolean b) {
-        mIsStar = b;
-    }
-
-    public boolean getSentinel() {
-        return mSentinel;
-    }
-
-    public void setSentinel(boolean b) {
-        mSentinel = b;
-    }
-
-    public boolean getOverloaded() {
-        return mOverloaded;
-    }
-
-    public void setOverloaded(boolean b) {
-        mOverloaded = b;
-    }
-
-    @Override
-    public String name() {
-        return "Project " + mKey.scope + "-" + mKey.id + " Projections: " + (mIsStar? " [*] ": mProjection) + " Overloaded: " + mOverloaded;
-    }
-
-    @Override
-    public boolean supportsMultipleInputs() {
-        return false;
-    }
-
-    @SuppressWarnings("deprecation")
-    @Override
-    public Schema.FieldSchema getFieldSchema() throws FrontendException {
-        log.debug("Inside getFieldSchemas");
-        log.debug("Number of columns: " + mProjection.size());
-        for (int i : mProjection) {
-            log.debug("Column: " + i);
-        }
-
-        if (mExp == null){
-            String msg = "The input for a projection operator cannot be null";
-            int errCode = 2998;
-            throw new FrontendException(msg, errCode, PigException.BUG, false, null);
-        }
-
-        LogicalOperator expressionOperator = mExp;
-        log.debug("expressionOperator = " + expressionOperator);
-        log.debug("mIsStar: " + mIsStar);
-
-        if (!mIsFieldSchemaComputed) {
-
-            if (mIsStar) {
-                log.debug("mIsStar is true");
-                try {
-                    if(!mSentinel) {
-                        //we have an expression operator and hence a list of field schemas
-                        Schema.FieldSchema fs = ((ExpressionOperator)expressionOperator).getFieldSchema();
-                         mFieldSchema = Schema.FieldSchema.copyAndLink(fs, expressionOperator);
-                    } else {
-                        //we have a relational operator as input and hence a schema
-                        log.debug("expression operator alias: " + expressionOperator.getAlias());
-                        log.debug("expression operator schema: " + expressionOperator.getSchema());
-                        log.debug("expression operator type: " + expressionOperator.getType());
-                        //TODO
-                        //the type of the operator will be unknown. when type checking is in place
-                        //add the type of the operator as a parameter to the fieldschema creation
-                        mFieldSchema = new Schema.FieldSchema(expressionOperator.getAlias(), expressionOperator.getSchema(), DataType.TUPLE);
-                        mFieldSchema.setParent(null, expressionOperator);
-                        //mFieldSchema = new Schema.FieldSchema(expressionOperator.getAlias(), expressionOperator.getSchema());
-                    }
-                    mIsFieldSchemaComputed = true;
-                } catch (FrontendException fee) {
-                    mFieldSchema = null;
-                    mIsFieldSchemaComputed = false;
-                    throw fee;
-                }
-            } else {
-                //its n list of columns to project including a single column
-                List<Schema.FieldSchema> fss = new ArrayList<Schema.FieldSchema>(mProjection.size());
-                //try {
-                        log.debug("expressionOperator is not null");
-                        if(mProjection.size() == 1) {
-                            //if there is only one element then extract and return the field schema
-                            log.debug("Only one element");
-                            if(!mSentinel) {
-                                log.debug("Input is an expression operator");
-                                Schema.FieldSchema expOpFs = ((ExpressionOperator)expressionOperator).getFieldSchema();
-                                if(null != expOpFs) {
-                                    Schema s = expOpFs.schema;
-                                    if(null != s) {
-                                        Schema.FieldSchema fs;
-                                        if(s.isTwoLevelAccessRequired()) {
-                                            // this is the case where the schema is that of
-                                            // a bag which has just one tuple fieldschema which
-                                            // in turn has a list of fieldschemas. So the field
-                                            // schema we are trying to construct would be of the
-                                            // item we are trying to project inside the tuple 
-                                            // fieldschema - because currently when we say b.i where
-                                            // b is a bag, we are trying to access the item i
-                                            // present in the tuple in the bag.
-                                            
-                                            // check that indeed we only have one field schema
-                                            // which is that of a tuple
-                                            if(s.getFields().size() != 1) {
-                                                int errCode = 1008;
-                                                String msg = "Expected a bag schema with a single " +
-                                                "element of type "+ DataType.findTypeName(DataType.TUPLE) +
-                                                " but got a bag schema with multiple elements.";
-                                                throw new FrontendException(msg, errCode, PigException.INPUT, false, null);
-                                            }
-                                            Schema.FieldSchema tupleFS = s.getField(0);
-                                            if(tupleFS.type != DataType.TUPLE) {
-                                                int errCode = 1009;
-                                                String msg = "Expected a bag schema with a single " +
-                                                "element of type "+ DataType.findTypeName(DataType.TUPLE) +
-                                                " but got an element of type " +
-                                                DataType.findTypeName(tupleFS.type);
-                                                throw new FrontendException(msg, errCode, PigException.INPUT, false, null);
-                                            }
-                                            fs = tupleFS.schema.getField(mProjection.get(0));
-                                        } else {
-                                            // normal single level access
-                                            fs = s.getField(mProjection.get(0));
-                                        }
-                                        mFieldSchema = FieldSchema.copyAndLink( fs, expressionOperator );
-                                    } else {
-                                        mFieldSchema = new Schema.FieldSchema(null, DataType.BYTEARRAY);
-                                        mFieldSchema.setParent(expOpFs.canonicalName, expressionOperator);
-                                    }
-                                } else {
-                                    mFieldSchema = new Schema.FieldSchema(null, DataType.BYTEARRAY);
-                                    mFieldSchema.setParent(null, expressionOperator);
-                                }
-                            } else {
-                                log.debug("Input is a logical operator");
-                                Schema s = expressionOperator.getSchema();
-                                log.debug("s: " + s);
-                                if(null != s) {
-                                    Schema.FieldSchema fs = s.getField(mProjection.get(0));
-                                    mFieldSchema = FieldSchema.copyAndLink( fs, expressionOperator );
-                                    log.debug("mFieldSchema alias: " + mFieldSchema.alias);
-                                    log.debug("mFieldSchema schema: " + mFieldSchema.schema);
-                                } else {
-                                    mFieldSchema = new Schema.FieldSchema(null, DataType.BYTEARRAY);
-                                    mFieldSchema.setParent(null, expressionOperator);
-                                }
-                                mType = mFieldSchema.type ;
-                            }
-                            mIsFieldSchemaComputed = true;
-                            return mFieldSchema;
-                        }
-                        
-                        for (int colNum : mProjection) {
-                            log.debug("Col: " + colNum);
-                            Schema.FieldSchema fs;
-                            if(!mSentinel) {
-                                Schema.FieldSchema expOpFs = ((ExpressionOperator)expressionOperator).getFieldSchema();
-                                if(null != expOpFs) {
-                                    Schema s = expOpFs.schema;
-                                    log.debug("Schema s: " + s);
-                                    if(null != s) {
-                                        if(colNum < s.size()) {
-                                            Schema.FieldSchema parentFs = s.getField(colNum);
-                                            fs = Schema.FieldSchema.copyAndLink(parentFs, expressionOperator );
-                                            fss.add(fs);
-                                        } else {
-                                            fs = new Schema.FieldSchema(null, DataType.BYTEARRAY);
-                                            fss.add(fs);
-                                            fs.setParent(expOpFs.canonicalName, expressionOperator);
-                                        }
-                                    } else {
-                                        fs = new Schema.FieldSchema(null, DataType.BYTEARRAY);
-                                        fss.add(fs);
-                                        fs.setParent(expOpFs.canonicalName, expressionOperator);
-                                    }
-                                } else {
-                                    fs = new Schema.FieldSchema(null, DataType.BYTEARRAY);
-                                    fss.add(new Schema.FieldSchema(null, DataType.BYTEARRAY));
-                                    fs.setParent(null, expressionOperator);
-                                }
-                            } else {
-                                Schema s = expressionOperator.getSchema();
-                                if(null != s) {
-                                    Schema.FieldSchema parentFs = s.getField(colNum);
-                                    fs = Schema.FieldSchema.copyAndLink(parentFs, expressionOperator);
-                                    fss.add(fs);
-                                } else {
-                                    fs = new Schema.FieldSchema(null, DataType.BYTEARRAY);
-                                    fss.add(fs);
-                                    fs.setParent(null, expressionOperator);
-                                }
-                            }
-                        }
-    
-                //} catch(ParseException pe) {
-                //    mFieldSchema = null;
-                //    mIsFieldSchemaComputed = false;
-                //    throw new FrontendException(pe.getMessage());
-                //}
-                mFieldSchema = new Schema.FieldSchema(expressionOperator.getAlias(), new Schema(fss));
-                Schema.FieldSchema expOpFs = ((ExpressionOperator)expressionOperator).getFieldSchema();
-                mFieldSchema.setParent( expOpFs.canonicalName, expressionOperator );
-                mIsFieldSchemaComputed = true;
-            }
-
-        }
-
-        if(null != mFieldSchema) {
-            mType = mFieldSchema.type;
-        }
-        
-        List<LogicalOperator> succList = mPlan.getSuccessors(this) ;
-        List<LogicalOperator> predList = mPlan.getPredecessors(this) ;
-        if((null != succList) && !(succList.get(0) instanceof ExpressionOperator)) {
-            if(!DataType.isSchemaType(mType)) {
-                Schema pjSchema = new Schema(mFieldSchema);
-                mFieldSchema = new Schema.FieldSchema(getAlias(), pjSchema, DataType.TUPLE);
-                mFieldSchema.setParent(null, expressionOperator);
-            } else {
-                if(null != mFieldSchema) {
-                    mFieldSchema.type = DataType.TUPLE;
-                }
-            }
-            setOverloaded(true);
-            setType(DataType.TUPLE);
-        } else if(null != predList) {
-            LogicalOperator projectInput = getExpression();
-            if(((projectInput instanceof LOProject) || !(predList.get(0) instanceof ExpressionOperator)) && (projectInput.getType() == DataType.BAG)) {
-                if(!DataType.isSchemaType(mType)) {
-                    Schema pjSchema = new Schema(mFieldSchema);
-                    mFieldSchema = new Schema.FieldSchema(getAlias(), pjSchema, DataType.BAG);
-                    mFieldSchema.setParent( ((LOProject)expressionOperator).mFieldSchema.canonicalName, expressionOperator );
-                } else {
-                    if(null != mFieldSchema) {
-                        mFieldSchema.type = DataType.BAG;
-                    }
-                }
-                setType(DataType.BAG);
-            }
-        }
-        
-        log.debug("Exiting getFieldSchema");
-        return mFieldSchema;
-    }
-
-    public boolean isSingleProjection()  { // true only when mProjection holds exactly one entry
-        return mProjection.size() == 1 ;
-    }
-
-    @Override
-    public void visit(LOVisitor v) throws VisitorException {
-        v.visit(this); // hand this operator to the visitor; any VisitorException propagates to the caller
-    }
-
-    @Override
-    public Schema getSchema() throws FrontendException{
-        // Run getFieldSchema() first so that mFieldSchema has been computed before
-        // it is read below; the value that call returns is not used here.
-        getFieldSchema();
-        if (mFieldSchema != null){
-            return mFieldSchema.schema ; // the schema carried by the computed field schema
-        }
-        else {
-            return null ; // no field schema is available
-        }
-    }
-
-    /* For debugging only: one-line dump of the key id, the projected columns and the star/sentinel flags */
-    public String toDetailString() {
-        StringBuilder sb = new StringBuilder() ;
-        sb.append("LOProject") ;
-        sb.append(" Id=" + this.mKey.id) ;
-        sb.append(" Projection=") ;
-        boolean isFirst = true ; // cleared after the first column so that later columns get a leading comma
-        for(int i=0;i< mProjection.size();i++) {
-            if (isFirst) {
-                isFirst = false ;
-            }
-            else {
-                sb.append(",") ;
-            }
-            sb.append(mProjection.get(i)) ;
-        }
-        sb.append(" isStart=") ; // NOTE(review): label reads "isStart" but the value appended next is mIsStar
-        sb.append(mIsStar) ;
-        sb.append(" isSentinel=") ;
-        sb.append(mSentinel) ;
-        return sb.toString() ;
-    }
-
-    /**
-     * Copies this operator through super.clone() and gives the copy its own projection list.
-     * Do not use the clone method directly. Operators are cloned when logical plans
-     * are cloned using {@link LogicalPlanCloner}.
-     * @see org.apache.pig.impl.logicalLayer.ExpressionOperator#clone()
-     */
-    @Override
-    protected Object clone() throws CloneNotSupportedException {
-        LOProject clone = (LOProject)super.clone();
-        
-        // deep copy project specific attributes: rebuild mProjection element by element
-        clone.mProjection = new ArrayList<Integer>();
-        for (Iterator<Integer> it = mProjection.iterator(); it.hasNext();) {
-            clone.mProjection.add(Integer.valueOf(it.next()));
-        }
-        
-        return clone;
-    }
-
-    /**
-     * @param sendEmptyBagOnEOP new value for the sendEmptyBagOnEOP flag; stored unchanged
-     */
-    public void setSendEmptyBagOnEOP(boolean sendEmptyBagOnEOP) {
-        this.sendEmptyBagOnEOP = sendEmptyBagOnEOP;
-    }
-
-    /**
-     * @return the current value of the sendEmptyBagOnEOP flag
-     */
-    public boolean isSendEmptyBagOnEOP() {
-        return sendEmptyBagOnEOP;
-    }
-
-}

Modified: pig/branches/branch-0.9/src/org/apache/pig/impl/logicalLayer/LORegexp.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.9/src/org/apache/pig/impl/logicalLayer/LORegexp.java?rev=1100420&r1=1100419&r2=1100420&view=diff
==============================================================================
--- pig/branches/branch-0.9/src/org/apache/pig/impl/logicalLayer/LORegexp.java (original)
+++ pig/branches/branch-0.9/src/org/apache/pig/impl/logicalLayer/LORegexp.java Sat May  7 00:15:40 2011
@@ -1,103 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.pig.impl.logicalLayer;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.pig.data.DataType;
-import org.apache.pig.impl.plan.VisitorException;
-import org.apache.pig.impl.logicalLayer.schema.Schema;
-import org.apache.pig.impl.plan.OperatorKey;
-import org.apache.pig.impl.plan.PlanVisitor;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
-public class LORegexp extends BinaryExpressionOperator {
-    private static final long serialVersionUID = 2L;
-
-    /**
-     * Logger for this class.
-     */
-    private static Log log = LogFactory.getLog(LORegexp.class);
-
-    /**
-     * Passes the plan and key straight through to the superclass constructor.
-     * @param plan
-     *            Logical plan this operator is a part of.
-     * @param key
-     *            Operator key to assign to this node.
-     */
-    public LORegexp(LogicalPlan plan, OperatorKey key) {
-        super(plan, key);
-    }
-
-    public ExpressionOperator getOperand() { // the operand is the left-hand side of the binary expression
-        return getLhsOperand();
-    }
-
-    public String getRegexp() { // the pattern is the right-hand operand; it must be an LOConst holding a String
-        ExpressionOperator op = getRhsOperand();
-        if (!(op instanceof LOConst)) {
-            throw new RuntimeException(
-                "Regular expression patterns must be a constant.");
-        }
-        Object o = ((LOConst)op).getValue();
-        // the constant's value must be a String, otherwise a RuntimeException is thrown
-        if (!(o instanceof String)) {
-            throw new RuntimeException(
-                "Regular expression patterns must be a string.");
-        }
-
-        return (String)o;
-    }
-    
-    @Override
-    public String name() {
-        return "Regexp " + mKey.scope + "-" + mKey.id; // "Regexp <scope>-<id>"
-    }
-
-    @Override
-    public boolean supportsMultipleInputs() {
-        return true;
-    }
-
-    @Override
-    public Schema getSchema() {
-        return mSchema; // returned as-is; nothing in this class assigns it
-    }
-
-    @Override
-    public Schema.FieldSchema getFieldSchema() throws FrontendException {
-        if(!mIsFieldSchemaComputed) { // build once; later calls return the stored mFieldSchema
-            mFieldSchema = new Schema.FieldSchema(null, DataType.BOOLEAN); // the result type is BOOLEAN
-            mFieldSchema.setParent(getLhsOperand().getFieldSchema().canonicalName, getLhsOperand()); // register the left operand as a parent
-            mFieldSchema.setParent(getRhsOperand().getFieldSchema().canonicalName, getRhsOperand()); // then the right operand
-            mIsFieldSchemaComputed = true;
-        }
-        return mFieldSchema;
-    }
-
-    @Override
-    public void visit(LOVisitor v) throws VisitorException {
-        v.visit(this); // hand this operator to the visitor
-    }
-
-
-}

Modified: pig/branches/branch-0.9/src/org/apache/pig/impl/logicalLayer/LOSort.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.9/src/org/apache/pig/impl/logicalLayer/LOSort.java?rev=1100420&r1=1100419&r2=1100420&view=diff
==============================================================================
--- pig/branches/branch-0.9/src/org/apache/pig/impl/logicalLayer/LOSort.java (original)
+++ pig/branches/branch-0.9/src/org/apache/pig/impl/logicalLayer/LOSort.java Sat May  7 00:15:40 2011
@@ -1,405 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.pig.impl.logicalLayer;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Set;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.pig.FuncSpec;
-import org.apache.pig.PigException;
-import org.apache.pig.SortColInfo;
-import org.apache.pig.SortInfo;
-import org.apache.pig.data.DataType;
-import org.apache.pig.impl.logicalLayer.schema.Schema;
-import org.apache.pig.impl.plan.Operator;
-import org.apache.pig.impl.plan.OperatorKey;
-import org.apache.pig.impl.plan.PlanException;
-import org.apache.pig.impl.plan.ProjectionMap;
-import org.apache.pig.impl.plan.RequiredFields;
-import org.apache.pig.impl.plan.VisitorException;
-import org.apache.pig.impl.util.Pair;
-
-public class LOSort extends RelationalOperator {
-    private static final long serialVersionUID = 2L;
-
-    private List<Boolean> mAscCols; // read by index alongside mSortColPlans in getSortInfo()
-    private FuncSpec mSortFunc;
-    private boolean mIsStar = false;
-    private long limit; // -1 means "no limit"; see isLimited()
-    private List<LogicalPlan> mSortColPlans;
-    private static Log log = LogFactory.getLog(LOSort.class);
-
-    /**
-     * @param plan
-     *            LogicalPlan this operator is a part of.
-     * @param key
-     *            OperatorKey for this operator
-     * @param sortColPlans
-     *            List of logical plans describing the columns used for sorting data.
-     * @param ascCols
-     *            List of booleans read by index alongside sortColPlans. True
-     *            indicates sort ascending (default), false sort descending. Null is
-     *            documented as "all ascending", but getSortInfo() does not null-check it.
-     * @param sortFunc
-     *            the user defined sorting function
-     */
-    public LOSort(
-            LogicalPlan plan,
-            OperatorKey key,
-            List<LogicalPlan> sortColPlans,
-            List<Boolean> ascCols,
-            FuncSpec sortFunc) {
-        super(plan, key);
-        mSortColPlans = sortColPlans;
-        mAscCols = ascCols;
-        mSortFunc = sortFunc;
-        limit = -1; // start out unlimited
-    }
-
-    public LogicalOperator getInput() {
-        return mPlan.getPredecessors(this).get(0); // the first predecessor of this operator in the plan
-    }
-    
-    public List<LogicalPlan> getSortColPlans() {
-        return mSortColPlans; // the internal list itself, not a copy
-    }
-
-    public void setSortColPlans(List<LogicalPlan> sortPlans) {
-        mSortColPlans = sortPlans;
-    }
-
-    public List<Boolean> getAscendingCols() {
-        return mAscCols; // the internal list itself, not a copy
-    }
-
-    public void setAscendingCols(List<Boolean> ascCols) {
-        mAscCols = ascCols;
-    }
-
-    public FuncSpec getUserFunc() {
-        return mSortFunc;
-    }
-
-    public void setUserFunc(FuncSpec func) {
-        mSortFunc = func;
-    }
-
-    public boolean isStar() {
-        return mIsStar;
-    }
-
-    public void setStar(boolean b) {
-        mIsStar = b;
-    }
-
-    public void setLimit(long l)
-    {
-    	limit = l;
-    }
-    
-    public long getLimit()
-    {
-    	return limit;
-    }
-    
-    public boolean isLimited()
-    {
-    	return (limit!=-1); // any value other than the -1 set by the constructor counts as limited
-    }
-
-    @Override
-    public String name() {
-        return getAliasString() + "SORT " + mKey.scope + "-" + mKey.id; // "<alias string>SORT <scope>-<id>"
-    }
-
-    @Override
-    public Schema getSchema() throws FrontendException {
-        if (!mIsSchemaComputed) { // compute once; later calls return the stored mSchema
-            // derive the schema from the first predecessor of this operator in the plan
-            Collection<LogicalOperator> s = mPlan.getPredecessors(this);
-            ArrayList<Schema.FieldSchema> fss = new ArrayList<Schema.FieldSchema>();
-            try {
-                LogicalOperator op = s.iterator().next();
-                if (null == op) {
-                    int errCode = 1006;
-                    String msg = "Could not find operator in plan";                    
-                    throw new FrontendException(msg, errCode, PigException.INPUT, false, null);
-                }
-                if(op instanceof ExpressionOperator) {
-                    Schema.FieldSchema fs = Schema.FieldSchema.copyAndLink(((ExpressionOperator)op).getFieldSchema(), op);
-                    if(DataType.isSchemaType(fs.type)) {
-                        mSchema = fs.schema; // schema-typed field: use its inner schema directly
-                    } else {
-                        fss.add(fs);
-                        mSchema = new Schema(fss); // otherwise wrap the single field in a new schema
-                    }
-                } else {
-                    if (getInput().getSchema()!=null) {
-                        mSchema = Schema.copyAndLink( op.getSchema(), op );
-                    }
-                    else
-                        mSchema = null; // the input has no schema, so neither does the sort
-                }
-                mIsSchemaComputed = true;
-            } catch (FrontendException ioe) {
-                mSchema = null; // reset the stored state so a later call recomputes, then rethrow
-                mIsSchemaComputed = false;
-                throw ioe;
-            }
-        }
-        return mSchema;
-    }
-
-    @Override
-    public boolean supportsMultipleInputs() {
-        return false;
-    }
-
-    public void visit(LOVisitor v) throws VisitorException {
-        v.visit(this); // hand this operator to the visitor
-    }
-
-    public byte getType() {
-        return DataType.BAG ;
-    }
-
-    /**
-     * Copies this operator and deep-copies mAscCols, mSortFunc and mSortColPlans when they are non-null.
-     * Do not use the clone method directly. Operators are cloned when logical plans
-     * are cloned using {@link LogicalPlanCloner}.
-     * @see org.apache.pig.impl.logicalLayer.LogicalOperator#clone()
-     */
-    @Override
-    protected Object clone() throws CloneNotSupportedException {
-        LOSort clone = (LOSort) super.clone();
-        
-        // deep copy sort related attributes; each one is copied only when it is non-null
-        if(mAscCols != null) {
-            clone.mAscCols = new ArrayList<Boolean>();
-            for (Iterator<Boolean> it = mAscCols.iterator(); it.hasNext();) {
-                clone.mAscCols.add(Boolean.valueOf(it.next()));
-            }
-        }
-        
-        if(mSortFunc != null)
-            clone.mSortFunc = mSortFunc.clone();
-        
-        if(mSortColPlans != null) {
-            clone.mSortColPlans = new ArrayList<LogicalPlan>();
-            for (Iterator<LogicalPlan> it = mSortColPlans.iterator(); it.hasNext();) {
-                LogicalPlanCloneHelper lpCloneHelper = new LogicalPlanCloneHelper(it.next()); // one helper per sort plan
-                clone.mSortColPlans.add(lpCloneHelper.getClonedPlan());            
-            }
-        }
-        return clone;
-    }
-    
-    @Override
-    public ProjectionMap getProjectionMap() {
-        if(mIsProjectionMapComputed) return mProjectionMap;
-        mIsProjectionMapComputed = true; // set up front, so the null results returned below are remembered too
-
-        Schema outputSchema;
-        try {
-            outputSchema = getSchema();
-        } catch (FrontendException fee) {
-            mProjectionMap = null; // schema could not be computed: report no projection map
-            return mProjectionMap;
-        }
-
-        Schema inputSchema = null;        
-        
-        List<LogicalOperator> predecessors = (ArrayList<LogicalOperator>)mPlan.getPredecessors(this);
-        if(predecessors != null) {
-            try {
-                inputSchema = predecessors.get(0).getSchema();
-            } catch (FrontendException fee) {
-                mProjectionMap = null;
-                return mProjectionMap;
-            }
-        } else {
-            mProjectionMap = null; // no predecessors: nothing to map from
-            return mProjectionMap;
-        }
-
-        if(Schema.equals(inputSchema, outputSchema, false, true)) {
-            //there is a one-to-one mapping between input and output schemas
-            mProjectionMap = new ProjectionMap(false);
-            return mProjectionMap;
-        } else {
-            //problem - input and output schemas for a sort have to match, so no map is produced
-            mProjectionMap = null;
-            return mProjectionMap;
-        }
-    }
-    
-    @Override
-    public List<RequiredFields> getRequiredFields() {
-        List<RequiredFields> requiredFields = new ArrayList<RequiredFields>();
-        Set<Pair<Integer, Integer>> fields = new HashSet<Pair<Integer, Integer>>();
-        Set<LOProject> projectSet = new HashSet<LOProject>();
-        boolean orderByStar = false;
-
-        for (LogicalPlan plan : getSortColPlans()) { // collect the projections found in every sort plan
-            TopLevelProjectFinder projectFinder = new TopLevelProjectFinder(
-                    plan);
-            try {
-                projectFinder.visit();
-            } catch (VisitorException ve) {
-                requiredFields.clear(); // visitor failed: return a list holding a single null entry
-                requiredFields.add(null);
-                return requiredFields;
-            }
-            projectSet.addAll(projectFinder.getProjectSet());
-            if(projectFinder.getProjectStarSet() != null) {
-                orderByStar = true; // a non-null star set in any plan marks the whole sort as order-by-star
-            }
-        }
-
-        if(orderByStar) {
-            requiredFields.add(new RequiredFields(true)); // presumably "all fields required" - confirm against RequiredFields
-            return requiredFields;
-        } else {
-            for (LOProject project : projectSet) {
-                for (int inputColumn : project.getProjection()) {
-                    fields.add(new Pair<Integer, Integer>(0, inputColumn)); // pair is (input index 0, column)
-                }
-            }
-    
-            if(fields.size() == 0) {
-                requiredFields.add(new RequiredFields(false, true)); // presumably "no fields required" - confirm against RequiredFields
-            } else {                
-                requiredFields.add(new RequiredFields(new ArrayList<Pair<Integer, Integer>>(fields)));
-            }
-            return (requiredFields.size() == 0? null: requiredFields); // NOTE(review): one entry was just added, so the null branch is never taken
-        }
-    }
-
-    /* (non-Javadoc)
-     * @see org.apache.pig.impl.plan.Operator#rewire(org.apache.pig.impl.plan.Operator, int, org.apache.pig.impl.plan.Operator, boolean)
-     */
-    @Override
-    public void rewire(Operator<LOVisitor> oldPred, int oldPredIndex, Operator<LOVisitor> newPred, boolean useOldPred) throws PlanException {
-        super.rewire(oldPred, oldPredIndex, newPred, useOldPred);
-        LogicalOperator previous = (LogicalOperator) oldPred;
-        LogicalOperator current = (LogicalOperator) newPred;
-        for(LogicalPlan plan: mSortColPlans) { // run a ProjectFixerUpper over every sort plan
-            try {
-                ProjectFixerUpper projectFixer = new ProjectFixerUpper(
-                        plan, previous, oldPredIndex, current, useOldPred, this);
-                projectFixer.visit();
-            } catch (VisitorException ve) {
-                int errCode = 2144;
-                String msg = "Problem while fixing project inputs during rewiring.";
-                throw new PlanException(msg, errCode, PigException.BUG, ve); // the visitor failure is kept as the cause
-            }
-        }
-    }
-    
-    public SortInfo getSortInfo() throws FrontendException { // builds one SortColInfo per sort plan; each plan must be a single LOProject
-        Schema schema = this.getSchema();
-        List<SortColInfo> sortColInfoList = new ArrayList<SortColInfo>();
-        for (int i = 0; i < mSortColPlans.size(); i++) {
-            LogicalPlan lp = mSortColPlans.get(i);
-            Iterator<LogicalOperator> opsIterator = lp.iterator();
-            List<LogicalOperator> opsList = new ArrayList<LogicalOperator>();
-            while(opsIterator.hasNext()) {
-                opsList.add(opsIterator.next());
-            }
-            if(opsList.size() != 1 || !(opsList.get(0) instanceof LOProject)) {
-                int errCode = 2066;
-                String msg = "Unsupported operator in inner plan: " + opsList.get(0); // NOTE(review): get(0) is also evaluated when opsList is empty
-                throw new PlanException(msg, errCode, PigException.BUG);
-            }
-            LOProject project = (LOProject) opsList.get(0);
-            int sortColIndex = project.getCol();
-            String sortColName = (schema == null) ? null : // the column name is only known when a schema exists
-                schema.getField(sortColIndex).alias;
-            sortColInfoList.add(new SortColInfo(sortColName, sortColIndex, 
-                    mAscCols.get(i)? SortColInfo.Order.ASCENDING : // NOTE(review): mAscCols is dereferenced without a null check
-                        SortColInfo.Order.DESCENDING));
-        }
-        return new SortInfo(sortColInfoList);
-    }
-    
-    @Override
-    public List<RequiredFields> getRelevantInputs(int output, int column) throws FrontendException {
-        if (!mIsSchemaComputed)
-            getSchema(); // called for its side effect of filling in mSchema
-        
-        if (output!=0) // only output index 0 is accepted
-            return null;
-        
-        if (column<0)
-            return null;
-        
-        // if we have schema information, reject a column index at or beyond the schema size
-        if (mSchema!=null)
-        {
-            if (column >= mSchema.size())
-                return null;
-        }
-        
-        ArrayList<Pair<Integer, Integer>> inputList = new ArrayList<Pair<Integer, Integer>>();
-        inputList.add(new Pair<Integer, Integer>(0, column)); // the same column index on input 0
-        List<RequiredFields> result = new ArrayList<RequiredFields>();
-        result.add(new RequiredFields(inputList));
-        return result;
-    }
-    
-    @Override
-    public boolean pruneColumns(List<Pair<Integer, Integer>> columns)
-            throws FrontendException {
-        if (!mIsSchemaComputed)
-            getSchema(); // called for its side effect of filling in mSchema
-        if (mSchema == null) {
-            log
-                    .warn("Cannot prune columns in sort, no schema information found");
-            return false;
-        }
-
-        List<LogicalOperator> predecessors = mPlan.getPredecessors(this);
-
-        if (predecessors == null)
-            return false;
-
-        for (int i=columns.size()-1;i>=0;i--) { // walk the list from its last entry to its first
-            Pair<Integer, Integer> column = columns.get(i);
-            if (column.first != 0) { // first is the input index; a sort has only input 0
-                int errCode = 2191;
-                throw new FrontendException(
-                        "Sort only take 1 input, cannot prune input with index "
-                                + column.first, errCode, PigException.BUG);
-            }
-            if (column.second < 0) {
-                int errCode = 2192;
-                throw new FrontendException("Column to prune does not exist", errCode, PigException.BUG);
-            }
-            for (LogicalPlan plan : mSortColPlans) {
-                pruneColumnInPlan(plan, column.second); // apply the prune to every sort plan
-            }
-        }
-        super.pruneColumns(columns);; // NOTE(review): the second ';' is a stray empty statement
-        return true;
-    }
-}

Modified: pig/branches/branch-0.9/src/org/apache/pig/impl/logicalLayer/LOSplit.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.9/src/org/apache/pig/impl/logicalLayer/LOSplit.java?rev=1100420&r1=1100419&r2=1100420&view=diff
==============================================================================
--- pig/branches/branch-0.9/src/org/apache/pig/impl/logicalLayer/LOSplit.java (original)
+++ pig/branches/branch-0.9/src/org/apache/pig/impl/logicalLayer/LOSplit.java Sat May  7 00:15:40 2011
@@ -1,222 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.pig.impl.logicalLayer;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.pig.PigException;
-import org.apache.pig.impl.logicalLayer.schema.Schema;
-import org.apache.pig.impl.plan.Operator;
-import org.apache.pig.impl.plan.OperatorKey;
-import org.apache.pig.impl.plan.PlanException;
-import org.apache.pig.impl.plan.PlanVisitor;
-import org.apache.pig.impl.plan.ProjectionMap;
-import org.apache.pig.impl.plan.RequiredFields;
-import org.apache.pig.impl.plan.VisitorException;
-import org.apache.pig.impl.util.Pair;
-import org.apache.pig.data.DataType;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
-public class LOSplit extends RelationalOperator {
-    private static final long serialVersionUID = 2L;
-
-    private ArrayList<LogicalOperator> mOutputs;
-    private static Log log = LogFactory.getLog(LOSplit.class);
-
-    /**
-     * @param plan
-     *            LogicalPlan this operator is a part of.
-     * @param key
-     *            OperatorKey for this operator
-     * @param outputs
-     *            list of operators that are the outputs of the split
-     */
-    public LOSplit(LogicalPlan plan, OperatorKey key,
-            ArrayList<LogicalOperator> outputs) {
-        super(plan, key);
-        mOutputs = outputs; // the caller's list is stored as-is, not copied
-    }
-
-    public List<LogicalOperator> getOutputs() {
-        return mOutputs; // the internal list itself, not a copy
-    }
-
-    public void setOutputs(ArrayList<LogicalOperator> outputs) {
-        mOutputs = outputs;
-    }
-
-    public void addOutput(LogicalOperator lOp) {
-        mOutputs.add(lOp);
-    }
-
-    @Override
-    public String name() {
-        return getAliasString() + "Split " + mKey.scope + "-" + mKey.id; // "<alias string>Split <scope>-<id>"
-    }
-
-    @Override
-    public Schema getSchema() throws FrontendException {
-        if (!mIsSchemaComputed) { // compute once; later calls return the stored mSchema
-            // derive the schema from the first predecessor of this operator in the plan
-            Collection<LogicalOperator> s = mPlan.getPredecessors(this);
-            try {
-                LogicalOperator op = s.iterator().next();
-                if (null == op) {
-                    int errCode = 1006;
-                    String msg = "Could not find operator in plan";
-                    throw new FrontendException(msg, errCode, PigException.INPUT, false, null);
-                }
-                LogicalOperator input = s.iterator().next(); // fetched from a fresh iterator; presumably the same element as op
-                if (input.getSchema()!=null) {
-                    mSchema = Schema.copyAndLink(input.getSchema(), input);
-                }
-                else
-                    mSchema = null; // the input has no schema, so neither does the split
-                mIsSchemaComputed = true;
-            } catch (FrontendException ioe) {
-                mSchema = null; // reset the stored state so a later call recomputes, then rethrow
-                mIsSchemaComputed = false;
-                throw ioe;
-            }
-        }
-        return mSchema;
-    }
-
-    @Override
-    public boolean supportsMultipleInputs() {
-        return false;
-    }
-
-    @Override
-    public boolean supportsMultipleOutputs() {
-        return true;
-    }
-
-    public void visit(LOVisitor v) throws VisitorException {
-        v.visit(this); // hand this operator to the visitor
-    }
-
-    @Override
-    public byte getType() {
-        return DataType.BAG;
-    }
-    
-    /**
-     * Returns the result of super.clone() cast to LOSplit; no split-specific field is copied here.
-     * Do not use the clone method directly. Operators are cloned when logical plans
-     * are cloned using {@link LogicalPlanCloner}.
-     * @see org.apache.pig.impl.logicalLayer.LogicalOperator#clone()
-     */
-    @Override
-    protected Object clone() throws CloneNotSupportedException {
-        LOSplit splitClone = (LOSplit)super.clone();
-        return splitClone;
-    }
-    
-    @Override
-    public ProjectionMap getProjectionMap() {
-        
-        if(mIsProjectionMapComputed) return mProjectionMap;
-        mIsProjectionMapComputed = true; // set up front, so the null results returned below are remembered too
-        
-        Schema outputSchema;
-        try {
-            outputSchema = getSchema();
-        } catch (FrontendException fee) {
-            mProjectionMap = null; // schema could not be computed: report no projection map
-            return mProjectionMap;
-        }
-        
-        Schema inputSchema = null;        
-        
-        List<LogicalOperator> predecessors = (ArrayList<LogicalOperator>)mPlan.getPredecessors(this);
-        if(predecessors != null) {
-            try {
-                inputSchema = predecessors.get(0).getSchema();
-            } catch (FrontendException fee) {
-                mProjectionMap = null;
-                return mProjectionMap;
-            }
-        } else {
-            mProjectionMap = null; // no predecessors: nothing to map from
-            return mProjectionMap;
-        }
-        
-        if(Schema.equals(inputSchema, outputSchema, false, true)) {
-            //there is a one-to-one mapping between input and output schemas
-            mProjectionMap = new ProjectionMap(false);
-            return mProjectionMap;
-        } else {
-            //problem - input and output schemas for a split have to match, so no map is produced
-            mProjectionMap = null;
-            return mProjectionMap;
-        }
-    }
-    
-    @Override
-    public List<RequiredFields> getRequiredFields() {
-        List<RequiredFields> requiredFields = new ArrayList<RequiredFields>();
-        requiredFields.add(new RequiredFields(false, true)); // presumably "no fields required" - confirm against RequiredFields
-        return requiredFields;
-    }
-    
-    /* (non-Javadoc)
-    * @see org.apache.pig.impl.plan.Operator#rewire(org.apache.pig.impl.plan.Operator, int, org.apache.pig.impl.plan.Operator, boolean)
-    */
-   @Override
-   public void rewire(Operator<LOVisitor> oldPred, int oldPredIndex, Operator<LOVisitor> newPred, boolean useOldPred) throws PlanException {
-       for(LogicalOperator output: mPlan.getSuccessors(this)) { // forwards to every successor; super.rewire is not called here
-           output.rewire(oldPred, oldPredIndex, newPred, useOldPred);
-       }
-   }
-   
-   @Override
-   public List<RequiredFields> getRelevantInputs(int output, int column) throws FrontendException {
-       if (!mIsSchemaComputed)
-           getSchema(); // called for its side effect of filling in mSchema
-       
-       if (output<0)
-           return null;
-       
-       List<LogicalOperator> successors = mPlan.getSuccessors(this);
-       
-       if (output>=successors.size()) // the output index must name one of this operator's successors
-           return null;
-       
-       if (column<0)
-           return null;
-       
-       // if we have schema information, reject a column index at or beyond the schema size
-       if (mSchema!=null)
-       {
-           if (column >= mSchema.size())
-               return null;
-       }
-       
-       ArrayList<Pair<Integer, Integer>> inputList = new ArrayList<Pair<Integer, Integer>>();
-       inputList.add(new Pair<Integer, Integer>(0, column)); // the same column index on input 0
-       List<RequiredFields> result = new ArrayList<RequiredFields>();
-       result.add(new RequiredFields(inputList));
-       return result;
-   }
-
-}

Modified: pig/branches/branch-0.9/src/org/apache/pig/impl/logicalLayer/LOSplitOutput.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.9/src/org/apache/pig/impl/logicalLayer/LOSplitOutput.java?rev=1100420&r1=1100419&r2=1100420&view=diff
==============================================================================
--- pig/branches/branch-0.9/src/org/apache/pig/impl/logicalLayer/LOSplitOutput.java (original)
+++ pig/branches/branch-0.9/src/org/apache/pig/impl/logicalLayer/LOSplitOutput.java Sat May  7 00:15:40 2011
@@ -1,302 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.pig.impl.logicalLayer;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.pig.PigException;
-import org.apache.pig.impl.logicalLayer.LogicalOperator;
-import org.apache.pig.impl.logicalLayer.schema.Schema;
-import org.apache.pig.impl.plan.Operator;
-import org.apache.pig.impl.plan.OperatorKey;
-import org.apache.pig.impl.plan.PlanException;
-import org.apache.pig.impl.plan.ProjectionMap;
-import org.apache.pig.impl.plan.RequiredFields;
-import org.apache.pig.impl.plan.VisitorException;
-import org.apache.pig.impl.util.Pair;
-import org.apache.pig.data.DataType;
-import org.apache.pig.impl.logicalLayer.optimizer.SchemaRemover;
-
-
-public class LOSplitOutput extends RelationalOperator {
-    private static final long serialVersionUID = 2L;
-
-    protected int mIndex;
-    private LogicalPlan mCondPlan;
-    private static Log log = LogFactory.getLog(LOSplitOutput.class);
-    
-    /**
-     * @param plan
-     *            LogicalPlan this operator is a part of.
-     * @param key
-     *            OperatorKey for this operator
-     * @param index
-     *            index of this output in the split
-     * @param condPlan
-     *            logical plan containing the condition for this split output
-     */
-    public LOSplitOutput(
-            LogicalPlan plan,
-            OperatorKey key,
-            int index,
-            LogicalPlan condPlan) {
-        super(plan, key);
-        this.mIndex = index;
-        this.mCondPlan = condPlan;
-    }
-
-    public LogicalPlan getConditionPlan() {
-        return mCondPlan;
-    }
-    
-    @Override
-    public String name() {
-        return getAliasString() + "SplitOutput[" + getAlias() + "] " + mKey.scope + "-" + mKey.id;
-    }
-
-    @Override
-    public Schema getSchema() throws FrontendException{
-        if (!mIsSchemaComputed) {
-            // get our parent's schema
-            try {
-                LogicalOperator input = mPlan.getPredecessors(this).get(0);
-                if (null == input) {
-                    int errCode = 1006;
-                    String msg = "Could not find operator in plan";
-                    throw new FrontendException(msg, errCode, PigException.INPUT, false, null);
-                }
-                if (input.getSchema()!=null) {
-                    mSchema = Schema.copyAndLink(input.getSchema(), input);
-                }
-                else
-                    mSchema = null;
-                mIsSchemaComputed = true;
-            } catch (FrontendException fe) {
-                mSchema = null;
-                mIsSchemaComputed = false;
-                throw fe;
-            }
-        }
-        return mSchema;
-    }
-
-    public void visit(LOVisitor v) throws VisitorException{
-        v.visit(this);
-    }
-
-    @Override
-    public boolean supportsMultipleInputs() {
-        return false;
-    }
-
-    public int getReadFrom() {
-        return mIndex;
-    }
-
-    public byte getType() {
-        return DataType.BAG ;
-    }
-
-    public void unsetSchema() throws VisitorException{
-        SchemaRemover sr = new SchemaRemover(mCondPlan);
-        sr.visit();
-        super.unsetSchema();
-    }
-
-    /**
-     * @see org.apache.pig.impl.plan.Operator#clone()
-     * Do not use the clone method directly. Operators are cloned when logical plans
-     * are cloned using {@link LogicalPlanCloner}
-     */
-    @Override
-    protected Object clone() throws CloneNotSupportedException {
-        LOSplitOutput splitOutputClone = (LOSplitOutput)super.clone();
-        LogicalPlanCloneHelper lpCloner = new LogicalPlanCloneHelper(mCondPlan);
-        splitOutputClone.mCondPlan = lpCloner.getClonedPlan();
-        return splitOutputClone;
-    }
-
-    @Override
-    public ProjectionMap getProjectionMap() {
-        
-        if(mIsProjectionMapComputed) return mProjectionMap;
-        mIsProjectionMapComputed = true;
-        
-        Schema outputSchema;
-        try {
-            outputSchema = getSchema();
-        } catch (FrontendException fee) {
-            mProjectionMap = null;
-            return mProjectionMap;
-        }
-/*        
-        if(outputSchema == null) {
-            mProjectionMap = null;
-            return mProjectionMap;
-        }
-*/        
-        Schema inputSchema = null;        
-        
-        List<LogicalOperator> predecessors = (ArrayList<LogicalOperator>)mPlan.getPredecessors(this);
-        if(predecessors != null) {
-            try {
-                inputSchema = predecessors.get(0).getSchema();
-            } catch (FrontendException fee) {
-                mProjectionMap = null;
-                return mProjectionMap;
-            }
-        } else {
-            mProjectionMap = null;
-            return mProjectionMap;
-        }
-/*        
-        if(inputSchema == null) {
-            mProjectionMap = null;
-            return mProjectionMap;
-        }
-*/        
-        if(Schema.equals(inputSchema, outputSchema, false, true)) {
-            //there is a one is to one mapping between input and output schemas
-            mProjectionMap = new ProjectionMap(false);
-            return mProjectionMap;
-        } else {
-            //problem - input and output schemas for a split output have to match!
-            mProjectionMap = null;
-            return mProjectionMap;
-        }
-    }
-
-    @Override
-    public List<RequiredFields> getRequiredFields() {
-        List<RequiredFields> requiredFields = new ArrayList<RequiredFields>();
-        Set<Pair<Integer, Integer>> fields = new HashSet<Pair<Integer, Integer>>();
-        TopLevelProjectFinder projectFinder = new TopLevelProjectFinder(
-                mCondPlan);
-        try {
-            projectFinder.visit();
-        } catch (VisitorException ve) {
-            requiredFields.clear();
-            requiredFields.add(null);
-            return requiredFields;
-        }
-        Set<LOProject> projectStarSet = projectFinder.getProjectStarSet();
-
-        if (projectStarSet != null) {
-            requiredFields.add(new RequiredFields(true));
-            return requiredFields;
-        } else {
-            for (LOProject project : projectFinder.getProjectSet()) {
-                for (int inputColumn : project.getProjection()) {
-                    fields.add(new Pair<Integer, Integer>(0,
-                            inputColumn));
-                }
-            }
-            if(fields.size() == 0) {
-                requiredFields.add(new RequiredFields(false, true));
-            } else {                
-                requiredFields.add(new RequiredFields(new ArrayList<Pair<Integer, Integer>>(fields)));
-            }
-            return (requiredFields.size() == 0? null: requiredFields);
-        }
-    }
-
-    /* (non-Javadoc)
-     * @see org.apache.pig.impl.plan.Operator#rewire(org.apache.pig.impl.plan.Operator, org.apache.pig.impl.plan.Operator)
-     */
-    @Override
-    public void rewire(Operator<LOVisitor> oldPred, int oldPredIndex, Operator<LOVisitor> newPred, boolean useOldPred) throws PlanException {
-        super.rewire(oldPred, oldPredIndex, newPred, useOldPred);
-        LogicalOperator previous = (LogicalOperator) oldPred;
-        LogicalOperator current = (LogicalOperator) newPred;
-        try {
-            ProjectFixerUpper projectFixer = new ProjectFixerUpper(
-                    mCondPlan, previous, oldPredIndex, current, useOldPred, this);
-            projectFixer.visit();
-        } catch (VisitorException ve) {
-            int errCode = 2144;
-            String msg = "Problem while fixing project inputs during rewiring.";
-            throw new PlanException(msg, errCode, PigException.BUG, ve);
-        }
-        
-        //ideally we should be fixing mIndex too but split and split output should always 
-        //be treated as one operator. Any operations on split will imply an operation on
-        //split output
-    }
-    @Override
-    public List<RequiredFields> getRelevantInputs(int output, int column) throws FrontendException {
-        if (!mIsSchemaComputed)
-            getSchema();
-        
-        if (output!=0)
-            return null;
-
-        if (column<0)
-            return null;
-        
-        // if we have schema information, check if output column is valid
-        if (mSchema!=null)
-        {
-            if (column >= mSchema.size())
-                return null;
-        }
-        
-        ArrayList<Pair<Integer, Integer>> inputList = new ArrayList<Pair<Integer, Integer>>();
-        inputList.add(new Pair<Integer, Integer>(0, column));
-        List<RequiredFields> result = new ArrayList<RequiredFields>();
-        result.add(new RequiredFields(inputList));
-        return result;
-    }
-    
-    @Override
-    public boolean pruneColumns(List<Pair<Integer, Integer>> columns)
-            throws FrontendException {
-        if (!mIsSchemaComputed)
-            getSchema();
-        if (mSchema == null) {
-            log.warn("Cannot prune columns in splitoutput, no schema information found");
-            return false;
-        }
-
-        for (int i=columns.size()-1;i>=0;i--) {
-            Pair<Integer, Integer> column = columns.get(i);
-            if (column.first != 0) {
-                int errCode = 2191;
-                throw new FrontendException(
-                        "Splitoutput only take 1 input, cannot prune input with index "
-                                + column.first, errCode, PigException.BUG);
-            }
-            if (column.second < 0) {
-                int errCode = 2192;
-                throw new FrontendException("Column to prune does not exist", errCode, PigException.BUG);
-            }
-
-            pruneColumnInPlan(mCondPlan, column.second);
-        }
-        super.pruneColumns(columns);
-        return true;
-    }
-}

Modified: pig/branches/branch-0.9/src/org/apache/pig/impl/logicalLayer/LOStore.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.9/src/org/apache/pig/impl/logicalLayer/LOStore.java?rev=1100420&r1=1100419&r2=1100420&view=diff
==============================================================================
--- pig/branches/branch-0.9/src/org/apache/pig/impl/logicalLayer/LOStore.java (original)
+++ pig/branches/branch-0.9/src/org/apache/pig/impl/logicalLayer/LOStore.java Sat May  7 00:15:40 2011
@@ -1,239 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.pig.impl.logicalLayer;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.pig.FuncSpec;
-import org.apache.pig.SortInfo;
-import org.apache.pig.StoreFuncInterface;
-import org.apache.pig.impl.PigContext;
-import org.apache.pig.impl.io.FileSpec;
-import org.apache.pig.impl.plan.OperatorKey;
-import org.apache.pig.impl.plan.ProjectionMap;
-import org.apache.pig.impl.plan.RequiredFields;
-import org.apache.pig.impl.plan.VisitorException;
-import org.apache.pig.impl.logicalLayer.schema.Schema;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
-public class LOStore extends RelationalOperator {
-    private static final long serialVersionUID = 2L;
-
-    private FileSpec mOutputFile;
-
-    // If we know how to reload the store, here's how. The lFile
-    // FileSpec is set in PigServer.postProcess. It can be used to
-    // reload this store, if the optimizer has the need.
-    private FileSpec mInputSpec;
-    
-    private String signature;
-
-    transient private StoreFuncInterface mStoreFunc;
-    private static Log log = LogFactory.getLog(LOStore.class);
-    private boolean isTmpStore;
-    
-    public boolean isTmpStore() {
-        return isTmpStore;
-    }
-
-    public void setTmpStore(boolean isTmpStore) {
-        this.isTmpStore = isTmpStore;
-    }
-
-    private SortInfo sortInfo;
-
-    public SortInfo getSortInfo() {
-        return sortInfo;
-    }
-
-    public void setSortInfo(SortInfo sortInfo) {
-        this.sortInfo = sortInfo;
-    }
-
-    /**
-     * @param plan
-     *            LogicalPlan this operator is a part of.
-     * @param key
-     *            OperatorKey for this operator
-     * @param outputFileSpec
-     *            the file to be stored
-     */
-    public LOStore(LogicalPlan plan, OperatorKey key,
-            FileSpec outputFileSpec, String alias) throws IOException {
-        super(plan, key);
-
-        mOutputFile = outputFileSpec;
-        isTmpStore = false;
-
-        // TODO
-        // The code below is commented out as PigContext pulls in
-        // HExecutionEngine which in turn is completely commented out
-        // Also remove the commented out import org.apache.pig.impl.PigContext
-
-        try { 
-             mStoreFunc = (StoreFuncInterface) PigContext.instantiateFuncFromSpec(outputFileSpec.getFuncSpec());
-             this.mAlias = alias;
-             this.signature = constructSignature(mAlias, outputFileSpec.getFileName(), mOutputFile.getFuncSpec());
-             mStoreFunc.setStoreFuncUDFContextSignature(this.signature);
-        } catch (Exception e) { 
-            IOException ioe = new IOException(e.getMessage()); 
-            ioe.setStackTrace(e.getStackTrace());
-            throw ioe; 
-        }
-    }
-    
-    public static String constructSignature(String alias, String filename, FuncSpec funcSpec) {
-        return alias+"_"+filename+"_"+funcSpec.toString();
-    }
-
-    public FileSpec getOutputFile() {
-        return mOutputFile;
-    }
-    
-    public void setOutputFile(FileSpec outputFileSpec) throws IOException {
-        try { 
-            mStoreFunc = (StoreFuncInterface) PigContext.instantiateFuncFromSpec(outputFileSpec.getFuncSpec()); 
-       } catch (Exception e) { 
-           IOException ioe = new IOException(e.getMessage()); 
-           ioe.setStackTrace(e.getStackTrace());
-           throw ioe; 
-       }
-       mOutputFile = outputFileSpec;
-    }
-
-    public StoreFuncInterface getStoreFunc() {
-        return mStoreFunc;
-    }
-
-    @Override
-    public String name() {
-        return getAliasString() + "Store " + mKey.scope + "-" + mKey.id;
-    }
-
-    @Override
-    public Schema getSchema() throws FrontendException {
-        //throw new RuntimeException("Internal error: Requested schema of a "
-         //       + "store operator.");
-        return mPlan.getPredecessors(this).get(0).getSchema();
-    }
-
-    @Override
-    public boolean supportsMultipleInputs() {
-        return false;
-    }
-
-    @Override
-    public boolean supportsMultipleOutputs() {
-        return true;
-    }
-
-    @Override
-    public void visit(LOVisitor v) throws VisitorException {
-        v.visit(this);
-    }
-
-    public void setInputSpec(FileSpec in) {
-        mInputSpec = in;
-    }
-
-    public FileSpec getInputSpec() {
-        return mInputSpec;
-    }
-    
-    @Override
-    public ProjectionMap getProjectionMap() {
-        
-        if(mIsProjectionMapComputed) return mProjectionMap;
-        mIsProjectionMapComputed = true;
-        
-        Schema outputSchema;
-        try {
-            outputSchema = getSchema();
-        } catch (FrontendException fee) {
-            mProjectionMap = null;
-            return mProjectionMap;
-        }
-        
-        Schema inputSchema = null;        
-        
-        List<LogicalOperator> predecessors = (ArrayList<LogicalOperator>)mPlan.getPredecessors(this);
-        if(predecessors != null) {
-            try {
-                inputSchema = predecessors.get(0).getSchema();
-            } catch (FrontendException fee) {
-                mProjectionMap = null;
-                return mProjectionMap;
-            }
-        }
-        
-        
-        if(Schema.equals(inputSchema, outputSchema, false, true)) {
-            //there is a one is to one mapping between input and output schemas
-            mProjectionMap = new ProjectionMap(false);
-            return mProjectionMap;
-        } else {
-            //problem - input and output schemas for a store have to match!
-            mProjectionMap = null;
-            return mProjectionMap;
-        }
-    }
-
-    @Override
-    public List<RequiredFields> getRequiredFields() {
-        List<RequiredFields> requiredFields = new ArrayList<RequiredFields>();
-        requiredFields.add(new RequiredFields(false, true));
-        return requiredFields;
-    }
-    
-    @Override
-    public List<RequiredFields> getRelevantInputs(int output, int column) throws FrontendException {
-        if (!mIsSchemaComputed)
-            getSchema();
-        
-        if (output!=0)
-            return null;
-        
-        if (column<0)
-            return null;
-        
-        // if we have schema information, check if output column is valid
-        if (mSchema!=null)
-        {
-            if (column >= mSchema.size())
-                return null;
-        }
-        
-        List<RequiredFields> result = new ArrayList<RequiredFields>();
-        result.add(new RequiredFields(true));
-        return result;
-    }
-    
-    @Override
-    public void setAlias(String newAlias) {
-        super.setAlias(newAlias);
-        signature = constructSignature(mAlias, mOutputFile.getFileName(), mOutputFile.getFuncSpec());
-        mStoreFunc.setStoreFuncUDFContextSignature(signature);
-    }
-    
-    public String getSignature() {
-        return signature;
-    }
-}